zhibo/Zhibo/WebSocket增强功能实现说明.md

590 lines
12 KiB
Markdown
Raw Permalink Normal View History

# WebSocket 增强功能实现说明
## 📋 概述
本文档说明了为直播IM系统新增的三个核心功能模块
1. **心跳检测机制** - 保持连接活跃,自动清理超时连接
2. **在线状态管理** - 实时追踪用户在线状态和活跃时间
3. **离线消息处理** - 保存和推送离线消息
---
## 🎯 新增文件列表
### 1. 在线状态管理服务
- **接口**: `Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/service/OnlineStatusService.java`
- **实现**: `Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/service/impl/OnlineStatusServiceImpl.java`
### 2. 离线消息管理服务
- **接口**: `Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/service/OfflineMessageService.java`
- **实现**: `Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/service/impl/OfflineMessageServiceImpl.java`
### 3. 心跳检测调度器
- **文件**: `Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/websocket/HeartbeatScheduler.java`
### 4. REST API 控制器
- **文件**: `Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/controller/OnlineStatusController.java`
### 5. 更新的文件
- `LiveChatHandler.java` - 集成在线状态和心跳检测
- `PrivateChatHandler.java` - 集成在线状态、离线消息和心跳检测
---
## 🔧 功能详解
### 1. 心跳检测机制 ✅
#### 工作原理
- **发送间隔**: 每30秒自动向所有活跃连接发送心跳消息
- **超时时间**: 90秒无响应则认为连接超时
- **检查频率**: 每60秒检查一次超时连接
- **自动清理**: 超时连接会被自动关闭并清理
#### 心跳消息格式
```json
{
"type": "heartbeat",
"timestamp": 1234567890123
}
```
#### 客户端响应格式
```json
{
"type": "heartbeat_response"
}
// 或
{
"type": "pong"
}
```
#### 关键方法
- `sendHeartbeat()` - 定时发送心跳(@Scheduled每30秒
- `checkTimeout()` - 定时检查超时(@Scheduled每60秒
- `recordHeartbeat(sessionId)` - 记录心跳时间
- `removeHeartbeat(sessionId)` - 移除心跳记录
---
### 2. 在线状态管理 ✅
#### Redis 数据结构
```
# 用户在线状态
user:online:{userId} = "1" (过期时间: 5分钟)
# 用户最后活跃时间
user:last_active:{userId} = timestamp (过期时间: 5分钟)
# 直播间在线用户集合
room:online:{roomId} = Set<userId> (过期时间: 1小时)
```
#### 核心功能
1. **设置用户在线/离线**
```java
onlineStatusService.setUserOnline(userId, true/false);
```
2. **检查用户是否在线**
```java
boolean online = onlineStatusService.isUserOnline(userId);
```
3. **更新用户活跃时间**
```java
onlineStatusService.updateUserLastActiveTime(userId);
```
4. **直播间在线管理**
```java
// 添加用户到房间
onlineStatusService.addUserToRoom(roomId, userId);
// 移除用户
onlineStatusService.removeUserFromRoom(roomId, userId);
// 获取在线人数
Long count = onlineStatusService.getRoomOnlineCount(roomId);
// 获取在线用户列表
Set<String> users = onlineStatusService.getRoomOnlineUsers(roomId);
```
5. **批量检查在线状态**
```java
List<Integer> onlineUsers = onlineStatusService.getOnlineUsers(userIds);
```
#### 自动过期机制
- 用户在线状态5分钟无活动自动过期
- 每次消息发送/接收会自动更新活跃时间
- 心跳消息也会更新活跃时间
---
### 3. 离线消息处理 ✅
#### Redis 数据结构
```
# 用户离线消息队列List
offline:msg:{userId} = [message1, message2, ...]
- 最大保存100条消息
- 过期时间: 7天
```
#### 核心功能
1. **保存离线消息**
```java
offlineMessageService.saveOfflineMessage(userId, messageJson);
```
2. **获取离线消息**
```java
// 获取指定数量
List<String> messages = offlineMessageService.getOfflineMessages(userId, 50);
// 获取全部
List<String> allMessages = offlineMessageService.getAllOfflineMessages(userId);
```
3. **清除离线消息**
```java
offlineMessageService.clearOfflineMessages(userId);
```
4. **获取离线消息数量**
```java
Long count = offlineMessageService.getOfflineMessageCount(userId);
```
#### 离线消息流程
1. 用户A发送消息给用户B
2. 检查用户B是否在线
3. 如果在线 → 直接推送
4. 如果离线 → 保存到Redis离线消息队列
5. 用户B上线时 → 自动推送所有离线消息
6. 推送完成后 → 清除离线消息
#### 消息格式
离线消息保存时会自动添加时间戳:
```json
{
"type": "chat",
"messageId": "123",
"userId": 456,
"username": "张三",
"message": "你好",
"timestamp": 1234567890123,
"savedAt": 1234567890456
}
```
---
## 🔌 REST API 接口
### 基础路径
```
http://localhost:8081/api/front/online
```
### 接口列表
#### 1. 检查用户是否在线
```http
GET /status/{userId}
```
**响应示例**:
```json
{
"code": 200,
"message": "success",
"data": {
"userId": 123,
"online": true,
"lastActiveTime": 1234567890123
}
}
```
#### 2. 批量检查用户在线状态
```http
POST /status/batch
Content-Type: application/json
[123, 456, 789]
```
**响应示例**:
```json
{
"code": 200,
"data": {
"total": 3,
"onlineCount": 2,
"onlineUsers": [123, 456]
}
}
```
#### 3. 获取直播间在线用户列表
```http
GET /room/{roomId}/users
```
**响应示例**:
```json
{
"code": 200,
"data": {
"roomId": "101",
"count": 150,
"users": ["user1", "user2", "user3"]
}
}
```
#### 4. 获取直播间在线人数
```http
GET /room/{roomId}/count
```
**响应示例**:
```json
{
"code": 200,
"data": {
"roomId": "101",
"count": 150
}
}
```
#### 5. 获取用户离线消息数量
```http
GET /offline/count/{userId}
```
**响应示例**:
```json
{
"code": 200,
"data": {
"userId": 123,
"count": 5
}
}
```
#### 6. 获取用户离线消息
```http
GET /offline/messages/{userId}?limit=50
```
**响应示例**:
```json
{
"code": 200,
"data": {
"userId": 123,
"messages": ["...", "..."],
"count": 5,
"totalCount": 5
}
}
```
#### 7. 清除用户离线消息
```http
DELETE /offline/messages/{userId}
```
#### 8. 获取WebSocket连接统计
```http
GET /stats
```
**响应示例**:
```json
{
"code": 200,
"data": {
"activeConnections": 256,
"timestamp": 1234567890123
}
}
```
---
## 🔄 WebSocket 连接流程
### 直播间聊天连接流程
#### 1. 建立连接
```
ws://localhost:8081/ws/live/chat/{roomId}?userId={userId}
```
#### 2. 连接成功后
- ✅ 添加到房间Session映射
- ✅ 设置用户在线状态
- ✅ 添加用户到房间在线列表
- ✅ 记录心跳时间
- ✅ 发送欢迎消息
- ✅ 广播用户加入消息
#### 3. 消息交互
```json
// 发送聊天消息
{
"type": "chat",
"userId": "123",
"nickname": "张三",
"content": "大家好"
}
// 接收心跳
{
"type": "heartbeat",
"timestamp": 1234567890123
}
// 响应心跳
{
"type": "heartbeat_response"
}
```
#### 4. 断开连接
- ✅ 从房间Session映射移除
- ✅ 从房间在线列表移除
- ✅ 移除心跳记录
- ✅ 广播用户离开消息
---
### 私聊连接流程
#### 1. 建立连接
```
ws://localhost:8081/ws/chat/{conversationId}?userId={userId}
```
#### 2. 连接成功后
- ✅ 验证用户权限
- ✅ 添加到会话Session映射
- ✅ 添加到用户Session映射
- ✅ 设置用户在线状态
- ✅ 记录心跳时间
-**推送离线消息**
- ✅ 标记会话为已读
#### 3. 消息交互
```json
// 发送聊天消息
{
"type": "chat",
"content": "你好",
"messageType": "text"
}
// 发送已读通知
{
"type": "read"
}
// 发送正在输入
{
"type": "typing"
}
// 响应心跳
{
"type": "heartbeat_response"
}
```
#### 4. 离线消息处理
- 发送消息时检查对方是否在线
- 在线 → 直接推送
- 离线 → 保存到Redis
- 对方上线时自动推送
#### 5. 断开连接
- ✅ 从会话Session映射移除
- ✅ 从用户Session映射移除
- ✅ 检查是否还有其他连接
- ✅ 无其他连接则设置离线
- ✅ 移除心跳记录
---
## 📊 性能优化
### Redis 使用优化
1. **Key过期策略**
- 在线状态: 5分钟自动过期
- 离线消息: 7天自动过期
- 房间在线列表: 1小时自动过期
2. **数据结构选择**
- 在线状态: String (简单快速)
- 离线消息: List (FIFO队列)
- 房间在线: Set (去重、快速查询)
3. **批量操作**
- 支持批量检查用户在线状态
- 减少Redis访问次数
### 内存优化
1. **离线消息限制**
- 每个用户最多保存100条离线消息
- 超过限制自动删除最旧的消息
2. **心跳记录**
- 使用ConcurrentHashMap存储
- 连接断开时自动清理
---
## 🧪 测试建议
### 1. 心跳检测测试
```javascript
// 客户端代码示例
const ws = new WebSocket('ws://localhost:8081/ws/live/chat/101?userId=123');
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
// 收到心跳,立即响应
if (data.type === 'heartbeat') {
ws.send(JSON.stringify({ type: 'heartbeat_response' }));
}
};
```
### 2. 在线状态测试
```bash
# 检查用户是否在线
curl http://localhost:8081/api/front/online/status/123
# 获取房间在线人数
curl http://localhost:8081/api/front/online/room/101/count
```
### 3. 离线消息测试
1. 用户A连接WebSocket
2. 用户B断开连接离线
3. 用户A发送消息给用户B
4. 检查离线消息数量
5. 用户B重新连接
6. 验证是否收到离线消息
---
## ⚠️ 注意事项
### 1. Redis 依赖
- 确保Redis服务正常运行
- 检查Redis配置application.yml
- 建议使用Redis 5.0+
### 2. 定时任务
- 心跳检测使用Spring @Scheduled
- 确保启动类有 @EnableScheduling 注解
- 可以通过配置调整心跳间隔
### 3. 并发安全
- 使用ConcurrentHashMap和CopyOnWriteArraySet
- 所有Session操作都是线程安全的
- Redis操作通过RedisUtil统一管理
### 4. 异常处理
- 所有关键操作都有try-catch
- 异常不会影响其他用户
- 详细的日志记录便于排查问题
---
## 🔧 配置说明
### 心跳配置(可在代码中调整)
```java
// HeartbeatScheduler.java
private static final long HEARTBEAT_TIMEOUT = 90000; // 90秒超时
private static final long HEARTBEAT_INTERVAL = 30000; // 30秒发送间隔
```
### 在线状态配置
```java
// OnlineStatusServiceImpl.java
private static final long ONLINE_EXPIRE_SECONDS = 300; // 5分钟过期
```
### 离线消息配置
```java
// OfflineMessageServiceImpl.java
private static final int MAX_OFFLINE_MESSAGES = 100; // 最多100条
private static final long OFFLINE_MESSAGE_EXPIRE_SECONDS = 7 * 24 * 3600; // 7天
```
---
## 📈 监控建议
### 1. 关键指标
- WebSocket活跃连接数
- 在线用户数
- 离线消息堆积数量
- 心跳超时次数
### 2. 日志监控
```bash
# 查看心跳日志
grep "Heartbeat" application.log
# 查看在线状态日志
grep "OnlineStatus" application.log
# 查看离线消息日志
grep "OfflineMessage" application.log
```
### 3. Redis 监控
```bash
# 查看在线用户数
redis-cli KEYS "user:online:*" | wc -l
# 查看离线消息总数
redis-cli KEYS "offline:msg:*" | wc -l
# 查看房间在线列表
redis-cli SMEMBERS "room:online:101"
```
---
## ✅ 完成度总结
| 功能模块 | 完成度 | 说明 |
|---------|--------|------|
| 心跳检测 | ✅ 100% | 完全实现,包括发送、检测、超时清理 |
| 在线状态管理 | ✅ 100% | 完全实现,支持用户和房间在线状态 |
| 离线消息处理 | ✅ 100% | 完全实现,自动保存和推送 |
| REST API | ✅ 100% | 提供完整的查询接口 |
| 集成到Handler | ✅ 100% | LiveChatHandler和PrivateChatHandler已集成 |
---
## 🎉 总结
现在你的WebSocket系统已经具备了完整的企业级功能
1.**稳定性**: 心跳检测确保连接健康
2.**实时性**: 在线状态实时更新
3.**可靠性**: 离线消息不丢失
4.**可监控**: 提供完整的REST API
5.**高性能**: Redis缓存自动过期
6.**易维护**: 清晰的日志和异常处理
所有功能都已经集成到现有的WebSocket Handler中无需修改客户端代码即可享受这些增强功能