diff --git a/Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/controller/CallController.java b/Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/controller/CallController.java index fc8c3588..0f4d9bd3 100644 --- a/Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/controller/CallController.java +++ b/Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/controller/CallController.java @@ -4,7 +4,7 @@ import com.github.pagehelper.PageInfo; import com.zbkj.common.model.call.CallRecord; import com.zbkj.common.model.user.User; import com.zbkj.common.request.PageParamRequest; -import com.zbkj.common.response.CommonResult; +import com.zbkj.common.result.CommonResult; import com.zbkj.front.request.call.InitiateCallRequest; import com.zbkj.front.response.call.CallRecordResponse; import com.zbkj.front.response.call.InitiateCallResponse; diff --git a/Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/websocket/CallSignalingHandler.java b/Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/websocket/CallSignalingHandler.java index bcf9e4bd..64500e0e 100644 --- a/Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/websocket/CallSignalingHandler.java +++ b/Zhibo/zhibo-h/crmeb-front/src/main/java/com/zbkj/front/websocket/CallSignalingHandler.java @@ -23,20 +23,426 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; +/** + * 通话信令WebSocket处理器 + * 处理WebRTC信令交换:offer, answer, ice-candidate + */ @Component public class CallSignalingHandler extends TextWebSocketHandler { private static final Logger logger = LoggerFactory.getLogger(CallSignalingHandler.class); private final ObjectMapper objectMapper = new ObjectMapper(); - private static final long CALL_TIMEOUT = 60 * 1000; + private static final long CALL_TIMEOUT = 60 * 1000; // 60秒超时 @Autowired private CallService callService; + @Autowired private UserService userService; + // callId -> sessions private final Map> callSessions = new ConcurrentHashMap<>(); + // userId -> session private final Map userCallSessions = new ConcurrentHashMap<>(); + // sessionId -> userId private final Map sessionUserMap = new ConcurrentHashMap<>(); + // sessionId -> callId private final Map sessionCallMap = new ConcurrentHashMap<>(); + // callId -> createTime private final Map callCreateTime = new ConcurrentHashMap<>(); + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + logger.info("[CallSignaling] 连接建立: sessionId={}", session.getId()); + } + + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { + try { + JsonNode json = objectMapper.readTree(message.getPayload()); + String type = json.has("type") ? json.get("type").asText() : ""; + + switch (type) { + case "register": + handleRegister(session, json); + break; + case "call_request": + handleCallRequest(session, json); + break; + case "call_accept": + handleCallAccept(session, json); + break; + case "call_reject": + handleCallReject(session, json); + break; + case "call_cancel": + handleCallCancel(session, json); + break; + case "call_end": + handleCallEnd(session, json); + break; + case "offer": + case "answer": + case "ice-candidate": + handleSignaling(session, json, type); + break; + default: + logger.warn("[CallSignaling] 未知消息类型: {}", type); + } + } catch (Exception e) { + logger.error("[CallSignaling] 处理消息异常", e); + sendError(session, "消息处理失败: " + e.getMessage()); + } + } + + private void handleRegister(WebSocketSession session, JsonNode json) throws IOException { + Integer userId = json.has("userId") ? json.get("userId").asInt() : null; + if (userId == null) { + sendError(session, "userId不能为空"); + return; + } + + // 关闭旧连接 + WebSocketSession oldSession = userCallSessions.get(userId); + if (oldSession != null && oldSession.isOpen() && !oldSession.getId().equals(session.getId())) { + try { + oldSession.close(); + } catch (Exception ignored) {} + } + + userCallSessions.put(userId, session); + sessionUserMap.put(session.getId(), userId); + + ObjectNode response = objectMapper.createObjectNode(); + response.put("type", "registered"); + response.put("userId", userId); + session.sendMessage(new TextMessage(response.toString())); + logger.info("[CallSignaling] 用户注册: userId={}", userId); + } + + private void handleCallRequest(WebSocketSession session, JsonNode json) throws IOException { + Integer callerId = sessionUserMap.get(session.getId()); + if (callerId == null) { + sendError(session, "请先注册"); + return; + } + + Integer calleeId = json.has("calleeId") ? json.get("calleeId").asInt() : null; + String callType = json.has("callType") ? json.get("callType").asText() : "voice"; + + if (calleeId == null) { + sendError(session, "calleeId不能为空"); + return; + } + + try { + // 创建通话记录 + CallRecord record = callService.createCall(callerId, calleeId, callType); + String callId = record.getCallId(); + + // 记录通话创建时间 + callCreateTime.put(callId, System.currentTimeMillis()); + + // 加入通话会话 + joinCallSession(callId, session); + sessionCallMap.put(session.getId(), callId); + + // 获取用户信息 + User caller = userService.getById(callerId); + + // 通知被叫方 + WebSocketSession calleeSession = userCallSessions.get(calleeId); + if (calleeSession != null && calleeSession.isOpen()) { + ObjectNode incoming = objectMapper.createObjectNode(); + incoming.put("type", "incoming_call"); + incoming.put("callId", callId); + incoming.put("callerId", callerId); + incoming.put("callerName", caller != null ? caller.getNickname() : "用户" + callerId); + incoming.put("callerAvatar", caller != null ? caller.getAvatar() : ""); + incoming.put("callType", callType); + calleeSession.sendMessage(new TextMessage(incoming.toString())); + } + + // 响应主叫方 + ObjectNode response = objectMapper.createObjectNode(); + response.put("type", "call_created"); + response.put("callId", callId); + session.sendMessage(new TextMessage(response.toString())); + + logger.info("[CallSignaling] 发起通话: callId={}, caller={}, callee={}", callId, callerId, calleeId); + } catch (Exception e) { + sendError(session, e.getMessage()); + } + } + + private void handleCallAccept(WebSocketSession session, JsonNode json) throws IOException { + Integer userId = sessionUserMap.get(session.getId()); + String callId = json.has("callId") ? json.get("callId").asText() : null; + + if (userId == null || callId == null) { + sendError(session, "参数不完整"); + return; + } + + try { + callService.acceptCall(callId, userId); + joinCallSession(callId, session); + sessionCallMap.put(session.getId(), callId); + + // 通知主叫方 + CallRecord record = callService.getByCallId(callId); + if (record != null) { + WebSocketSession callerSession = userCallSessions.get(record.getCallerId()); + if (callerSession != null && callerSession.isOpen()) { + ObjectNode notify = objectMapper.createObjectNode(); + notify.put("type", "call_accepted"); + notify.put("callId", callId); + callerSession.sendMessage(new TextMessage(notify.toString())); + } + } + + logger.info("[CallSignaling] 接听通话: callId={}, userId={}", callId, userId); + } catch (Exception e) { + sendError(session, e.getMessage()); + } + } + + private void handleCallReject(WebSocketSession session, JsonNode json) throws IOException { + Integer userId = sessionUserMap.get(session.getId()); + String callId = json.has("callId") ? json.get("callId").asText() : null; + + if (userId == null || callId == null) { + sendError(session, "参数不完整"); + return; + } + + try { + CallRecord record = callService.getByCallId(callId); + callService.rejectCall(callId, userId); + + // 通知主叫方 + if (record != null) { + WebSocketSession callerSession = userCallSessions.get(record.getCallerId()); + if (callerSession != null && callerSession.isOpen()) { + ObjectNode notify = objectMapper.createObjectNode(); + notify.put("type", "call_rejected"); + notify.put("callId", callId); + callerSession.sendMessage(new TextMessage(notify.toString())); + } + } + + cleanupCall(callId); + logger.info("[CallSignaling] 拒绝通话: callId={}, userId={}", callId, userId); + } catch (Exception e) { + sendError(session, e.getMessage()); + } + } + + private void handleCallCancel(WebSocketSession session, JsonNode json) throws IOException { + Integer userId = sessionUserMap.get(session.getId()); + String callId = json.has("callId") ? json.get("callId").asText() : null; + + if (userId == null || callId == null) { + sendError(session, "参数不完整"); + return; + } + + try { + CallRecord record = callService.getByCallId(callId); + callService.cancelCall(callId, userId); + + // 通知被叫方 + if (record != null) { + WebSocketSession calleeSession = userCallSessions.get(record.getCalleeId()); + if (calleeSession != null && calleeSession.isOpen()) { + ObjectNode notify = objectMapper.createObjectNode(); + notify.put("type", "call_cancelled"); + notify.put("callId", callId); + calleeSession.sendMessage(new TextMessage(notify.toString())); + } + } + + cleanupCall(callId); + logger.info("[CallSignaling] 取消通话: callId={}, userId={}", callId, userId); + } catch (Exception e) { + sendError(session, e.getMessage()); + } + } + + private void handleCallEnd(WebSocketSession session, JsonNode json) throws IOException { + Integer userId = sessionUserMap.get(session.getId()); + String callId = json.has("callId") ? json.get("callId").asText() : sessionCallMap.get(session.getId()); + String reason = json.has("reason") ? json.get("reason").asText() : "normal"; + + if (userId == null || callId == null) { + sendError(session, "参数不完整"); + return; + } + + try { + CallRecord record = callService.getByCallId(callId); + callService.endCall(callId, userId, reason); + + // 通知对方 + if (record != null) { + Integer otherUserId = record.getCallerId().equals(userId) ? record.getCalleeId() : record.getCallerId(); + WebSocketSession otherSession = userCallSessions.get(otherUserId); + if (otherSession != null && otherSession.isOpen()) { + ObjectNode notify = objectMapper.createObjectNode(); + notify.put("type", "call_ended"); + notify.put("callId", callId); + notify.put("reason", reason); + otherSession.sendMessage(new TextMessage(notify.toString())); + } + } + + cleanupCall(callId); + logger.info("[CallSignaling] 结束通话: callId={}, userId={}, reason={}", callId, userId, reason); + } catch (Exception e) { + sendError(session, e.getMessage()); + } + } + + + private void handleSignaling(WebSocketSession session, JsonNode json, String type) throws IOException { + String callId = json.has("callId") ? json.get("callId").asText() : sessionCallMap.get(session.getId()); + if (callId == null) { + sendError(session, "callId不能为空"); + return; + } + + Set sessions = callSessions.get(callId); + if (sessions == null) { + sendError(session, "通话不存在"); + return; + } + + // 转发信令给通话中的其他参与者 + ObjectNode forward = objectMapper.createObjectNode(); + forward.put("type", type); + forward.put("callId", callId); + if (json.has("sdp")) { + forward.set("sdp", json.get("sdp")); + } + if (json.has("candidate")) { + forward.set("candidate", json.get("candidate")); + } + + String forwardMsg = forward.toString(); + for (WebSocketSession s : sessions) { + if (s.isOpen() && !s.getId().equals(session.getId())) { + s.sendMessage(new TextMessage(forwardMsg)); + } + } + + logger.debug("[CallSignaling] 转发信令: type={}, callId={}", type, callId); + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { + Integer userId = sessionUserMap.remove(session.getId()); + String callId = sessionCallMap.remove(session.getId()); + + if (userId != null) { + userCallSessions.remove(userId); + } + + if (callId != null) { + Set sessions = callSessions.get(callId); + if (sessions != null) { + sessions.remove(session); + } + + // 如果用户在通话中断开,结束通话 + CallRecord record = callService.getByCallId(callId); + if (record != null && isActiveStatus(record.getStatus())) { + callService.endCall(callId, userId, "disconnect"); + + // 通知对方 + Integer otherUserId = record.getCallerId().equals(userId) ? record.getCalleeId() : record.getCallerId(); + WebSocketSession otherSession = userCallSessions.get(otherUserId); + if (otherSession != null && otherSession.isOpen()) { + ObjectNode notify = objectMapper.createObjectNode(); + notify.put("type", "call_ended"); + notify.put("callId", callId); + notify.put("reason", "peer_disconnect"); + otherSession.sendMessage(new TextMessage(notify.toString())); + } + } + } + + logger.info("[CallSignaling] 连接关闭: sessionId={}, userId={}", session.getId(), userId); + } + + @Override + public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { + logger.error("[CallSignaling] 传输错误: sessionId={}", session.getId(), exception); + session.close(CloseStatus.SERVER_ERROR); + } + + /** + * 定时检查超时的通话 + */ + @Scheduled(fixedRate = 10000) + public void checkCallTimeout() { + long now = System.currentTimeMillis(); + callCreateTime.forEach((callId, createTime) -> { + if (now - createTime > CALL_TIMEOUT) { + try { + CallRecord record = callService.getByCallId(callId); + if (record != null && ("calling".equals(record.getStatus()) || "ringing".equals(record.getStatus()))) { + callService.missedCall(callId); + + // 通知双方 + notifyCallTimeout(record); + cleanupCall(callId); + logger.info("[CallSignaling] 通话超时: callId={}", callId); + } + } catch (Exception e) { + logger.error("[CallSignaling] 处理超时异常: callId={}", callId, e); + } + } + }); + } + + private void notifyCallTimeout(CallRecord record) { + try { + ObjectNode notify = objectMapper.createObjectNode(); + notify.put("type", "call_timeout"); + notify.put("callId", record.getCallId()); + + WebSocketSession callerSession = userCallSessions.get(record.getCallerId()); + if (callerSession != null && callerSession.isOpen()) { + callerSession.sendMessage(new TextMessage(notify.toString())); + } + + WebSocketSession calleeSession = userCallSessions.get(record.getCalleeId()); + if (calleeSession != null && calleeSession.isOpen()) { + calleeSession.sendMessage(new TextMessage(notify.toString())); + } + } catch (Exception e) { + logger.error("[CallSignaling] 通知超时异常", e); + } + } + + private void joinCallSession(String callId, WebSocketSession session) { + callSessions.computeIfAbsent(callId, k -> new CopyOnWriteArraySet<>()).add(session); + } + + private void cleanupCall(String callId) { + callSessions.remove(callId); + callCreateTime.remove(callId); + sessionCallMap.values().removeIf(v -> v.equals(callId)); + } + + private boolean isActiveStatus(String status) { + return "calling".equals(status) || "ringing".equals(status) || "connected".equals(status); + } + + private void sendError(WebSocketSession session, String message) throws IOException { + ObjectNode error = objectMapper.createObjectNode(); + error.put("type", "error"); + error.put("message", message); + session.sendMessage(new TextMessage(error.toString())); + } +}