修复语音+视频通话bug

This commit is contained in:
xiao12feng8 2025-12-26 15:59:19 +08:00
parent bae216261a
commit 714143264a
2 changed files with 408 additions and 2 deletions

View File

@ -4,7 +4,7 @@ import com.github.pagehelper.PageInfo;
import com.zbkj.common.model.call.CallRecord; import com.zbkj.common.model.call.CallRecord;
import com.zbkj.common.model.user.User; import com.zbkj.common.model.user.User;
import com.zbkj.common.request.PageParamRequest; import com.zbkj.common.request.PageParamRequest;
import com.zbkj.common.response.CommonResult; import com.zbkj.common.result.CommonResult;
import com.zbkj.front.request.call.InitiateCallRequest; import com.zbkj.front.request.call.InitiateCallRequest;
import com.zbkj.front.response.call.CallRecordResponse; import com.zbkj.front.response.call.CallRecordResponse;
import com.zbkj.front.response.call.InitiateCallResponse; import com.zbkj.front.response.call.InitiateCallResponse;

View File

@ -23,20 +23,426 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
/**
* 通话信令WebSocket处理器
* 处理WebRTC信令交换offer, answer, ice-candidate
*/
@Component @Component
public class CallSignalingHandler extends TextWebSocketHandler { public class CallSignalingHandler extends TextWebSocketHandler {
private static final Logger logger = LoggerFactory.getLogger(CallSignalingHandler.class); private static final Logger logger = LoggerFactory.getLogger(CallSignalingHandler.class);
private final ObjectMapper objectMapper = new ObjectMapper(); private final ObjectMapper objectMapper = new ObjectMapper();
private static final long CALL_TIMEOUT = 60 * 1000; private static final long CALL_TIMEOUT = 60 * 1000; // 60秒超时
@Autowired @Autowired
private CallService callService; private CallService callService;
@Autowired @Autowired
private UserService userService; private UserService userService;
// callId -> sessions
private final Map<String, Set<WebSocketSession>> callSessions = new ConcurrentHashMap<>(); private final Map<String, Set<WebSocketSession>> callSessions = new ConcurrentHashMap<>();
// userId -> session
private final Map<Integer, WebSocketSession> userCallSessions = new ConcurrentHashMap<>(); private final Map<Integer, WebSocketSession> userCallSessions = new ConcurrentHashMap<>();
// sessionId -> userId
private final Map<String, Integer> sessionUserMap = new ConcurrentHashMap<>(); private final Map<String, Integer> sessionUserMap = new ConcurrentHashMap<>();
// sessionId -> callId
private final Map<String, String> sessionCallMap = new ConcurrentHashMap<>(); private final Map<String, String> sessionCallMap = new ConcurrentHashMap<>();
// callId -> createTime
private final Map<String, Long> callCreateTime = new ConcurrentHashMap<>(); private final Map<String, Long> callCreateTime = new ConcurrentHashMap<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
logger.info("[CallSignaling] 连接建立: sessionId={}", session.getId());
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
try {
JsonNode json = objectMapper.readTree(message.getPayload());
String type = json.has("type") ? json.get("type").asText() : "";
switch (type) {
case "register":
handleRegister(session, json);
break;
case "call_request":
handleCallRequest(session, json);
break;
case "call_accept":
handleCallAccept(session, json);
break;
case "call_reject":
handleCallReject(session, json);
break;
case "call_cancel":
handleCallCancel(session, json);
break;
case "call_end":
handleCallEnd(session, json);
break;
case "offer":
case "answer":
case "ice-candidate":
handleSignaling(session, json, type);
break;
default:
logger.warn("[CallSignaling] 未知消息类型: {}", type);
}
} catch (Exception e) {
logger.error("[CallSignaling] 处理消息异常", e);
sendError(session, "消息处理失败: " + e.getMessage());
}
}
private void handleRegister(WebSocketSession session, JsonNode json) throws IOException {
Integer userId = json.has("userId") ? json.get("userId").asInt() : null;
if (userId == null) {
sendError(session, "userId不能为空");
return;
}
// 关闭旧连接
WebSocketSession oldSession = userCallSessions.get(userId);
if (oldSession != null && oldSession.isOpen() && !oldSession.getId().equals(session.getId())) {
try {
oldSession.close();
} catch (Exception ignored) {}
}
userCallSessions.put(userId, session);
sessionUserMap.put(session.getId(), userId);
ObjectNode response = objectMapper.createObjectNode();
response.put("type", "registered");
response.put("userId", userId);
session.sendMessage(new TextMessage(response.toString()));
logger.info("[CallSignaling] 用户注册: userId={}", userId);
}
private void handleCallRequest(WebSocketSession session, JsonNode json) throws IOException {
Integer callerId = sessionUserMap.get(session.getId());
if (callerId == null) {
sendError(session, "请先注册");
return;
}
Integer calleeId = json.has("calleeId") ? json.get("calleeId").asInt() : null;
String callType = json.has("callType") ? json.get("callType").asText() : "voice";
if (calleeId == null) {
sendError(session, "calleeId不能为空");
return;
}
try {
// 创建通话记录
CallRecord record = callService.createCall(callerId, calleeId, callType);
String callId = record.getCallId();
// 记录通话创建时间
callCreateTime.put(callId, System.currentTimeMillis());
// 加入通话会话
joinCallSession(callId, session);
sessionCallMap.put(session.getId(), callId);
// 获取用户信息
User caller = userService.getById(callerId);
// 通知被叫方
WebSocketSession calleeSession = userCallSessions.get(calleeId);
if (calleeSession != null && calleeSession.isOpen()) {
ObjectNode incoming = objectMapper.createObjectNode();
incoming.put("type", "incoming_call");
incoming.put("callId", callId);
incoming.put("callerId", callerId);
incoming.put("callerName", caller != null ? caller.getNickname() : "用户" + callerId);
incoming.put("callerAvatar", caller != null ? caller.getAvatar() : "");
incoming.put("callType", callType);
calleeSession.sendMessage(new TextMessage(incoming.toString()));
}
// 响应主叫方
ObjectNode response = objectMapper.createObjectNode();
response.put("type", "call_created");
response.put("callId", callId);
session.sendMessage(new TextMessage(response.toString()));
logger.info("[CallSignaling] 发起通话: callId={}, caller={}, callee={}", callId, callerId, calleeId);
} catch (Exception e) {
sendError(session, e.getMessage());
}
}
private void handleCallAccept(WebSocketSession session, JsonNode json) throws IOException {
Integer userId = sessionUserMap.get(session.getId());
String callId = json.has("callId") ? json.get("callId").asText() : null;
if (userId == null || callId == null) {
sendError(session, "参数不完整");
return;
}
try {
callService.acceptCall(callId, userId);
joinCallSession(callId, session);
sessionCallMap.put(session.getId(), callId);
// 通知主叫方
CallRecord record = callService.getByCallId(callId);
if (record != null) {
WebSocketSession callerSession = userCallSessions.get(record.getCallerId());
if (callerSession != null && callerSession.isOpen()) {
ObjectNode notify = objectMapper.createObjectNode();
notify.put("type", "call_accepted");
notify.put("callId", callId);
callerSession.sendMessage(new TextMessage(notify.toString()));
}
}
logger.info("[CallSignaling] 接听通话: callId={}, userId={}", callId, userId);
} catch (Exception e) {
sendError(session, e.getMessage());
}
}
private void handleCallReject(WebSocketSession session, JsonNode json) throws IOException {
Integer userId = sessionUserMap.get(session.getId());
String callId = json.has("callId") ? json.get("callId").asText() : null;
if (userId == null || callId == null) {
sendError(session, "参数不完整");
return;
}
try {
CallRecord record = callService.getByCallId(callId);
callService.rejectCall(callId, userId);
// 通知主叫方
if (record != null) {
WebSocketSession callerSession = userCallSessions.get(record.getCallerId());
if (callerSession != null && callerSession.isOpen()) {
ObjectNode notify = objectMapper.createObjectNode();
notify.put("type", "call_rejected");
notify.put("callId", callId);
callerSession.sendMessage(new TextMessage(notify.toString()));
}
}
cleanupCall(callId);
logger.info("[CallSignaling] 拒绝通话: callId={}, userId={}", callId, userId);
} catch (Exception e) {
sendError(session, e.getMessage());
}
}
private void handleCallCancel(WebSocketSession session, JsonNode json) throws IOException {
Integer userId = sessionUserMap.get(session.getId());
String callId = json.has("callId") ? json.get("callId").asText() : null;
if (userId == null || callId == null) {
sendError(session, "参数不完整");
return;
}
try {
CallRecord record = callService.getByCallId(callId);
callService.cancelCall(callId, userId);
// 通知被叫方
if (record != null) {
WebSocketSession calleeSession = userCallSessions.get(record.getCalleeId());
if (calleeSession != null && calleeSession.isOpen()) {
ObjectNode notify = objectMapper.createObjectNode();
notify.put("type", "call_cancelled");
notify.put("callId", callId);
calleeSession.sendMessage(new TextMessage(notify.toString()));
}
}
cleanupCall(callId);
logger.info("[CallSignaling] 取消通话: callId={}, userId={}", callId, userId);
} catch (Exception e) {
sendError(session, e.getMessage());
}
}
private void handleCallEnd(WebSocketSession session, JsonNode json) throws IOException {
Integer userId = sessionUserMap.get(session.getId());
String callId = json.has("callId") ? json.get("callId").asText() : sessionCallMap.get(session.getId());
String reason = json.has("reason") ? json.get("reason").asText() : "normal";
if (userId == null || callId == null) {
sendError(session, "参数不完整");
return;
}
try {
CallRecord record = callService.getByCallId(callId);
callService.endCall(callId, userId, reason);
// 通知对方
if (record != null) {
Integer otherUserId = record.getCallerId().equals(userId) ? record.getCalleeId() : record.getCallerId();
WebSocketSession otherSession = userCallSessions.get(otherUserId);
if (otherSession != null && otherSession.isOpen()) {
ObjectNode notify = objectMapper.createObjectNode();
notify.put("type", "call_ended");
notify.put("callId", callId);
notify.put("reason", reason);
otherSession.sendMessage(new TextMessage(notify.toString()));
}
}
cleanupCall(callId);
logger.info("[CallSignaling] 结束通话: callId={}, userId={}, reason={}", callId, userId, reason);
} catch (Exception e) {
sendError(session, e.getMessage());
}
}
private void handleSignaling(WebSocketSession session, JsonNode json, String type) throws IOException {
String callId = json.has("callId") ? json.get("callId").asText() : sessionCallMap.get(session.getId());
if (callId == null) {
sendError(session, "callId不能为空");
return;
}
Set<WebSocketSession> sessions = callSessions.get(callId);
if (sessions == null) {
sendError(session, "通话不存在");
return;
}
// 转发信令给通话中的其他参与者
ObjectNode forward = objectMapper.createObjectNode();
forward.put("type", type);
forward.put("callId", callId);
if (json.has("sdp")) {
forward.set("sdp", json.get("sdp"));
}
if (json.has("candidate")) {
forward.set("candidate", json.get("candidate"));
}
String forwardMsg = forward.toString();
for (WebSocketSession s : sessions) {
if (s.isOpen() && !s.getId().equals(session.getId())) {
s.sendMessage(new TextMessage(forwardMsg));
}
}
logger.debug("[CallSignaling] 转发信令: type={}, callId={}", type, callId);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
Integer userId = sessionUserMap.remove(session.getId());
String callId = sessionCallMap.remove(session.getId());
if (userId != null) {
userCallSessions.remove(userId);
}
if (callId != null) {
Set<WebSocketSession> sessions = callSessions.get(callId);
if (sessions != null) {
sessions.remove(session);
}
// 如果用户在通话中断开结束通话
CallRecord record = callService.getByCallId(callId);
if (record != null && isActiveStatus(record.getStatus())) {
callService.endCall(callId, userId, "disconnect");
// 通知对方
Integer otherUserId = record.getCallerId().equals(userId) ? record.getCalleeId() : record.getCallerId();
WebSocketSession otherSession = userCallSessions.get(otherUserId);
if (otherSession != null && otherSession.isOpen()) {
ObjectNode notify = objectMapper.createObjectNode();
notify.put("type", "call_ended");
notify.put("callId", callId);
notify.put("reason", "peer_disconnect");
otherSession.sendMessage(new TextMessage(notify.toString()));
}
}
}
logger.info("[CallSignaling] 连接关闭: sessionId={}, userId={}", session.getId(), userId);
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
logger.error("[CallSignaling] 传输错误: sessionId={}", session.getId(), exception);
session.close(CloseStatus.SERVER_ERROR);
}
/**
* 定时检查超时的通话
*/
@Scheduled(fixedRate = 10000)
public void checkCallTimeout() {
long now = System.currentTimeMillis();
callCreateTime.forEach((callId, createTime) -> {
if (now - createTime > CALL_TIMEOUT) {
try {
CallRecord record = callService.getByCallId(callId);
if (record != null && ("calling".equals(record.getStatus()) || "ringing".equals(record.getStatus()))) {
callService.missedCall(callId);
// 通知双方
notifyCallTimeout(record);
cleanupCall(callId);
logger.info("[CallSignaling] 通话超时: callId={}", callId);
}
} catch (Exception e) {
logger.error("[CallSignaling] 处理超时异常: callId={}", callId, e);
}
}
});
}
private void notifyCallTimeout(CallRecord record) {
try {
ObjectNode notify = objectMapper.createObjectNode();
notify.put("type", "call_timeout");
notify.put("callId", record.getCallId());
WebSocketSession callerSession = userCallSessions.get(record.getCallerId());
if (callerSession != null && callerSession.isOpen()) {
callerSession.sendMessage(new TextMessage(notify.toString()));
}
WebSocketSession calleeSession = userCallSessions.get(record.getCalleeId());
if (calleeSession != null && calleeSession.isOpen()) {
calleeSession.sendMessage(new TextMessage(notify.toString()));
}
} catch (Exception e) {
logger.error("[CallSignaling] 通知超时异常", e);
}
}
private void joinCallSession(String callId, WebSocketSession session) {
callSessions.computeIfAbsent(callId, k -> new CopyOnWriteArraySet<>()).add(session);
}
private void cleanupCall(String callId) {
callSessions.remove(callId);
callCreateTime.remove(callId);
sessionCallMap.values().removeIf(v -> v.equals(callId));
}
private boolean isActiveStatus(String status) {
return "calling".equals(status) || "ringing".equals(status) || "connected".equals(status);
}
private void sendError(WebSocketSession session, String message) throws IOException {
ObjectNode error = objectMapper.createObjectNode();
error.put("type", "error");
error.put("message", message);
session.sendMessage(new TextMessage(error.toString()));
}
}