590 lines
12 KiB
Markdown
590 lines
12 KiB
Markdown
# WebSocket 增强功能实现说明
|
||
|
||
## 📋 概述
|
||
|
||
本文档说明了为直播IM系统新增的三个核心功能模块:
|
||
1. **心跳检测机制** - 保持连接活跃,自动清理超时连接
|
||
2. **在线状态管理** - 实时追踪用户在线状态和活跃时间
|
||
3. **离线消息处理** - 保存和推送离线消息
|
||
|
||
---
|
||
|
||
## 🎯 新增文件列表
|
||
|
||
### 1. 在线状态管理服务
|
||
- **接口**: `Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/service/OnlineStatusService.java`
|
||
- **实现**: `Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/service/impl/OnlineStatusServiceImpl.java`
|
||
|
||
### 2. 离线消息管理服务
|
||
- **接口**: `Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/service/OfflineMessageService.java`
|
||
- **实现**: `Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/service/impl/OfflineMessageServiceImpl.java`
|
||
|
||
### 3. 心跳检测调度器
|
||
- **文件**: `Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/websocket/HeartbeatScheduler.java`
|
||
|
||
### 4. REST API 控制器
|
||
- **文件**: `Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/controller/OnlineStatusController.java`
|
||
|
||
### 5. 更新的文件
|
||
- `LiveChatHandler.java` - 集成在线状态和心跳检测
|
||
- `PrivateChatHandler.java` - 集成在线状态、离线消息和心跳检测
|
||
|
||
---
|
||
|
||
## 🔧 功能详解
|
||
|
||
### 1. 心跳检测机制 ✅
|
||
|
||
#### 工作原理
|
||
- **发送间隔**: 每30秒自动向所有活跃连接发送心跳消息
|
||
- **超时时间**: 90秒无响应则认为连接超时
|
||
- **检查频率**: 每60秒检查一次超时连接
|
||
- **自动清理**: 超时连接会被自动关闭并清理
|
||
|
||
#### 心跳消息格式
|
||
```json
|
||
{
|
||
"type": "heartbeat",
|
||
"timestamp": 1234567890123
|
||
}
|
||
```
|
||
|
||
#### 客户端响应格式
|
||
```json
|
||
{
|
||
"type": "heartbeat_response"
|
||
}
|
||
// 或
|
||
{
|
||
"type": "pong"
|
||
}
|
||
```
|
||
|
||
#### 关键方法
|
||
- `sendHeartbeat()` - 定时发送心跳(@Scheduled,每30秒)
|
||
- `checkTimeout()` - 定时检查超时(@Scheduled,每60秒)
|
||
- `recordHeartbeat(sessionId)` - 记录心跳时间
|
||
- `removeHeartbeat(sessionId)` - 移除心跳记录
|
||
|
||
---
|
||
|
||
### 2. 在线状态管理 ✅
|
||
|
||
#### Redis 数据结构
|
||
```
|
||
# 用户在线状态
|
||
user:online:{userId} = "1" (过期时间: 5分钟)
|
||
|
||
# 用户最后活跃时间
|
||
user:last_active:{userId} = timestamp (过期时间: 5分钟)
|
||
|
||
# 直播间在线用户集合
|
||
room:online:{roomId} = Set<userId> (过期时间: 1小时)
|
||
```
|
||
|
||
#### 核心功能
|
||
1. **设置用户在线/离线**
|
||
```java
|
||
onlineStatusService.setUserOnline(userId, true/false);
|
||
```
|
||
|
||
2. **检查用户是否在线**
|
||
```java
|
||
boolean online = onlineStatusService.isUserOnline(userId);
|
||
```
|
||
|
||
3. **更新用户活跃时间**
|
||
```java
|
||
onlineStatusService.updateUserLastActiveTime(userId);
|
||
```
|
||
|
||
4. **直播间在线管理**
|
||
```java
|
||
// 添加用户到房间
|
||
onlineStatusService.addUserToRoom(roomId, userId);
|
||
|
||
// 移除用户
|
||
onlineStatusService.removeUserFromRoom(roomId, userId);
|
||
|
||
// 获取在线人数
|
||
Long count = onlineStatusService.getRoomOnlineCount(roomId);
|
||
|
||
// 获取在线用户列表
|
||
Set<String> users = onlineStatusService.getRoomOnlineUsers(roomId);
|
||
```
|
||
|
||
5. **批量检查在线状态**
|
||
```java
|
||
List<Integer> onlineUsers = onlineStatusService.getOnlineUsers(userIds);
|
||
```
|
||
|
||
#### 自动过期机制
|
||
- 用户在线状态5分钟无活动自动过期
|
||
- 每次消息发送/接收会自动更新活跃时间
|
||
- 心跳消息也会更新活跃时间
|
||
|
||
---
|
||
|
||
### 3. 离线消息处理 ✅
|
||
|
||
#### Redis 数据结构
|
||
```
|
||
# 用户离线消息队列(List)
|
||
offline:msg:{userId} = [message1, message2, ...]
|
||
- 最大保存100条消息
|
||
- 过期时间: 7天
|
||
```
|
||
|
||
#### 核心功能
|
||
1. **保存离线消息**
|
||
```java
|
||
offlineMessageService.saveOfflineMessage(userId, messageJson);
|
||
```
|
||
|
||
2. **获取离线消息**
|
||
```java
|
||
// 获取指定数量
|
||
List<String> messages = offlineMessageService.getOfflineMessages(userId, 50);
|
||
|
||
// 获取全部
|
||
List<String> allMessages = offlineMessageService.getAllOfflineMessages(userId);
|
||
```
|
||
|
||
3. **清除离线消息**
|
||
```java
|
||
offlineMessageService.clearOfflineMessages(userId);
|
||
```
|
||
|
||
4. **获取离线消息数量**
|
||
```java
|
||
Long count = offlineMessageService.getOfflineMessageCount(userId);
|
||
```
|
||
|
||
#### 离线消息流程
|
||
1. 用户A发送消息给用户B
|
||
2. 检查用户B是否在线
|
||
3. 如果在线 → 直接推送
|
||
4. 如果离线 → 保存到Redis离线消息队列
|
||
5. 用户B上线时 → 自动推送所有离线消息
|
||
6. 推送完成后 → 清除离线消息
|
||
|
||
#### 消息格式
|
||
离线消息保存时会自动添加时间戳:
|
||
```json
|
||
{
|
||
"type": "chat",
|
||
"messageId": "123",
|
||
"userId": 456,
|
||
"username": "张三",
|
||
"message": "你好",
|
||
"timestamp": 1234567890123,
|
||
"savedAt": 1234567890456
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 🔌 REST API 接口
|
||
|
||
### 基础路径
|
||
```
|
||
http://localhost:8081/api/front/online
|
||
```
|
||
|
||
### 接口列表
|
||
|
||
#### 1. 检查用户是否在线
|
||
```http
|
||
GET /status/{userId}
|
||
```
|
||
**响应示例**:
|
||
```json
|
||
{
|
||
"code": 200,
|
||
"message": "success",
|
||
"data": {
|
||
"userId": 123,
|
||
"online": true,
|
||
"lastActiveTime": 1234567890123
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 2. 批量检查用户在线状态
|
||
```http
|
||
POST /status/batch
|
||
Content-Type: application/json
|
||
|
||
[123, 456, 789]
|
||
```
|
||
**响应示例**:
|
||
```json
|
||
{
|
||
"code": 200,
|
||
"data": {
|
||
"total": 3,
|
||
"onlineCount": 2,
|
||
"onlineUsers": [123, 456]
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 3. 获取直播间在线用户列表
|
||
```http
|
||
GET /room/{roomId}/users
|
||
```
|
||
**响应示例**:
|
||
```json
|
||
{
|
||
"code": 200,
|
||
"data": {
|
||
"roomId": "101",
|
||
"count": 150,
|
||
"users": ["user1", "user2", "user3"]
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 4. 获取直播间在线人数
|
||
```http
|
||
GET /room/{roomId}/count
|
||
```
|
||
**响应示例**:
|
||
```json
|
||
{
|
||
"code": 200,
|
||
"data": {
|
||
"roomId": "101",
|
||
"count": 150
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 5. 获取用户离线消息数量
|
||
```http
|
||
GET /offline/count/{userId}
|
||
```
|
||
**响应示例**:
|
||
```json
|
||
{
|
||
"code": 200,
|
||
"data": {
|
||
"userId": 123,
|
||
"count": 5
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 6. 获取用户离线消息
|
||
```http
|
||
GET /offline/messages/{userId}?limit=50
|
||
```
|
||
**响应示例**:
|
||
```json
|
||
{
|
||
"code": 200,
|
||
"data": {
|
||
"userId": 123,
|
||
"messages": ["...", "..."],
|
||
"count": 5,
|
||
"totalCount": 5
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 7. 清除用户离线消息
|
||
```http
|
||
DELETE /offline/messages/{userId}
|
||
```
|
||
|
||
#### 8. 获取WebSocket连接统计
|
||
```http
|
||
GET /stats
|
||
```
|
||
**响应示例**:
|
||
```json
|
||
{
|
||
"code": 200,
|
||
"data": {
|
||
"activeConnections": 256,
|
||
"timestamp": 1234567890123
|
||
}
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 🔄 WebSocket 连接流程
|
||
|
||
### 直播间聊天连接流程
|
||
|
||
#### 1. 建立连接
|
||
```
|
||
ws://localhost:8081/ws/live/chat/{roomId}?userId={userId}
|
||
```
|
||
|
||
#### 2. 连接成功后
|
||
- ✅ 添加到房间Session映射
|
||
- ✅ 设置用户在线状态
|
||
- ✅ 添加用户到房间在线列表
|
||
- ✅ 记录心跳时间
|
||
- ✅ 发送欢迎消息
|
||
- ✅ 广播用户加入消息
|
||
|
||
#### 3. 消息交互
|
||
```json
|
||
// 发送聊天消息
|
||
{
|
||
"type": "chat",
|
||
"userId": "123",
|
||
"nickname": "张三",
|
||
"content": "大家好"
|
||
}
|
||
|
||
// 接收心跳
|
||
{
|
||
"type": "heartbeat",
|
||
"timestamp": 1234567890123
|
||
}
|
||
|
||
// 响应心跳
|
||
{
|
||
"type": "heartbeat_response"
|
||
}
|
||
```
|
||
|
||
#### 4. 断开连接
|
||
- ✅ 从房间Session映射移除
|
||
- ✅ 从房间在线列表移除
|
||
- ✅ 移除心跳记录
|
||
- ✅ 广播用户离开消息
|
||
|
||
---
|
||
|
||
### 私聊连接流程
|
||
|
||
#### 1. 建立连接
|
||
```
|
||
ws://localhost:8081/ws/chat/{conversationId}?userId={userId}
|
||
```
|
||
|
||
#### 2. 连接成功后
|
||
- ✅ 验证用户权限
|
||
- ✅ 添加到会话Session映射
|
||
- ✅ 添加到用户Session映射
|
||
- ✅ 设置用户在线状态
|
||
- ✅ 记录心跳时间
|
||
- ✅ **推送离线消息**
|
||
- ✅ 标记会话为已读
|
||
|
||
#### 3. 消息交互
|
||
```json
|
||
// 发送聊天消息
|
||
{
|
||
"type": "chat",
|
||
"content": "你好",
|
||
"messageType": "text"
|
||
}
|
||
|
||
// 发送已读通知
|
||
{
|
||
"type": "read"
|
||
}
|
||
|
||
// 发送正在输入
|
||
{
|
||
"type": "typing"
|
||
}
|
||
|
||
// 响应心跳
|
||
{
|
||
"type": "heartbeat_response"
|
||
}
|
||
```
|
||
|
||
#### 4. 离线消息处理
|
||
- 发送消息时检查对方是否在线
|
||
- 在线 → 直接推送
|
||
- 离线 → 保存到Redis
|
||
- 对方上线时自动推送
|
||
|
||
#### 5. 断开连接
|
||
- ✅ 从会话Session映射移除
|
||
- ✅ 从用户Session映射移除
|
||
- ✅ 检查是否还有其他连接
|
||
- ✅ 无其他连接则设置离线
|
||
- ✅ 移除心跳记录
|
||
|
||
---
|
||
|
||
## 📊 性能优化
|
||
|
||
### Redis 使用优化
|
||
1. **Key过期策略**
|
||
- 在线状态: 5分钟自动过期
|
||
- 离线消息: 7天自动过期
|
||
- 房间在线列表: 1小时自动过期
|
||
|
||
2. **数据结构选择**
|
||
- 在线状态: String (简单快速)
|
||
- 离线消息: List (FIFO队列)
|
||
- 房间在线: Set (去重、快速查询)
|
||
|
||
3. **批量操作**
|
||
- 支持批量检查用户在线状态
|
||
- 减少Redis访问次数
|
||
|
||
### 内存优化
|
||
1. **离线消息限制**
|
||
- 每个用户最多保存100条离线消息
|
||
- 超过限制自动删除最旧的消息
|
||
|
||
2. **心跳记录**
|
||
- 使用ConcurrentHashMap存储
|
||
- 连接断开时自动清理
|
||
|
||
---
|
||
|
||
## 🧪 测试建议
|
||
|
||
### 1. 心跳检测测试
|
||
```javascript
|
||
// 客户端代码示例
|
||
const ws = new WebSocket('ws://localhost:8081/ws/live/chat/101?userId=123');
|
||
|
||
ws.onmessage = (event) => {
|
||
const data = JSON.parse(event.data);
|
||
|
||
// 收到心跳,立即响应
|
||
if (data.type === 'heartbeat') {
|
||
ws.send(JSON.stringify({ type: 'heartbeat_response' }));
|
||
}
|
||
};
|
||
```
|
||
|
||
### 2. 在线状态测试
|
||
```bash
|
||
# 检查用户是否在线
|
||
curl http://localhost:8081/api/front/online/status/123
|
||
|
||
# 获取房间在线人数
|
||
curl http://localhost:8081/api/front/online/room/101/count
|
||
```
|
||
|
||
### 3. 离线消息测试
|
||
1. 用户A连接WebSocket
|
||
2. 用户B断开连接(离线)
|
||
3. 用户A发送消息给用户B
|
||
4. 检查离线消息数量
|
||
5. 用户B重新连接
|
||
6. 验证是否收到离线消息
|
||
|
||
---
|
||
|
||
## ⚠️ 注意事项
|
||
|
||
### 1. Redis 依赖
|
||
- 确保Redis服务正常运行
|
||
- 检查Redis配置(application.yml)
|
||
- 建议使用Redis 5.0+
|
||
|
||
### 2. 定时任务
|
||
- 心跳检测使用Spring @Scheduled
|
||
- 确保启动类有 @EnableScheduling 注解
|
||
- 可以通过配置调整心跳间隔
|
||
|
||
### 3. 并发安全
|
||
- 使用ConcurrentHashMap和CopyOnWriteArraySet
|
||
- 所有Session操作都是线程安全的
|
||
- Redis操作通过RedisUtil统一管理
|
||
|
||
### 4. 异常处理
|
||
- 所有关键操作都有try-catch
|
||
- 异常不会影响其他用户
|
||
- 详细的日志记录便于排查问题
|
||
|
||
---
|
||
|
||
## 🔧 配置说明
|
||
|
||
### 心跳配置(可在代码中调整)
|
||
```java
|
||
// HeartbeatScheduler.java
|
||
private static final long HEARTBEAT_TIMEOUT = 90000; // 90秒超时
|
||
private static final long HEARTBEAT_INTERVAL = 30000; // 30秒发送间隔
|
||
```
|
||
|
||
### 在线状态配置
|
||
```java
|
||
// OnlineStatusServiceImpl.java
|
||
private static final long ONLINE_EXPIRE_SECONDS = 300; // 5分钟过期
|
||
```
|
||
|
||
### 离线消息配置
|
||
```java
|
||
// OfflineMessageServiceImpl.java
|
||
private static final int MAX_OFFLINE_MESSAGES = 100; // 最多100条
|
||
private static final long OFFLINE_MESSAGE_EXPIRE_SECONDS = 7 * 24 * 3600; // 7天
|
||
```
|
||
|
||
---
|
||
|
||
## 📈 监控建议
|
||
|
||
### 1. 关键指标
|
||
- WebSocket活跃连接数
|
||
- 在线用户数
|
||
- 离线消息堆积数量
|
||
- 心跳超时次数
|
||
|
||
### 2. 日志监控
|
||
```bash
|
||
# 查看心跳日志
|
||
grep "Heartbeat" application.log
|
||
|
||
# 查看在线状态日志
|
||
grep "OnlineStatus" application.log
|
||
|
||
# 查看离线消息日志
|
||
grep "OfflineMessage" application.log
|
||
```
|
||
|
||
### 3. Redis 监控
|
||
```bash
|
||
# 查看在线用户数
|
||
redis-cli KEYS "user:online:*" | wc -l
|
||
|
||
# 查看离线消息总数
|
||
redis-cli KEYS "offline:msg:*" | wc -l
|
||
|
||
# 查看房间在线列表
|
||
redis-cli SMEMBERS "room:online:101"
|
||
```
|
||
|
||
---
|
||
|
||
## ✅ 完成度总结
|
||
|
||
| 功能模块 | 完成度 | 说明 |
|
||
|---------|--------|------|
|
||
| 心跳检测 | ✅ 100% | 完全实现,包括发送、检测、超时清理 |
|
||
| 在线状态管理 | ✅ 100% | 完全实现,支持用户和房间在线状态 |
|
||
| 离线消息处理 | ✅ 100% | 完全实现,自动保存和推送 |
|
||
| REST API | ✅ 100% | 提供完整的查询接口 |
|
||
| 集成到Handler | ✅ 100% | LiveChatHandler和PrivateChatHandler已集成 |
|
||
|
||
---
|
||
|
||
## 🎉 总结
|
||
|
||
现在你的WebSocket系统已经具备了完整的企业级功能:
|
||
|
||
1. ✅ **稳定性**: 心跳检测确保连接健康
|
||
2. ✅ **实时性**: 在线状态实时更新
|
||
3. ✅ **可靠性**: 离线消息不丢失
|
||
4. ✅ **可监控**: 提供完整的REST API
|
||
5. ✅ **高性能**: Redis缓存,自动过期
|
||
6. ✅ **易维护**: 清晰的日志和异常处理
|
||
|
||
所有功能都已经集成到现有的WebSocket Handler中,无需修改客户端代码即可享受这些增强功能!
|