zhibo/Zhibo/WebSocket增强功能实现说明.md

12 KiB
Raw Blame History

WebSocket 增强功能实现说明

📋 概述

本文档说明了为直播IM系统新增的三个核心功能模块

  1. 心跳检测机制 - 保持连接活跃,自动清理超时连接
  2. 在线状态管理 - 实时追踪用户在线状态和活跃时间
  3. 离线消息处理 - 保存和推送离线消息

🎯 新增文件列表

1. 在线状态管理服务

  • 接口: Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/service/OnlineStatusService.java
  • 实现: Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/service/impl/OnlineStatusServiceImpl.java

2. 离线消息管理服务

  • 接口: Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/service/OfflineMessageService.java
  • 实现: Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/service/impl/OfflineMessageServiceImpl.java

3. 心跳检测调度器

  • 文件: Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/websocket/HeartbeatScheduler.java

4. REST API 控制器

  • 文件: Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/controller/OnlineStatusController.java

5. 更新的文件

  • LiveChatHandler.java - 集成在线状态和心跳检测
  • PrivateChatHandler.java - 集成在线状态、离线消息和心跳检测

🔧 功能详解

1. 心跳检测机制

工作原理

  • 发送间隔: 每30秒自动向所有活跃连接发送心跳消息
  • 超时时间: 90秒无响应则认为连接超时
  • 检查频率: 每60秒检查一次超时连接
  • 自动清理: 超时连接会被自动关闭并清理

心跳消息格式

{
  "type": "heartbeat",
  "timestamp": 1234567890123
}

客户端响应格式

{
  "type": "heartbeat_response"
}
// 或
{
  "type": "pong"
}

关键方法

  • sendHeartbeat() - 定时发送心跳(@Scheduled每30秒
  • checkTimeout() - 定时检查超时(@Scheduled每60秒
  • recordHeartbeat(sessionId) - 记录心跳时间
  • removeHeartbeat(sessionId) - 移除心跳记录

2. 在线状态管理

Redis 数据结构

# 用户在线状态
user:online:{userId} = "1"  (过期时间: 5分钟)

# 用户最后活跃时间
user:last_active:{userId} = timestamp  (过期时间: 5分钟)

# 直播间在线用户集合
room:online:{roomId} = Set<userId>  (过期时间: 1小时)

核心功能

  1. 设置用户在线/离线

    onlineStatusService.setUserOnline(userId, true/false);
    
  2. 检查用户是否在线

    boolean online = onlineStatusService.isUserOnline(userId);
    
  3. 更新用户活跃时间

    onlineStatusService.updateUserLastActiveTime(userId);
    
  4. 直播间在线管理

    // 添加用户到房间
    onlineStatusService.addUserToRoom(roomId, userId);
    
    // 移除用户
    onlineStatusService.removeUserFromRoom(roomId, userId);
    
    // 获取在线人数
    Long count = onlineStatusService.getRoomOnlineCount(roomId);
    
    // 获取在线用户列表
    Set<String> users = onlineStatusService.getRoomOnlineUsers(roomId);
    
  5. 批量检查在线状态

    List<Integer> onlineUsers = onlineStatusService.getOnlineUsers(userIds);
    

自动过期机制

  • 用户在线状态5分钟无活动自动过期
  • 每次消息发送/接收会自动更新活跃时间
  • 心跳消息也会更新活跃时间

3. 离线消息处理

Redis 数据结构

# 用户离线消息队列List
offline:msg:{userId} = [message1, message2, ...]
- 最大保存100条消息
- 过期时间: 7天

核心功能

  1. 保存离线消息

    offlineMessageService.saveOfflineMessage(userId, messageJson);
    
  2. 获取离线消息

    // 获取指定数量
    List<String> messages = offlineMessageService.getOfflineMessages(userId, 50);
    
    // 获取全部
    List<String> allMessages = offlineMessageService.getAllOfflineMessages(userId);
    
  3. 清除离线消息

    offlineMessageService.clearOfflineMessages(userId);
    
  4. 获取离线消息数量

    Long count = offlineMessageService.getOfflineMessageCount(userId);
    

离线消息流程

  1. 用户A发送消息给用户B
  2. 检查用户B是否在线
  3. 如果在线 → 直接推送
  4. 如果离线 → 保存到Redis离线消息队列
  5. 用户B上线时 → 自动推送所有离线消息
  6. 推送完成后 → 清除离线消息

消息格式

离线消息保存时会自动添加时间戳:

{
  "type": "chat",
  "messageId": "123",
  "userId": 456,
  "username": "张三",
  "message": "你好",
  "timestamp": 1234567890123,
  "savedAt": 1234567890456
}

🔌 REST API 接口

基础路径

http://localhost:8081/api/front/online

接口列表

1. 检查用户是否在线

GET /status/{userId}

响应示例:

{
  "code": 200,
  "message": "success",
  "data": {
    "userId": 123,
    "online": true,
    "lastActiveTime": 1234567890123
  }
}

2. 批量检查用户在线状态

POST /status/batch
Content-Type: application/json

[123, 456, 789]

响应示例:

{
  "code": 200,
  "data": {
    "total": 3,
    "onlineCount": 2,
    "onlineUsers": [123, 456]
  }
}

3. 获取直播间在线用户列表

GET /room/{roomId}/users

响应示例:

{
  "code": 200,
  "data": {
    "roomId": "101",
    "count": 150,
    "users": ["user1", "user2", "user3"]
  }
}

4. 获取直播间在线人数

GET /room/{roomId}/count

响应示例:

{
  "code": 200,
  "data": {
    "roomId": "101",
    "count": 150
  }
}

5. 获取用户离线消息数量

GET /offline/count/{userId}

响应示例:

{
  "code": 200,
  "data": {
    "userId": 123,
    "count": 5
  }
}

6. 获取用户离线消息

GET /offline/messages/{userId}?limit=50

响应示例:

{
  "code": 200,
  "data": {
    "userId": 123,
    "messages": ["...", "..."],
    "count": 5,
    "totalCount": 5
  }
}

7. 清除用户离线消息

DELETE /offline/messages/{userId}

8. 获取WebSocket连接统计

GET /stats

响应示例:

{
  "code": 200,
  "data": {
    "activeConnections": 256,
    "timestamp": 1234567890123
  }
}

🔄 WebSocket 连接流程

直播间聊天连接流程

1. 建立连接

ws://localhost:8081/ws/live/chat/{roomId}?userId={userId}

2. 连接成功后

  • 添加到房间Session映射
  • 设置用户在线状态
  • 添加用户到房间在线列表
  • 记录心跳时间
  • 发送欢迎消息
  • 广播用户加入消息

3. 消息交互

// 发送聊天消息
{
  "type": "chat",
  "userId": "123",
  "nickname": "张三",
  "content": "大家好"
}

// 接收心跳
{
  "type": "heartbeat",
  "timestamp": 1234567890123
}

// 响应心跳
{
  "type": "heartbeat_response"
}

4. 断开连接

  • 从房间Session映射移除
  • 从房间在线列表移除
  • 移除心跳记录
  • 广播用户离开消息

私聊连接流程

1. 建立连接

ws://localhost:8081/ws/chat/{conversationId}?userId={userId}

2. 连接成功后

  • 验证用户权限
  • 添加到会话Session映射
  • 添加到用户Session映射
  • 设置用户在线状态
  • 记录心跳时间
  • 推送离线消息
  • 标记会话为已读

3. 消息交互

// 发送聊天消息
{
  "type": "chat",
  "content": "你好",
  "messageType": "text"
}

// 发送已读通知
{
  "type": "read"
}

// 发送正在输入
{
  "type": "typing"
}

// 响应心跳
{
  "type": "heartbeat_response"
}

4. 离线消息处理

  • 发送消息时检查对方是否在线
  • 在线 → 直接推送
  • 离线 → 保存到Redis
  • 对方上线时自动推送

5. 断开连接

  • 从会话Session映射移除
  • 从用户Session映射移除
  • 检查是否还有其他连接
  • 无其他连接则设置离线
  • 移除心跳记录

📊 性能优化

Redis 使用优化

  1. Key过期策略

    • 在线状态: 5分钟自动过期
    • 离线消息: 7天自动过期
    • 房间在线列表: 1小时自动过期
  2. 数据结构选择

    • 在线状态: String (简单快速)
    • 离线消息: List (FIFO队列)
    • 房间在线: Set (去重、快速查询)
  3. 批量操作

    • 支持批量检查用户在线状态
    • 减少Redis访问次数

内存优化

  1. 离线消息限制

    • 每个用户最多保存100条离线消息
    • 超过限制自动删除最旧的消息
  2. 心跳记录

    • 使用ConcurrentHashMap存储
    • 连接断开时自动清理

🧪 测试建议

1. 心跳检测测试

// 客户端代码示例
const ws = new WebSocket('ws://localhost:8081/ws/live/chat/101?userId=123');

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  
  // 收到心跳,立即响应
  if (data.type === 'heartbeat') {
    ws.send(JSON.stringify({ type: 'heartbeat_response' }));
  }
};

2. 在线状态测试

# 检查用户是否在线
curl http://localhost:8081/api/front/online/status/123

# 获取房间在线人数
curl http://localhost:8081/api/front/online/room/101/count

3. 离线消息测试

  1. 用户A连接WebSocket
  2. 用户B断开连接离线
  3. 用户A发送消息给用户B
  4. 检查离线消息数量
  5. 用户B重新连接
  6. 验证是否收到离线消息

⚠️ 注意事项

1. Redis 依赖

  • 确保Redis服务正常运行
  • 检查Redis配置application.yml
  • 建议使用Redis 5.0+

2. 定时任务

  • 心跳检测使用Spring @Scheduled
  • 确保启动类有 @EnableScheduling 注解
  • 可以通过配置调整心跳间隔

3. 并发安全

  • 使用ConcurrentHashMap和CopyOnWriteArraySet
  • 所有Session操作都是线程安全的
  • Redis操作通过RedisUtil统一管理

4. 异常处理

  • 所有关键操作都有try-catch
  • 异常不会影响其他用户
  • 详细的日志记录便于排查问题

🔧 配置说明

心跳配置(可在代码中调整)

// HeartbeatScheduler.java
private static final long HEARTBEAT_TIMEOUT = 90000;  // 90秒超时
private static final long HEARTBEAT_INTERVAL = 30000; // 30秒发送间隔

在线状态配置

// OnlineStatusServiceImpl.java
private static final long ONLINE_EXPIRE_SECONDS = 300; // 5分钟过期

离线消息配置

// OfflineMessageServiceImpl.java
private static final int MAX_OFFLINE_MESSAGES = 100;  // 最多100条
private static final long OFFLINE_MESSAGE_EXPIRE_SECONDS = 7 * 24 * 3600; // 7天

📈 监控建议

1. 关键指标

  • WebSocket活跃连接数
  • 在线用户数
  • 离线消息堆积数量
  • 心跳超时次数

2. 日志监控

# 查看心跳日志
grep "Heartbeat" application.log

# 查看在线状态日志
grep "OnlineStatus" application.log

# 查看离线消息日志
grep "OfflineMessage" application.log

3. Redis 监控

# 查看在线用户数
redis-cli KEYS "user:online:*" | wc -l

# 查看离线消息总数
redis-cli KEYS "offline:msg:*" | wc -l

# 查看房间在线列表
redis-cli SMEMBERS "room:online:101"

完成度总结

功能模块 完成度 说明
心跳检测 100% 完全实现,包括发送、检测、超时清理
在线状态管理 100% 完全实现,支持用户和房间在线状态
离线消息处理 100% 完全实现,自动保存和推送
REST API 100% 提供完整的查询接口
集成到Handler 100% LiveChatHandler和PrivateChatHandler已集成

🎉 总结

现在你的WebSocket系统已经具备了完整的企业级功能

  1. 稳定性: 心跳检测确保连接健康
  2. 实时性: 在线状态实时更新
  3. 可靠性: 离线消息不丢失
  4. 可监控: 提供完整的REST API
  5. 高性能: Redis缓存自动过期
  6. 易维护: 清晰的日志和异常处理

所有功能都已经集成到现有的WebSocket Handler中无需修改客户端代码即可享受这些增强功能