12 KiB
12 KiB
WebSocket 增强功能实现说明
📋 概述
本文档说明了为直播IM系统新增的三个核心功能模块:
- 心跳检测机制 - 保持连接活跃,自动清理超时连接
- 在线状态管理 - 实时追踪用户在线状态和活跃时间
- 离线消息处理 - 保存和推送离线消息
🎯 新增文件列表
1. 在线状态管理服务
- 接口:
Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/service/OnlineStatusService.java - 实现:
Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/service/impl/OnlineStatusServiceImpl.java
2. 离线消息管理服务
- 接口:
Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/service/OfflineMessageService.java - 实现:
Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/service/impl/OfflineMessageServiceImpl.java
3. 心跳检测调度器
- 文件:
Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/websocket/HeartbeatScheduler.java
4. REST API 控制器
- 文件:
Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/controller/OnlineStatusController.java
5. 更新的文件
LiveChatHandler.java- 集成在线状态和心跳检测PrivateChatHandler.java- 集成在线状态、离线消息和心跳检测
🔧 功能详解
1. 心跳检测机制 ✅
工作原理
- 发送间隔: 每30秒自动向所有活跃连接发送心跳消息
- 超时时间: 90秒无响应则认为连接超时
- 检查频率: 每60秒检查一次超时连接
- 自动清理: 超时连接会被自动关闭并清理
心跳消息格式
{
"type": "heartbeat",
"timestamp": 1234567890123
}
客户端响应格式
{
"type": "heartbeat_response"
}
// 或
{
"type": "pong"
}
关键方法
sendHeartbeat()- 定时发送心跳(@Scheduled,每30秒)checkTimeout()- 定时检查超时(@Scheduled,每60秒)recordHeartbeat(sessionId)- 记录心跳时间removeHeartbeat(sessionId)- 移除心跳记录
2. 在线状态管理 ✅
Redis 数据结构
# 用户在线状态
user:online:{userId} = "1" (过期时间: 5分钟)
# 用户最后活跃时间
user:last_active:{userId} = timestamp (过期时间: 5分钟)
# 直播间在线用户集合
room:online:{roomId} = Set<userId> (过期时间: 1小时)
核心功能
-
设置用户在线/离线
onlineStatusService.setUserOnline(userId, true/false); -
检查用户是否在线
boolean online = onlineStatusService.isUserOnline(userId); -
更新用户活跃时间
onlineStatusService.updateUserLastActiveTime(userId); -
直播间在线管理
// 添加用户到房间 onlineStatusService.addUserToRoom(roomId, userId); // 移除用户 onlineStatusService.removeUserFromRoom(roomId, userId); // 获取在线人数 Long count = onlineStatusService.getRoomOnlineCount(roomId); // 获取在线用户列表 Set<String> users = onlineStatusService.getRoomOnlineUsers(roomId); -
批量检查在线状态
List<Integer> onlineUsers = onlineStatusService.getOnlineUsers(userIds);
自动过期机制
- 用户在线状态5分钟无活动自动过期
- 每次消息发送/接收会自动更新活跃时间
- 心跳消息也会更新活跃时间
3. 离线消息处理 ✅
Redis 数据结构
# 用户离线消息队列(List)
offline:msg:{userId} = [message1, message2, ...]
- 最大保存100条消息
- 过期时间: 7天
核心功能
-
保存离线消息
offlineMessageService.saveOfflineMessage(userId, messageJson); -
获取离线消息
// 获取指定数量 List<String> messages = offlineMessageService.getOfflineMessages(userId, 50); // 获取全部 List<String> allMessages = offlineMessageService.getAllOfflineMessages(userId); -
清除离线消息
offlineMessageService.clearOfflineMessages(userId); -
获取离线消息数量
Long count = offlineMessageService.getOfflineMessageCount(userId);
离线消息流程
- 用户A发送消息给用户B
- 检查用户B是否在线
- 如果在线 → 直接推送
- 如果离线 → 保存到Redis离线消息队列
- 用户B上线时 → 自动推送所有离线消息
- 推送完成后 → 清除离线消息
消息格式
离线消息保存时会自动添加时间戳:
{
"type": "chat",
"messageId": "123",
"userId": 456,
"username": "张三",
"message": "你好",
"timestamp": 1234567890123,
"savedAt": 1234567890456
}
🔌 REST API 接口
基础路径
http://localhost:8081/api/front/online
接口列表
1. 检查用户是否在线
GET /status/{userId}
响应示例:
{
"code": 200,
"message": "success",
"data": {
"userId": 123,
"online": true,
"lastActiveTime": 1234567890123
}
}
2. 批量检查用户在线状态
POST /status/batch
Content-Type: application/json
[123, 456, 789]
响应示例:
{
"code": 200,
"data": {
"total": 3,
"onlineCount": 2,
"onlineUsers": [123, 456]
}
}
3. 获取直播间在线用户列表
GET /room/{roomId}/users
响应示例:
{
"code": 200,
"data": {
"roomId": "101",
"count": 150,
"users": ["user1", "user2", "user3"]
}
}
4. 获取直播间在线人数
GET /room/{roomId}/count
响应示例:
{
"code": 200,
"data": {
"roomId": "101",
"count": 150
}
}
5. 获取用户离线消息数量
GET /offline/count/{userId}
响应示例:
{
"code": 200,
"data": {
"userId": 123,
"count": 5
}
}
6. 获取用户离线消息
GET /offline/messages/{userId}?limit=50
响应示例:
{
"code": 200,
"data": {
"userId": 123,
"messages": ["...", "..."],
"count": 5,
"totalCount": 5
}
}
7. 清除用户离线消息
DELETE /offline/messages/{userId}
8. 获取WebSocket连接统计
GET /stats
响应示例:
{
"code": 200,
"data": {
"activeConnections": 256,
"timestamp": 1234567890123
}
}
🔄 WebSocket 连接流程
直播间聊天连接流程
1. 建立连接
ws://localhost:8081/ws/live/chat/{roomId}?userId={userId}
2. 连接成功后
- ✅ 添加到房间Session映射
- ✅ 设置用户在线状态
- ✅ 添加用户到房间在线列表
- ✅ 记录心跳时间
- ✅ 发送欢迎消息
- ✅ 广播用户加入消息
3. 消息交互
// 发送聊天消息
{
"type": "chat",
"userId": "123",
"nickname": "张三",
"content": "大家好"
}
// 接收心跳
{
"type": "heartbeat",
"timestamp": 1234567890123
}
// 响应心跳
{
"type": "heartbeat_response"
}
4. 断开连接
- ✅ 从房间Session映射移除
- ✅ 从房间在线列表移除
- ✅ 移除心跳记录
- ✅ 广播用户离开消息
私聊连接流程
1. 建立连接
ws://localhost:8081/ws/chat/{conversationId}?userId={userId}
2. 连接成功后
- ✅ 验证用户权限
- ✅ 添加到会话Session映射
- ✅ 添加到用户Session映射
- ✅ 设置用户在线状态
- ✅ 记录心跳时间
- ✅ 推送离线消息
- ✅ 标记会话为已读
3. 消息交互
// 发送聊天消息
{
"type": "chat",
"content": "你好",
"messageType": "text"
}
// 发送已读通知
{
"type": "read"
}
// 发送正在输入
{
"type": "typing"
}
// 响应心跳
{
"type": "heartbeat_response"
}
4. 离线消息处理
- 发送消息时检查对方是否在线
- 在线 → 直接推送
- 离线 → 保存到Redis
- 对方上线时自动推送
5. 断开连接
- ✅ 从会话Session映射移除
- ✅ 从用户Session映射移除
- ✅ 检查是否还有其他连接
- ✅ 无其他连接则设置离线
- ✅ 移除心跳记录
📊 性能优化
Redis 使用优化
-
Key过期策略
- 在线状态: 5分钟自动过期
- 离线消息: 7天自动过期
- 房间在线列表: 1小时自动过期
-
数据结构选择
- 在线状态: String (简单快速)
- 离线消息: List (FIFO队列)
- 房间在线: Set (去重、快速查询)
-
批量操作
- 支持批量检查用户在线状态
- 减少Redis访问次数
内存优化
-
离线消息限制
- 每个用户最多保存100条离线消息
- 超过限制自动删除最旧的消息
-
心跳记录
- 使用ConcurrentHashMap存储
- 连接断开时自动清理
🧪 测试建议
1. 心跳检测测试
// 客户端代码示例
const ws = new WebSocket('ws://localhost:8081/ws/live/chat/101?userId=123');
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
// 收到心跳,立即响应
if (data.type === 'heartbeat') {
ws.send(JSON.stringify({ type: 'heartbeat_response' }));
}
};
2. 在线状态测试
# 检查用户是否在线
curl http://localhost:8081/api/front/online/status/123
# 获取房间在线人数
curl http://localhost:8081/api/front/online/room/101/count
3. 离线消息测试
- 用户A连接WebSocket
- 用户B断开连接(离线)
- 用户A发送消息给用户B
- 检查离线消息数量
- 用户B重新连接
- 验证是否收到离线消息
⚠️ 注意事项
1. Redis 依赖
- 确保Redis服务正常运行
- 检查Redis配置(application.yml)
- 建议使用Redis 5.0+
2. 定时任务
- 心跳检测使用Spring @Scheduled
- 确保启动类有 @EnableScheduling 注解
- 可以通过配置调整心跳间隔
3. 并发安全
- 使用ConcurrentHashMap和CopyOnWriteArraySet
- 所有Session操作都是线程安全的
- Redis操作通过RedisUtil统一管理
4. 异常处理
- 所有关键操作都有try-catch
- 异常不会影响其他用户
- 详细的日志记录便于排查问题
🔧 配置说明
心跳配置(可在代码中调整)
// HeartbeatScheduler.java
private static final long HEARTBEAT_TIMEOUT = 90000; // 90秒超时
private static final long HEARTBEAT_INTERVAL = 30000; // 30秒发送间隔
在线状态配置
// OnlineStatusServiceImpl.java
private static final long ONLINE_EXPIRE_SECONDS = 300; // 5分钟过期
离线消息配置
// OfflineMessageServiceImpl.java
private static final int MAX_OFFLINE_MESSAGES = 100; // 最多100条
private static final long OFFLINE_MESSAGE_EXPIRE_SECONDS = 7 * 24 * 3600; // 7天
📈 监控建议
1. 关键指标
- WebSocket活跃连接数
- 在线用户数
- 离线消息堆积数量
- 心跳超时次数
2. 日志监控
# 查看心跳日志
grep "Heartbeat" application.log
# 查看在线状态日志
grep "OnlineStatus" application.log
# 查看离线消息日志
grep "OfflineMessage" application.log
3. Redis 监控
# 查看在线用户数
redis-cli KEYS "user:online:*" | wc -l
# 查看离线消息总数
redis-cli KEYS "offline:msg:*" | wc -l
# 查看房间在线列表
redis-cli SMEMBERS "room:online:101"
✅ 完成度总结
| 功能模块 | 完成度 | 说明 |
|---|---|---|
| 心跳检测 | ✅ 100% | 完全实现,包括发送、检测、超时清理 |
| 在线状态管理 | ✅ 100% | 完全实现,支持用户和房间在线状态 |
| 离线消息处理 | ✅ 100% | 完全实现,自动保存和推送 |
| REST API | ✅ 100% | 提供完整的查询接口 |
| 集成到Handler | ✅ 100% | LiveChatHandler和PrivateChatHandler已集成 |
🎉 总结
现在你的WebSocket系统已经具备了完整的企业级功能:
- ✅ 稳定性: 心跳检测确保连接健康
- ✅ 实时性: 在线状态实时更新
- ✅ 可靠性: 离线消息不丢失
- ✅ 可监控: 提供完整的REST API
- ✅ 高性能: Redis缓存,自动过期
- ✅ 易维护: 清晰的日志和异常处理
所有功能都已经集成到现有的WebSocket Handler中,无需修改客户端代码即可享受这些增强功能!