zhibo/Zhibo/WebSocket增强功能实现说明.md

590 lines
12 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# WebSocket 增强功能实现说明
## 📋 概述
本文档说明了为直播IM系统新增的三个核心功能模块
1. **心跳检测机制** - 保持连接活跃,自动清理超时连接
2. **在线状态管理** - 实时追踪用户在线状态和活跃时间
3. **离线消息处理** - 保存和推送离线消息
---
## 🎯 新增文件列表
### 1. 在线状态管理服务
- **接口**: `Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/service/OnlineStatusService.java`
- **实现**: `Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/service/impl/OnlineStatusServiceImpl.java`
### 2. 离线消息管理服务
- **接口**: `Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/service/OfflineMessageService.java`
- **实现**: `Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/service/impl/OfflineMessageServiceImpl.java`
### 3. 心跳检测调度器
- **文件**: `Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/websocket/HeartbeatScheduler.java`
### 4. REST API 控制器
- **文件**: `Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/controller/OnlineStatusController.java`
### 5. 更新的文件
- `LiveChatHandler.java` - 集成在线状态和心跳检测
- `PrivateChatHandler.java` - 集成在线状态、离线消息和心跳检测
---
## 🔧 功能详解
### 1. 心跳检测机制 ✅
#### 工作原理
- **发送间隔**: 每30秒自动向所有活跃连接发送心跳消息
- **超时时间**: 90秒无响应则认为连接超时
- **检查频率**: 每60秒检查一次超时连接
- **自动清理**: 超时连接会被自动关闭并清理
#### 心跳消息格式
```json
{
"type": "heartbeat",
"timestamp": 1234567890123
}
```
#### 客户端响应格式
```json
{
"type": "heartbeat_response"
}
// 或
{
"type": "pong"
}
```
#### 关键方法
- `sendHeartbeat()` - 定时发送心跳(@Scheduled每30秒
- `checkTimeout()` - 定时检查超时(@Scheduled每60秒
- `recordHeartbeat(sessionId)` - 记录心跳时间
- `removeHeartbeat(sessionId)` - 移除心跳记录
---
### 2. 在线状态管理 ✅
#### Redis 数据结构
```
# 用户在线状态
user:online:{userId} = "1" (过期时间: 5分钟)
# 用户最后活跃时间
user:last_active:{userId} = timestamp (过期时间: 5分钟)
# 直播间在线用户集合
room:online:{roomId} = Set<userId> (过期时间: 1小时)
```
#### 核心功能
1. **设置用户在线/离线**
```java
onlineStatusService.setUserOnline(userId, true/false);
```
2. **检查用户是否在线**
```java
boolean online = onlineStatusService.isUserOnline(userId);
```
3. **更新用户活跃时间**
```java
onlineStatusService.updateUserLastActiveTime(userId);
```
4. **直播间在线管理**
```java
// 添加用户到房间
onlineStatusService.addUserToRoom(roomId, userId);
// 移除用户
onlineStatusService.removeUserFromRoom(roomId, userId);
// 获取在线人数
Long count = onlineStatusService.getRoomOnlineCount(roomId);
// 获取在线用户列表
Set<String> users = onlineStatusService.getRoomOnlineUsers(roomId);
```
5. **批量检查在线状态**
```java
List<Integer> onlineUsers = onlineStatusService.getOnlineUsers(userIds);
```
#### 自动过期机制
- 用户在线状态5分钟无活动自动过期
- 每次消息发送/接收会自动更新活跃时间
- 心跳消息也会更新活跃时间
---
### 3. 离线消息处理 ✅
#### Redis 数据结构
```
# 用户离线消息队列List
offline:msg:{userId} = [message1, message2, ...]
- 最大保存100条消息
- 过期时间: 7天
```
#### 核心功能
1. **保存离线消息**
```java
offlineMessageService.saveOfflineMessage(userId, messageJson);
```
2. **获取离线消息**
```java
// 获取指定数量
List<String> messages = offlineMessageService.getOfflineMessages(userId, 50);
// 获取全部
List<String> allMessages = offlineMessageService.getAllOfflineMessages(userId);
```
3. **清除离线消息**
```java
offlineMessageService.clearOfflineMessages(userId);
```
4. **获取离线消息数量**
```java
Long count = offlineMessageService.getOfflineMessageCount(userId);
```
#### 离线消息流程
1. 用户A发送消息给用户B
2. 检查用户B是否在线
3. 如果在线 → 直接推送
4. 如果离线 → 保存到Redis离线消息队列
5. 用户B上线时 → 自动推送所有离线消息
6. 推送完成后 → 清除离线消息
#### 消息格式
离线消息保存时会自动添加时间戳:
```json
{
"type": "chat",
"messageId": "123",
"userId": 456,
"username": "张三",
"message": "你好",
"timestamp": 1234567890123,
"savedAt": 1234567890456
}
```
---
## 🔌 REST API 接口
### 基础路径
```
http://localhost:8081/api/front/online
```
### 接口列表
#### 1. 检查用户是否在线
```http
GET /status/{userId}
```
**响应示例**:
```json
{
"code": 200,
"message": "success",
"data": {
"userId": 123,
"online": true,
"lastActiveTime": 1234567890123
}
}
```
#### 2. 批量检查用户在线状态
```http
POST /status/batch
Content-Type: application/json
[123, 456, 789]
```
**响应示例**:
```json
{
"code": 200,
"data": {
"total": 3,
"onlineCount": 2,
"onlineUsers": [123, 456]
}
}
```
#### 3. 获取直播间在线用户列表
```http
GET /room/{roomId}/users
```
**响应示例**:
```json
{
"code": 200,
"data": {
"roomId": "101",
"count": 150,
"users": ["user1", "user2", "user3"]
}
}
```
#### 4. 获取直播间在线人数
```http
GET /room/{roomId}/count
```
**响应示例**:
```json
{
"code": 200,
"data": {
"roomId": "101",
"count": 150
}
}
```
#### 5. 获取用户离线消息数量
```http
GET /offline/count/{userId}
```
**响应示例**:
```json
{
"code": 200,
"data": {
"userId": 123,
"count": 5
}
}
```
#### 6. 获取用户离线消息
```http
GET /offline/messages/{userId}?limit=50
```
**响应示例**:
```json
{
"code": 200,
"data": {
"userId": 123,
"messages": ["...", "..."],
"count": 5,
"totalCount": 5
}
}
```
#### 7. 清除用户离线消息
```http
DELETE /offline/messages/{userId}
```
#### 8. 获取WebSocket连接统计
```http
GET /stats
```
**响应示例**:
```json
{
"code": 200,
"data": {
"activeConnections": 256,
"timestamp": 1234567890123
}
}
```
---
## 🔄 WebSocket 连接流程
### 直播间聊天连接流程
#### 1. 建立连接
```
ws://localhost:8081/ws/live/chat/{roomId}?userId={userId}
```
#### 2. 连接成功后
- ✅ 添加到房间Session映射
- ✅ 设置用户在线状态
- ✅ 添加用户到房间在线列表
- ✅ 记录心跳时间
- ✅ 发送欢迎消息
- ✅ 广播用户加入消息
#### 3. 消息交互
```json
// 发送聊天消息
{
"type": "chat",
"userId": "123",
"nickname": "张三",
"content": "大家好"
}
// 接收心跳
{
"type": "heartbeat",
"timestamp": 1234567890123
}
// 响应心跳
{
"type": "heartbeat_response"
}
```
#### 4. 断开连接
- ✅ 从房间Session映射移除
- ✅ 从房间在线列表移除
- ✅ 移除心跳记录
- ✅ 广播用户离开消息
---
### 私聊连接流程
#### 1. 建立连接
```
ws://localhost:8081/ws/chat/{conversationId}?userId={userId}
```
#### 2. 连接成功后
- ✅ 验证用户权限
- ✅ 添加到会话Session映射
- ✅ 添加到用户Session映射
- ✅ 设置用户在线状态
- ✅ 记录心跳时间
-**推送离线消息**
- ✅ 标记会话为已读
#### 3. 消息交互
```json
// 发送聊天消息
{
"type": "chat",
"content": "你好",
"messageType": "text"
}
// 发送已读通知
{
"type": "read"
}
// 发送正在输入
{
"type": "typing"
}
// 响应心跳
{
"type": "heartbeat_response"
}
```
#### 4. 离线消息处理
- 发送消息时检查对方是否在线
- 在线 → 直接推送
- 离线 → 保存到Redis
- 对方上线时自动推送
#### 5. 断开连接
- ✅ 从会话Session映射移除
- ✅ 从用户Session映射移除
- ✅ 检查是否还有其他连接
- ✅ 无其他连接则设置离线
- ✅ 移除心跳记录
---
## 📊 性能优化
### Redis 使用优化
1. **Key过期策略**
- 在线状态: 5分钟自动过期
- 离线消息: 7天自动过期
- 房间在线列表: 1小时自动过期
2. **数据结构选择**
- 在线状态: String (简单快速)
- 离线消息: List (FIFO队列)
- 房间在线: Set (去重、快速查询)
3. **批量操作**
- 支持批量检查用户在线状态
- 减少Redis访问次数
### 内存优化
1. **离线消息限制**
- 每个用户最多保存100条离线消息
- 超过限制自动删除最旧的消息
2. **心跳记录**
- 使用ConcurrentHashMap存储
- 连接断开时自动清理
---
## 🧪 测试建议
### 1. 心跳检测测试
```javascript
// 客户端代码示例
const ws = new WebSocket('ws://localhost:8081/ws/live/chat/101?userId=123');
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
// 收到心跳,立即响应
if (data.type === 'heartbeat') {
ws.send(JSON.stringify({ type: 'heartbeat_response' }));
}
};
```
### 2. 在线状态测试
```bash
# 检查用户是否在线
curl http://localhost:8081/api/front/online/status/123
# 获取房间在线人数
curl http://localhost:8081/api/front/online/room/101/count
```
### 3. 离线消息测试
1. 用户A连接WebSocket
2. 用户B断开连接离线
3. 用户A发送消息给用户B
4. 检查离线消息数量
5. 用户B重新连接
6. 验证是否收到离线消息
---
## ⚠️ 注意事项
### 1. Redis 依赖
- 确保Redis服务正常运行
- 检查Redis配置application.yml
- 建议使用Redis 5.0+
### 2. 定时任务
- 心跳检测使用Spring @Scheduled
- 确保启动类有 @EnableScheduling 注解
- 可以通过配置调整心跳间隔
### 3. 并发安全
- 使用ConcurrentHashMap和CopyOnWriteArraySet
- 所有Session操作都是线程安全的
- Redis操作通过RedisUtil统一管理
### 4. 异常处理
- 所有关键操作都有try-catch
- 异常不会影响其他用户
- 详细的日志记录便于排查问题
---
## 🔧 配置说明
### 心跳配置(可在代码中调整)
```java
// HeartbeatScheduler.java
private static final long HEARTBEAT_TIMEOUT = 90000; // 90秒超时
private static final long HEARTBEAT_INTERVAL = 30000; // 30秒发送间隔
```
### 在线状态配置
```java
// OnlineStatusServiceImpl.java
private static final long ONLINE_EXPIRE_SECONDS = 300; // 5分钟过期
```
### 离线消息配置
```java
// OfflineMessageServiceImpl.java
private static final int MAX_OFFLINE_MESSAGES = 100; // 最多100条
private static final long OFFLINE_MESSAGE_EXPIRE_SECONDS = 7 * 24 * 3600; // 7天
```
---
## 📈 监控建议
### 1. 关键指标
- WebSocket活跃连接数
- 在线用户数
- 离线消息堆积数量
- 心跳超时次数
### 2. 日志监控
```bash
# 查看心跳日志
grep "Heartbeat" application.log
# 查看在线状态日志
grep "OnlineStatus" application.log
# 查看离线消息日志
grep "OfflineMessage" application.log
```
### 3. Redis 监控
```bash
# 查看在线用户数
redis-cli KEYS "user:online:*" | wc -l
# 查看离线消息总数
redis-cli KEYS "offline:msg:*" | wc -l
# 查看房间在线列表
redis-cli SMEMBERS "room:online:101"
```
---
## ✅ 完成度总结
| 功能模块 | 完成度 | 说明 |
|---------|--------|------|
| 心跳检测 | ✅ 100% | 完全实现,包括发送、检测、超时清理 |
| 在线状态管理 | ✅ 100% | 完全实现,支持用户和房间在线状态 |
| 离线消息处理 | ✅ 100% | 完全实现,自动保存和推送 |
| REST API | ✅ 100% | 提供完整的查询接口 |
| 集成到Handler | ✅ 100% | LiveChatHandler和PrivateChatHandler已集成 |
---
## 🎉 总结
现在你的WebSocket系统已经具备了完整的企业级功能
1.**稳定性**: 心跳检测确保连接健康
2.**实时性**: 在线状态实时更新
3.**可靠性**: 离线消息不丢失
4.**可监控**: 提供完整的REST API
5.**高性能**: Redis缓存自动过期
6.**易维护**: 清晰的日志和异常处理
所有功能都已经集成到现有的WebSocket Handler中无需修改客户端代码即可享受这些增强功能