连接直播

This commit is contained in:
xiao12feng8 2025-12-23 15:38:35 +08:00
parent 7094cbccbc
commit 9160e6e1a9
35 changed files with 7115 additions and 163 deletions

View File

@ -88,6 +88,49 @@ export function roomListApi(params) {
})
}
// 创建直播房间
export function liveRoomCreateApi(data) {
return request({
url: '/admin/room/live/create',
method: 'post',
data
})
}
// 编辑直播房间
export function liveRoomUpdateApi(data) {
return request({
url: '/admin/room/live/update',
method: 'post',
data
})
}
// 删除直播房间
export function liveRoomDeleteApi(id) {
return request({
url: `/admin/room/live/delete/${id}`,
method: 'post'
})
}
// 切换直播状态
export function liveRoomToggleStatusApi(id) {
return request({
url: `/admin/room/live/toggle-status/${id}`,
method: 'post'
})
}
// 获取直播间弹幕记录
export function liveRoomChatHistoryApi(roomId, limit = 100) {
return request({
url: `/admin/room/live/chat/${roomId}`,
method: 'get',
params: { limit }
})
}
// 房间详情
export function roomDetailApi(id) {
return request({

View File

@ -4,6 +4,9 @@
<div class="padding-add">
<el-page-header @back="goBack" content="房间管理列表"></el-page-header>
<div class="mt20">
<div class="mb20">
<el-button type="primary" icon="el-icon-plus" @click="openCreate">新增直播间</el-button>
</div>
<!-- 搜索表单 -->
<el-form inline size="small" :model="searchForm" class="mb20 search-form-inline">
<el-form-item>
@ -50,6 +53,7 @@
</el-table-column>
<el-table-column label="操作" width="200" align="center" fixed="right">
<template slot-scope="scope">
<el-button :type="scope.row.isLive ? 'danger' : 'success'" size="small" @click="handleToggleStatus(scope.row)">{{ scope.row.isLive ? '停播' : '开播' }}</el-button>
<el-button type="warning" size="small" @click="handleDetail(scope.row)">详情</el-button>
</template>
</el-table-column>
@ -70,7 +74,7 @@
</el-card>
<!-- 详情弹窗 -->
<el-dialog title="详情" :visible.sync="detailVisible" width="700px">
<el-dialog title="详情" :visible.sync="detailVisible" width="800px">
<el-descriptions :column="3" border>
<el-descriptions-item label="房间ID">{{ detailData.id }}</el-descriptions-item>
<el-descriptions-item label="标题">{{ detailData.title }}</el-descriptions-item>
@ -82,6 +86,21 @@
<el-descriptions-item label="FLV">{{ detailData.streamUrls && detailData.streamUrls.flv }}</el-descriptions-item>
<el-descriptions-item label="HLS">{{ detailData.streamUrls && detailData.streamUrls.hls }}</el-descriptions-item>
</el-descriptions>
<!-- 弹幕记录 -->
<div class="mt20">
<div class="chat-header">
<span style="font-weight: bold;">💬 弹幕记录</span>
<el-button size="mini" @click="loadChatHistory">刷新</el-button>
</div>
<el-table :data="chatHistory" max-height="300" size="small" v-loading="chatLoading">
<el-table-column prop="nickname" label="昵称" width="120" />
<el-table-column prop="content" label="内容" min-width="200" />
<el-table-column prop="createTime" label="时间" width="160" />
</el-table>
<div v-if="chatHistory.length === 0 && !chatLoading" class="no-data">暂无弹幕记录</div>
</div>
<div slot="footer" class="dialog-footer">
<el-button @click="detailVisible = false">关闭</el-button>
</div>
@ -108,11 +127,26 @@
<el-button type="primary" @click="handleSaveEdit">保存</el-button>
</div>
</el-dialog>
<el-dialog title="新增直播间" :visible.sync="createVisible" width="520px">
<el-form :model="createForm" label-width="120px">
<el-form-item label="直播标题">
<el-input v-model="createForm.title" placeholder="请输入直播标题" />
</el-form-item>
<el-form-item label="主播名称">
<el-input v-model="createForm.streamerName" placeholder="请输入主播名称" />
</el-form-item>
</el-form>
<div slot="footer" class="dialog-footer">
<el-button @click="createVisible = false">取消</el-button>
<el-button type="primary" :loading="createLoading" @click="handleCreate">创建</el-button>
</div>
</el-dialog>
</div>
</template>
<script>
import { roomListApi } from '@/api/room';
import { roomListApi, liveRoomCreateApi, liveRoomToggleStatusApi, liveRoomChatHistoryApi } from '@/api/room';
export default {
name: 'RoomList',
@ -132,17 +166,52 @@ export default {
loading: false,
detailVisible: false,
editVisible: false,
createVisible: false,
createLoading: false,
detailData: {},
chatHistory: [],
chatLoading: false,
editForm: {
status: '正常',
recommendStatus: '未推荐',
},
createForm: {
title: '',
streamerName: '',
},
};
},
mounted() {
this.getList();
},
methods: {
openCreate() {
this.createForm = {
title: '',
streamerName: '',
};
this.createVisible = true;
},
async handleCreate() {
if (!this.createForm.title || !this.createForm.streamerName) {
this.$message.error('请填写直播标题和主播名称');
return;
}
this.createLoading = true;
try {
const res = await liveRoomCreateApi({
title: this.createForm.title,
streamerName: this.createForm.streamerName,
});
this.createVisible = false;
await this.getList();
this.handleDetail(res);
} catch (error) {
this.$message.error(error.message || '创建失败');
} finally {
this.createLoading = false;
}
},
async getList() {
this.loading = true;
try {
@ -152,8 +221,6 @@ export default {
title: this.searchForm.title || undefined,
streamerName: this.searchForm.streamerName || undefined,
streamKey: this.searchForm.streamKey || undefined,
startTime: this.searchForm.startTime || undefined,
endTime: this.searchForm.endTime || undefined,
};
const res = await roomListApi(params);
this.tableData = (res && res.list) || [];
@ -168,9 +235,40 @@ export default {
this.searchForm.page = 1;
this.getList();
},
async handleToggleStatus(row) {
try {
const action = row.isLive ? '停播' : '开播';
await this.$confirm(`确定要${action}该直播间吗?`, '提示', {
confirmButtonText: '确定',
cancelButtonText: '取消',
type: 'warning'
});
const res = await liveRoomToggleStatusApi(row.id);
this.$message.success(res || `${action}成功`);
await this.getList();
} catch (error) {
if (error !== 'cancel') {
this.$message.error(error.message || '操作失败');
}
}
},
async handleDetail(row) {
this.detailData = row || {};
this.chatHistory = [];
this.detailVisible = true;
this.loadChatHistory();
},
async loadChatHistory() {
if (!this.detailData.id) return;
this.chatLoading = true;
try {
const res = await liveRoomChatHistoryApi(this.detailData.id, 100);
this.chatHistory = res || [];
} catch (error) {
console.error('加载弹幕记录失败', error);
} finally {
this.chatLoading = false;
}
},
handleEdit(row) {
this.editForm = {
@ -220,4 +318,21 @@ export default {
margin-bottom: 0;
}
}
.mt20 {
margin-top: 20px;
}
.chat-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 10px;
}
.no-data {
text-align: center;
color: #999;
padding: 20px;
}
</style>

View File

@ -3,6 +3,7 @@ package com.zbkj.admin.controller;
import cn.hutool.core.util.StrUtil;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.zbkj.common.model.live.LiveChat;
import com.zbkj.common.model.live.LiveRoom;
import com.zbkj.common.model.room.Room;
import com.zbkj.common.page.CommonPage;
@ -12,6 +13,7 @@ import com.zbkj.common.request.RoomUpdateRequest;
import com.zbkj.common.result.CommonResult;
import com.zbkj.service.service.RoomService;
import com.zbkj.service.service.LiveRoomService;
import com.zbkj.service.service.LiveChatService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.Data;
@ -23,6 +25,7 @@ import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
@ -42,6 +45,9 @@ public class RoomController {
@Autowired
private LiveRoomService liveRoomService;
@Autowired
private LiveChatService liveChatService;
@Value("${LIVE_PUBLIC_SRS_HOST:}")
private String publicHost;
@ -100,6 +106,68 @@ public class RoomController {
return CommonResult.success(page);
}
@ApiOperation(value = "直播房间:创建")
@RequestMapping(value = "/live/create", method = RequestMethod.POST)
public CommonResult<LiveRoomAdminResponse> createLiveRoom(@RequestBody @Validated LiveRoomCreateRequest body,
HttpServletRequest request) {
Integer uid = body.getUid() == null ? 0 : body.getUid();
LiveRoom room = liveRoomService.createRoom(uid, body.getTitle(), body.getStreamerName());
String host = resolveHost(request);
int rtmpPort = parsePort(publicRtmpPort, 25002);
int httpPort = parsePort(publicHttpPort, 25003);
return CommonResult.success(toAdminResponse(room, host, rtmpPort, httpPort));
}
@ApiOperation(value = "直播房间:编辑")
@RequestMapping(value = "/live/update", method = RequestMethod.POST)
public CommonResult<LiveRoomAdminResponse> updateLiveRoom(@RequestBody @Validated LiveRoomUpdateRequest body,
HttpServletRequest request) {
LiveRoom room = liveRoomService.getById(body.getId());
if (room == null) return CommonResult.failed("房间不存在");
room.setTitle(body.getTitle());
room.setStreamerName(body.getStreamerName());
if (!liveRoomService.updateById(room)) {
return CommonResult.failed("保存失败");
}
String host = resolveHost(request);
int rtmpPort = parsePort(publicRtmpPort, 25002);
int httpPort = parsePort(publicHttpPort, 25003);
return CommonResult.success(toAdminResponse(room, host, rtmpPort, httpPort));
}
@ApiOperation(value = "直播房间:删除")
@RequestMapping(value = "/live/delete/{id}", method = RequestMethod.POST)
public CommonResult<String> deleteLiveRoom(@PathVariable Integer id) {
if (id == null) return CommonResult.failed("参数错误");
if (!liveRoomService.removeById(id)) {
return CommonResult.failed("删除失败");
}
return CommonResult.success();
}
@ApiOperation(value = "直播房间:切换直播状态")
@RequestMapping(value = "/live/toggle-status/{id}", method = RequestMethod.POST)
public CommonResult<String> toggleLiveStatus(@PathVariable Integer id) {
if (id == null) return CommonResult.failed("参数错误");
LiveRoom room = liveRoomService.getById(id);
if (room == null) return CommonResult.failed("房间不存在");
int newStatus = (room.getIsLive() != null && room.getIsLive() == 1) ? 0 : 1;
room.setIsLive(newStatus);
if (!liveRoomService.updateById(room)) {
return CommonResult.failed("更新失败");
}
return CommonResult.success(newStatus == 1 ? "已开播" : "已停播");
}
@ApiOperation(value = "直播房间:弹幕记录")
@RequestMapping(value = "/live/chat/{roomId}", method = RequestMethod.GET)
public CommonResult<List<LiveChat>> getLiveChatHistory(@PathVariable Integer roomId,
@RequestParam(defaultValue = "100") Integer limit) {
if (roomId == null) return CommonResult.failed("参数错误");
List<LiveChat> messages = liveChatService.getRoomMessages(roomId, limit);
return CommonResult.success(messages);
}
/**
* 获取房间详情
*/
@ -180,10 +248,33 @@ public class RoomController {
private String streamerName;
private String streamKey;
private Boolean isLive;
private java.util.Date createTime;
private Date createTime;
private LiveRoomStreamUrlsResponse streamUrls;
}
@Data
public static class LiveRoomCreateRequest {
private Integer uid;
@javax.validation.constraints.NotBlank
private String title;
@javax.validation.constraints.NotBlank
private String streamerName;
}
@Data
public static class LiveRoomUpdateRequest {
@javax.validation.constraints.NotNull
private Integer id;
@javax.validation.constraints.NotBlank
private String title;
@javax.validation.constraints.NotBlank
private String streamerName;
}
@Data
public static class LiveRoomStreamUrlsResponse {
private String rtmp;

View File

@ -0,0 +1,105 @@
package com.zbkj.admin.task.live;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zbkj.common.model.live.LiveRoom;
import com.zbkj.service.service.LiveRoomService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* 直播状态同步任务
* 定期查询 SRS API 获取当前推流列表自动更新直播状态
*/
@Component("LiveStatusSyncTask")
@EnableScheduling
public class LiveStatusSyncTask {
private static final Logger logger = LoggerFactory.getLogger(LiveStatusSyncTask.class);
@Autowired
private LiveRoomService liveRoomService;
@Value("${SRS_API_URL:http://127.0.0.1:1985}")
private String srsApiUrl;
private final ObjectMapper objectMapper = new ObjectMapper();
/**
* 同步直播状态 5 秒执行一次
*/
@Scheduled(fixedRate = 5000)
public void syncLiveStatus() {
try {
Set<String> liveStreamKeys = fetchLiveStreamKeysFromSrs();
updateLiveStatus(liveStreamKeys);
} catch (Exception e) {
logger.error("LiveStatusSyncTask 执行失败: {}", e.getMessage());
}
}
/**
* SRS API 获取当前正在推流的 streamKey 列表
*/
private Set<String> fetchLiveStreamKeysFromSrs() {
Set<String> streamKeys = new HashSet<>();
try {
URL url = new URL(srsApiUrl + "/api/v1/streams/");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
conn.setConnectTimeout(3000);
conn.setReadTimeout(3000);
if (conn.getResponseCode() == 200) {
JsonNode root = objectMapper.readTree(conn.getInputStream());
JsonNode streams = root.get("streams");
if (streams != null && streams.isArray()) {
for (JsonNode stream : streams) {
JsonNode publish = stream.get("publish");
if (publish != null && publish.has("active") && publish.get("active").asBoolean()) {
String name = stream.get("name").asText();
if (name != null && !name.isEmpty()) {
streamKeys.add(name);
}
}
}
}
}
conn.disconnect();
} catch (Exception e) {
logger.warn("查询 SRS API 失败: {}", e.getMessage());
}
return streamKeys;
}
/**
* 更新数据库中的直播状态
*/
private void updateLiveStatus(Set<String> liveStreamKeys) {
List<LiveRoom> allRooms = liveRoomService.list(new LambdaQueryWrapper<>());
for (LiveRoom room : allRooms) {
String streamKey = room.getStreamKey();
boolean shouldBeLive = streamKey != null && liveStreamKeys.contains(streamKey);
int newStatus = shouldBeLive ? 1 : 0;
int currentStatus = room.getIsLive() == null ? 0 : room.getIsLive();
if (newStatus != currentStatus) {
room.setIsLive(newStatus);
liveRoomService.updateById(room);
logger.info("直播状态更新: {} -> {}", room.getTitle(), shouldBeLive ? "直播中" : "未开播");
}
}
}
}

View File

@ -0,0 +1,29 @@
package com.zbkj.common.model.live;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.util.Date;
/**
* 直播间聊天记录
*/
@Data
@TableName("eb_live_chat")
public class LiveChat {
@TableId(type = IdType.AUTO)
private Long id;
private Integer roomId;
private String visitorId;
private String nickname;
private String content;
private Date createTime;
}

View File

@ -21,6 +21,17 @@
<groupId>com.zbkj</groupId>
<artifactId>crmeb-service</artifactId>
<version>${crmeb-service}</version>
<exclusions>
<exclusion>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- WebSocket 支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>

View File

@ -0,0 +1,25 @@
package com.zbkj.front.config;
import com.zbkj.front.websocket.LiveChatHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
private final LiveChatHandler liveChatHandler;
public WebSocketConfig(LiveChatHandler liveChatHandler) {
this.liveChatHandler = liveChatHandler;
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 直播间聊天 WebSocket 端点: ws://host:8081/ws/live/chat/{roomId}
registry.addHandler(liveChatHandler, "/ws/live/chat/{roomId}")
.setAllowedOrigins("*");
}
}

View File

@ -16,6 +16,8 @@ import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.util.List;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;
@RestController
@ -38,20 +40,26 @@ public class LiveRoomController {
@Value("${LIVE_PUBLIC_SRS_HTTP_PORT:}")
private String publicHttpPort;
@ApiOperation(value = "公开:直播间列表")
@ApiOperation(value = "公开:直播间列表(只返回直播中的房间)")
@GetMapping("/public/rooms")
public CommonResult<List<LiveRoomResponse>> publicRooms(HttpServletRequest request) {
String requestHost = resolveHost(request);
// 只返回 is_live=1 的直播间
return CommonResult.success(liveRoomService.getAll().stream()
.filter(r -> r.getIsLive() != null && r.getIsLive() == 1)
.map(r -> toResponse(r, requestHost))
.collect(Collectors.toList()));
}
@ApiOperation(value = "公开:直播间详情")
@ApiOperation(value = "公开:直播间详情(只返回直播中的房间)")
@GetMapping("/public/rooms/{id}")
public CommonResult<LiveRoomResponse> publicRoom(@PathVariable Integer id, HttpServletRequest request) {
LiveRoom room = liveRoomService.getById(id);
if (room == null) return CommonResult.failed("房间不存在");
// 只返回直播中的房间
if (room.getIsLive() == null || room.getIsLive() != 1) {
return CommonResult.failed("直播已结束");
}
return CommonResult.success(toResponse(room, resolveHost(request)));
}
@ -129,4 +137,40 @@ public class LiveRoomController {
return def;
}
}
// ========== SRS 回调接口 ==========
@ApiOperation(value = "SRS回调推流开始")
@PostMapping("/srs/on_publish")
public Map<String, Object> onPublish(@RequestBody Map<String, Object> body) {
String stream = (String) body.get("stream");
System.out.println("[SRS] 推流开始: stream=" + stream);
if (stream != null && !stream.isEmpty()) {
liveRoomService.setLiveStatus(stream, true);
}
return Collections.singletonMap("code", 0);
}
@ApiOperation(value = "SRS回调推流结束")
@PostMapping("/srs/on_unpublish")
public Map<String, Object> onUnpublish(@RequestBody Map<String, Object> body) {
String stream = (String) body.get("stream");
System.out.println("[SRS] 推流结束: stream=" + stream);
if (stream != null && !stream.isEmpty()) {
liveRoomService.setLiveStatus(stream, false);
}
return Collections.singletonMap("code", 0);
}
@ApiOperation(value = "SRS回调观看开始")
@PostMapping("/srs/on_play")
public Map<String, Object> onPlay(@RequestBody Map<String, Object> body) {
return Collections.singletonMap("code", 0);
}
@ApiOperation(value = "SRS回调观看结束")
@PostMapping("/srs/on_stop")
public Map<String, Object> onStop(@RequestBody Map<String, Object> body) {
return Collections.singletonMap("code", 0);
}
}

View File

@ -1,86 +0,0 @@
package com.zbkj.front.controller;
import com.zbkj.service.service.LiveRoomService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.util.MultiValueMap;
import java.util.HashMap;
import java.util.Map;
@RestController
@RequestMapping("api/front/live/srs")
@Api(tags = "直播 -- SRS 回调")
public class SrsCallbackController {
@Autowired
private LiveRoomService liveRoomService;
@ApiOperation(value = "SRS 推流开始回调")
@PostMapping(value = "/on_publish", consumes = MediaType.APPLICATION_JSON_VALUE)
public Map<String, Object> onPublishJson(@RequestBody(required = false) Map<String, Object> body) {
String stream = body == null ? null : String.valueOf(body.get("stream"));
if (stream != null && !stream.trim().isEmpty()) {
liveRoomService.setLiveStatus(stream.trim(), true);
}
Map<String, Object> res = new HashMap<>();
res.put("code", 0);
return res;
}
@ApiOperation(value = "SRS 推流开始回调")
@PostMapping(value = "/on_publish", consumes = MediaType.APPLICATION_FORM_URLENCODED_VALUE)
public Map<String, Object> onPublishForm(@RequestParam MultiValueMap<String, String> form) {
String stream = form == null ? null : form.getFirst("stream");
if (stream != null && !stream.trim().isEmpty()) {
liveRoomService.setLiveStatus(stream.trim(), true);
}
Map<String, Object> res = new HashMap<>();
res.put("code", 0);
return res;
}
@ApiOperation(value = "SRS 推流结束回调")
@PostMapping(value = "/on_unpublish", consumes = MediaType.APPLICATION_JSON_VALUE)
public Map<String, Object> onUnpublishJson(@RequestBody(required = false) Map<String, Object> body) {
String stream = body == null ? null : String.valueOf(body.get("stream"));
if (stream != null && !stream.trim().isEmpty()) {
liveRoomService.setLiveStatus(stream.trim(), false);
}
Map<String, Object> res = new HashMap<>();
res.put("code", 0);
return res;
}
@ApiOperation(value = "SRS 推流结束回调")
@PostMapping(value = "/on_unpublish", consumes = MediaType.APPLICATION_FORM_URLENCODED_VALUE)
public Map<String, Object> onUnpublishForm(@RequestParam MultiValueMap<String, String> form) {
String stream = form == null ? null : form.getFirst("stream");
if (stream != null && !stream.trim().isEmpty()) {
liveRoomService.setLiveStatus(stream.trim(), false);
}
Map<String, Object> res = new HashMap<>();
res.put("code", 0);
return res;
}
@ApiOperation(value = "SRS 观看回调")
@PostMapping("/on_play")
public Map<String, Object> onPlay(@RequestBody(required = false) Map<String, Object> body) {
Map<String, Object> res = new HashMap<>();
res.put("code", 0);
return res;
}
@ApiOperation(value = "SRS 停止观看回调")
@PostMapping("/on_stop")
public Map<String, Object> onStop(@RequestBody(required = false) Map<String, Object> body) {
Map<String, Object> res = new HashMap<>();
res.put("code", 0);
return res;
}
}

View File

@ -0,0 +1,170 @@
package com.zbkj.front.websocket;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.zbkj.service.service.LiveChatService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* 直播间聊天 WebSocket 处理器
*/
@Component
public class LiveChatHandler extends TextWebSocketHandler {
private static final Logger logger = LoggerFactory.getLogger(LiveChatHandler.class);
private final ObjectMapper objectMapper = new ObjectMapper();
@Autowired
private LiveChatService liveChatService;
// roomId -> Set<WebSocketSession>
private final Map<String, Set<WebSocketSession>> roomSessions = new ConcurrentHashMap<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String roomId = extractRoomId(session);
if (roomId == null) {
session.close(CloseStatus.BAD_DATA);
return;
}
roomSessions.computeIfAbsent(roomId, k -> new CopyOnWriteArraySet<>()).add(session);
logger.info("[LiveChat] 用户加入房间: roomId={}, sessionId={}, 当前人数={}",
roomId, session.getId(), roomSessions.get(roomId).size());
// 发送欢迎消息
sendToSession(session, buildSystemMessage("connected", "欢迎进入直播间"));
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
String roomId = extractRoomId(session);
if (roomId != null) {
Set<WebSocketSession> sessions = roomSessions.get(roomId);
if (sessions != null) {
sessions.remove(session);
if (sessions.isEmpty()) {
roomSessions.remove(roomId);
}
}
logger.info("[LiveChat] 用户离开房间: roomId={}, sessionId={}", roomId, session.getId());
}
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String roomId = extractRoomId(session);
if (roomId == null) return;
try {
JsonNode json = objectMapper.readTree(message.getPayload());
String type = json.has("type") ? json.get("type").asText() : "chat";
if ("chat".equals(type)) {
String content = json.has("content") ? json.get("content").asText() : "";
String nickname = json.has("nickname") ? json.get("nickname").asText() : "匿名用户";
String userId = json.has("userId") ? json.get("userId").asText() : "";
if (content.isEmpty()) return;
// 保存消息到数据库
try {
Integer roomIdInt = Integer.parseInt(roomId);
liveChatService.saveMessage(roomIdInt, userId, nickname, content);
} catch (Exception e) {
logger.warn("[LiveChat] 保存消息失败: {}", e.getMessage());
}
// 构建聊天消息并广播给房间内所有人
String chatMsg = buildChatMessage(userId, nickname, content);
broadcastToRoom(roomId, chatMsg);
logger.debug("[LiveChat] 消息广播: roomId={}, nickname={}, content={}", roomId, nickname, content);
}
} catch (Exception e) {
logger.warn("[LiveChat] 消息解析失败: {}", e.getMessage());
}
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
logger.error("[LiveChat] 传输错误: sessionId={}, error={}", session.getId(), exception.getMessage());
session.close(CloseStatus.SERVER_ERROR);
}
private String extractRoomId(WebSocketSession session) {
String path = session.getUri() != null ? session.getUri().getPath() : "";
// 路径格式: /ws/live/chat/{roomId}
String[] parts = path.split("/");
if (parts.length >= 5) {
return parts[4];
}
return null;
}
private void broadcastToRoom(String roomId, String message) {
Set<WebSocketSession> sessions = roomSessions.get(roomId);
if (sessions == null || sessions.isEmpty()) return;
TextMessage textMessage = new TextMessage(message);
for (WebSocketSession session : sessions) {
if (session.isOpen()) {
try {
session.sendMessage(textMessage);
} catch (IOException e) {
logger.warn("[LiveChat] 发送消息失败: sessionId={}", session.getId());
}
}
}
}
private void sendToSession(WebSocketSession session, String message) {
if (session.isOpen()) {
try {
session.sendMessage(new TextMessage(message));
} catch (IOException e) {
logger.warn("[LiveChat] 发送消息失败: sessionId={}", session.getId());
}
}
}
private String buildChatMessage(String userId, String nickname, String content) {
ObjectNode node = objectMapper.createObjectNode();
node.put("type", "chat");
node.put("userId", userId);
node.put("nickname", nickname);
node.put("content", content);
node.put("timestamp", System.currentTimeMillis());
return node.toString();
}
private String buildSystemMessage(String type, String content) {
ObjectNode node = objectMapper.createObjectNode();
node.put("type", type);
node.put("content", content);
node.put("timestamp", System.currentTimeMillis());
return node.toString();
}
/**
* 获取房间在线人数
*/
public int getRoomOnlineCount(String roomId) {
Set<WebSocketSession> sessions = roomSessions.get(roomId);
return sessions != null ? sessions.size() : 0;
}
}

View File

@ -0,0 +1,10 @@
package com.zbkj.service.dao;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.zbkj.common.model.live.LiveChat;
/**
* 直播间聊天记录 Mapper
*/
public interface LiveChatDao extends BaseMapper<LiveChat> {
}

View File

@ -0,0 +1,22 @@
package com.zbkj.service.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.zbkj.common.model.live.LiveChat;
import java.util.List;
/**
* 直播间聊天记录 Service
*/
public interface LiveChatService extends IService<LiveChat> {
/**
* 保存聊天消息
*/
void saveMessage(Integer roomId, String visitorId, String nickname, String content);
/**
* 获取房间聊天记录
*/
List<LiveChat> getRoomMessages(Integer roomId, int limit);
}

View File

@ -0,0 +1,38 @@
package com.zbkj.service.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.zbkj.common.model.live.LiveChat;
import com.zbkj.service.dao.LiveChatDao;
import com.zbkj.service.service.LiveChatService;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
/**
* 直播间聊天记录 Service 实现
*/
@Service
public class LiveChatServiceImpl extends ServiceImpl<LiveChatDao, LiveChat> implements LiveChatService {
@Override
public void saveMessage(Integer roomId, String visitorId, String nickname, String content) {
LiveChat chat = new LiveChat();
chat.setRoomId(roomId);
chat.setVisitorId(visitorId != null ? visitorId : "");
chat.setNickname(nickname != null ? nickname : "匿名");
chat.setContent(content);
chat.setCreateTime(new Date());
save(chat);
}
@Override
public List<LiveChat> getRoomMessages(Integer roomId, int limit) {
LambdaQueryWrapper<LiveChat> qw = new LambdaQueryWrapper<>();
qw.eq(LiveChat::getRoomId, roomId);
qw.orderByDesc(LiveChat::getId);
qw.last("LIMIT " + limit);
return list(qw);
}
}

View File

@ -0,0 +1,75 @@
package com.example.livestreaming;
import android.view.LayoutInflater;
import android.view.View;
import android.view.ViewGroup;
import android.widget.TextView;
import androidx.annotation.NonNull;
import androidx.recyclerview.widget.RecyclerView;
import com.example.livestreaming.net.LiveChatClient;
import java.util.ArrayList;
import java.util.List;
public class LiveChatAdapter extends RecyclerView.Adapter<LiveChatAdapter.ChatVH> {
private final List<LiveChatClient.ChatMessage> messages = new ArrayList<>();
public void addMessage(LiveChatClient.ChatMessage message) {
messages.add(message);
notifyItemInserted(messages.size() - 1);
}
public void clear() {
messages.clear();
notifyDataSetChanged();
}
public int getMessageCount() {
return messages.size();
}
@NonNull
@Override
public ChatVH onCreateViewHolder(@NonNull ViewGroup parent, int viewType) {
View view = LayoutInflater.from(parent.getContext())
.inflate(android.R.layout.simple_list_item_2, parent, false);
return new ChatVH(view);
}
@Override
public void onBindViewHolder(@NonNull ChatVH holder, int position) {
LiveChatClient.ChatMessage msg = messages.get(position);
holder.bind(msg);
}
@Override
public int getItemCount() {
return messages.size();
}
static class ChatVH extends RecyclerView.ViewHolder {
private final TextView text1;
private final TextView text2;
ChatVH(View itemView) {
super(itemView);
text1 = itemView.findViewById(android.R.id.text1);
text2 = itemView.findViewById(android.R.id.text2);
}
void bind(LiveChatClient.ChatMessage msg) {
if ("connected".equals(msg.type) || "system".equals(msg.type)) {
text1.setText("系统");
text1.setTextColor(0xFF999999);
text2.setText(msg.content);
} else {
text1.setText(msg.nickname);
text1.setTextColor(0xFF1E88E5);
text2.setText(msg.content);
}
}
}
}

View File

@ -214,10 +214,9 @@ public class MainActivity extends AppCompatActivity {
binding.roomsRecyclerView.setLayoutManager(glm);
binding.roomsRecyclerView.setAdapter(adapter);
// 立即显示演示数据提升用户体验
// 直接获取服务器数据不显示演示数据
allRooms.clear();
allRooms.addAll(buildDemoRooms(12));
applyCategoryFilter(currentCategory);
adapter.submitList(new ArrayList<>());
}
private void setupUI() {
@ -316,8 +315,8 @@ public class MainActivity extends AppCompatActivity {
}
});
// 设置添加直播按钮点击事件
binding.fabAddLive.setOnClickListener(v -> showCreateRoomDialog());
// 隐藏创建直播按钮只有管理员可以在后台创建直播
binding.fabAddLive.setVisibility(android.view.View.GONE);
// 设置搜索框文本监听和图标切换
setupSearchBox();
@ -887,11 +886,10 @@ public class MainActivity extends AppCompatActivity {
List<Room> rooms = response.isSuccessful() && body != null && body.isOk() && body.getData() != null
? body.getData()
: Collections.emptyList();
if (rooms == null || rooms.isEmpty()) {
rooms = buildDemoRooms(12);
}
allRooms.clear();
if (rooms != null) {
allRooms.addAll(rooms);
}
applyCategoryFilter(currentCategory);
adapter.bumpCoverOffset();
}
@ -901,10 +899,8 @@ public class MainActivity extends AppCompatActivity {
binding.loading.setVisibility(View.GONE);
binding.swipeRefresh.setRefreshing(false);
isFetching = false;
allRooms.clear();
allRooms.addAll(buildDemoRooms(12));
// 网络失败时不显示演示数据保持当前列表
applyCategoryFilter(currentCategory);
adapter.bumpCoverOffset();
}
});
}

View File

@ -26,6 +26,7 @@ import androidx.recyclerview.widget.LinearLayoutManager;
import com.example.livestreaming.databinding.ActivityRoomDetailNewBinding;
import com.example.livestreaming.net.ApiClient;
import com.example.livestreaming.net.ApiResponse;
import com.example.livestreaming.net.LiveChatClient;
import com.example.livestreaming.net.Room;
import com.example.livestreaming.net.StreamConfig;
@ -47,7 +48,6 @@ public class RoomDetailActivity extends AppCompatActivity {
private ActivityRoomDetailNewBinding binding;
private final Handler handler = new Handler(Looper.getMainLooper());
private Runnable pollRunnable;
private Runnable chatSimulationRunnable;
private String roomId;
private Room room;
@ -67,18 +67,9 @@ public class RoomDetailActivity extends AppCompatActivity {
private List<ChatMessage> chatMessages = new ArrayList<>();
private Random random = new Random();
// 模拟用户名列表
private final String[] simulatedUsers = {
"游戏达人", "直播观众", "路过的小伙伴", "老铁666", "主播加油",
"夜猫子", "学生党", "上班族", "游戏爱好者", "直播粉丝"
};
// 模拟弹幕内容
private final String[] simulatedMessages = {
"主播666", "这个操作厉害了", "学到了学到了", "主播加油!",
"太强了", "这波操作可以", "牛牛牛", "厉害厉害", "支持主播",
"精彩精彩", "继续继续", "好看好看", "赞赞赞", "棒棒棒"
};
// WebSocket 聊天客户端
private LiveChatClient chatClient;
private String myNickname = "游客" + new Random().nextInt(10000);
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
@ -157,6 +148,46 @@ public class RoomDetailActivity extends AppCompatActivity {
}
return false;
});
// 初始化 WebSocket 聊天客户端
chatClient = new LiveChatClient();
}
private void connectChat() {
if (chatClient == null || TextUtils.isEmpty(roomId)) return;
String baseUrl = ApiClient.getCurrentBaseUrl(getApplicationContext());
chatClient.connect(baseUrl, roomId, new LiveChatClient.ChatListener() {
@Override
public void onConnected() {
addChatMessage(new ChatMessage("已连接直播间聊天", true));
}
@Override
public void onMessage(LiveChatClient.ChatMessage msg) {
if ("chat".equals(msg.type)) {
addChatMessage(new ChatMessage(msg.nickname, msg.content));
} else if ("connected".equals(msg.type)) {
// 忽略服务器的 connected 消息
}
}
@Override
public void onDisconnected() {
addChatMessage(new ChatMessage("聊天已断开连接", true));
}
@Override
public void onError(String error) {
addChatMessage(new ChatMessage("聊天连接失败", true));
}
});
}
private void disconnectChat() {
if (chatClient != null) {
chatClient.disconnect();
}
}
private void sendMessage() {
@ -164,17 +195,14 @@ public class RoomDetailActivity extends AppCompatActivity {
binding.chatInput.getText().toString().trim() : "";
if (!TextUtils.isEmpty(message)) {
addChatMessage(new ChatMessage("", message));
binding.chatInput.setText("");
// 模拟其他用户回复
handler.postDelayed(() -> {
if (random.nextFloat() < 0.3f) { // 30%概率有人回复
String user = simulatedUsers[random.nextInt(simulatedUsers.length)];
String reply = simulatedMessages[random.nextInt(simulatedMessages.length)];
addChatMessage(new ChatMessage(user, reply));
// 通过 WebSocket 发送消息
if (chatClient != null && chatClient.isConnected()) {
chatClient.sendMessage("", myNickname, message);
} else {
// WebSocket 未连接时显示本地消息
addChatMessage(new ChatMessage(myNickname, message));
}
}, 1000 + random.nextInt(3000));
binding.chatInput.setText("");
}
}
@ -219,14 +247,14 @@ public class RoomDetailActivity extends AppCompatActivity {
protected void onStart() {
super.onStart();
startPolling();
startChatSimulation();
connectChat();
}
@Override
protected void onStop() {
super.onStop();
stopPolling();
stopChatSimulation();
disconnectChat();
releasePlayer();
}
@ -271,33 +299,6 @@ public class RoomDetailActivity extends AppCompatActivity {
}
}
private void startChatSimulation() {
stopChatSimulation();
chatSimulationRunnable = () -> {
if (isFinishing() || isDestroyed()) return;
// 随机生成弹幕降低概率
if (random.nextFloat() < 0.25f) { // 25%概率生成弹幕
String user = simulatedUsers[random.nextInt(simulatedUsers.length)];
String message = simulatedMessages[random.nextInt(simulatedMessages.length)];
addChatMessage(new ChatMessage(user, message));
}
// 随机间隔5-12秒减少频率
int delay = 5000 + random.nextInt(7000);
handler.postDelayed(chatSimulationRunnable, delay);
};
// 首次延迟3秒开始
handler.postDelayed(chatSimulationRunnable, 3000);
}
private void stopChatSimulation() {
if (chatSimulationRunnable != null) {
handler.removeCallbacks(chatSimulationRunnable);
chatSimulationRunnable = null;
}
}
private boolean isFirstLoad = true;

View File

@ -118,10 +118,14 @@ public class RoomsAdapter extends ListAdapter<Room, RoomsAdapter.RoomVH> {
if (assetFile != null) {
model = "file:///android_asset/img/" + assetFile;
} else {
// 使用稳定快速的占位图片基于房间ID生成不同图片
String seed = room != null && room.getId() != null ? room.getId() : String.valueOf(getBindingAdapterPosition());
int h = Math.abs(seed.hashCode());
int imageId = (h % 1000) + 1;
model = "https://picsum.photos/id/" + imageId + "/600/450";
int imgIndex = (h % 10) + 1;
// 使用 placehold.co 快速加载国内访问快
String[] colors = {"ff6b6b", "4ecdc4", "45b7d1", "96ceb4", "ffeaa7", "dfe6e9", "fd79a8", "a29bfe", "00b894", "e17055"};
String color = colors[imgIndex - 1];
model = "https://placehold.co/600x450/" + color + "/ffffff?text=LIVE";
}
Glide.with(binding.coverImage)

View File

@ -12,6 +12,7 @@ import androidx.recyclerview.widget.DiffUtil;
import androidx.recyclerview.widget.ListAdapter;
import androidx.recyclerview.widget.RecyclerView;
import com.bumptech.glide.Glide;
import com.example.livestreaming.net.Room;
public class WaterfallRoomsAdapter extends ListAdapter<Room, WaterfallRoomsAdapter.RoomVH> {
@ -75,6 +76,19 @@ public class WaterfallRoomsAdapter extends ListAdapter<Room, WaterfallRoomsAdapt
// 设置主播名称
streamerName.setText(room.getStreamerName() != null ? room.getStreamerName() : "");
// 加载封面图片
String seed = room.getId() != null ? room.getId() : String.valueOf(getBindingAdapterPosition());
int h = Math.abs(seed.hashCode());
int imgIndex = (h % 10);
String[] colors = {"ff6b6b", "4ecdc4", "45b7d1", "96ceb4", "ffeaa7", "dfe6e9", "fd79a8", "a29bfe", "00b894", "e17055"};
String color = colors[imgIndex];
String imageUrl = "https://placehold.co/600x450/" + color + "/ffffff?text=LIVE";
Glide.with(coverImage)
.load(imageUrl)
.placeholder(R.drawable.bg_cover_placeholder)
.centerCrop()
.into(coverImage);
// 设置直播状态
if (room.isLive()) {
liveBadge.setVisibility(View.VISIBLE);

View File

@ -13,15 +13,15 @@ public interface ApiService {
@POST("api/front/login")
Call<ApiResponse<LoginResponse>> login(@Body LoginRequest body);
@GET("api/rooms")
@GET("api/front/live/public/rooms")
Call<ApiResponse<List<Room>>> getRooms();
@POST("api/rooms")
@POST("api/front/live/rooms")
Call<ApiResponse<Room>> createRoom(@Body CreateRoomRequest body);
@GET("api/rooms/{id}")
@GET("api/front/live/public/rooms/{id}")
Call<ApiResponse<Room>> getRoom(@Path("id") String id);
@DELETE("api/rooms/{id}")
@DELETE("api/front/live/rooms/{id}")
Call<ApiResponse<Object>> deleteRoom(@Path("id") String id);
}

View File

@ -0,0 +1,177 @@
package com.example.livestreaming.net;
import android.os.Handler;
import android.os.Looper;
import android.util.Log;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import org.json.JSONException;
import org.json.JSONObject;
import java.util.concurrent.TimeUnit;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
/**
* 直播间聊天 WebSocket 客户端
*/
public class LiveChatClient {
private static final String TAG = "LiveChatClient";
public interface ChatListener {
void onConnected();
void onMessage(ChatMessage message);
void onDisconnected();
void onError(String error);
}
public static class ChatMessage {
public String type;
public String userId;
public String nickname;
public String content;
public long timestamp;
public ChatMessage(String type, String userId, String nickname, String content, long timestamp) {
this.type = type;
this.userId = userId;
this.nickname = nickname;
this.content = content;
this.timestamp = timestamp;
}
}
private final OkHttpClient client;
private final Handler mainHandler;
private WebSocket webSocket;
private ChatListener listener;
private String roomId;
private boolean isConnected;
public LiveChatClient() {
this.client = new OkHttpClient.Builder()
.connectTimeout(10, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.writeTimeout(10, TimeUnit.SECONDS)
.pingInterval(30, TimeUnit.SECONDS)
.build();
this.mainHandler = new Handler(Looper.getMainLooper());
}
public void connect(String baseUrl, String roomId, ChatListener listener) {
this.roomId = roomId;
this.listener = listener;
// 构建 WebSocket URL: ws://host:port/ws/live/chat/{roomId}
String wsUrl = baseUrl.replace("http://", "ws://")
.replace("https://", "wss://");
if (wsUrl.endsWith("/")) {
wsUrl = wsUrl.substring(0, wsUrl.length() - 1);
}
wsUrl = wsUrl + "/ws/live/chat/" + roomId;
Log.d(TAG, "Connecting to: " + wsUrl);
Request request = new Request.Builder()
.url(wsUrl)
.build();
webSocket = client.newWebSocket(request, new WebSocketListener() {
@Override
public void onOpen(@NonNull WebSocket ws, @NonNull Response response) {
Log.d(TAG, "WebSocket connected");
isConnected = true;
mainHandler.post(() -> {
if (listener != null) listener.onConnected();
});
}
@Override
public void onMessage(@NonNull WebSocket ws, @NonNull String text) {
Log.d(TAG, "Message received: " + text);
try {
JSONObject json = new JSONObject(text);
String type = json.optString("type", "chat");
String userId = json.optString("userId", "");
String nickname = json.optString("nickname", "");
String content = json.optString("content", "");
long timestamp = json.optLong("timestamp", System.currentTimeMillis());
ChatMessage msg = new ChatMessage(type, userId, nickname, content, timestamp);
mainHandler.post(() -> {
if (listener != null) listener.onMessage(msg);
});
} catch (JSONException e) {
Log.w(TAG, "Parse message failed: " + e.getMessage());
}
}
@Override
public void onClosing(@NonNull WebSocket ws, int code, @NonNull String reason) {
Log.d(TAG, "WebSocket closing: " + reason);
ws.close(1000, null);
}
@Override
public void onClosed(@NonNull WebSocket ws, int code, @NonNull String reason) {
Log.d(TAG, "WebSocket closed: " + reason);
isConnected = false;
mainHandler.post(() -> {
if (listener != null) listener.onDisconnected();
});
}
@Override
public void onFailure(@NonNull WebSocket ws, @NonNull Throwable t, @Nullable Response response) {
Log.e(TAG, "WebSocket error: " + t.getMessage());
isConnected = false;
mainHandler.post(() -> {
if (listener != null) listener.onError(t.getMessage());
});
}
});
}
public void sendMessage(String userId, String nickname, String content) {
if (webSocket == null || !isConnected) {
Log.w(TAG, "WebSocket not connected");
return;
}
try {
JSONObject json = new JSONObject();
json.put("type", "chat");
json.put("userId", userId);
json.put("nickname", nickname);
json.put("content", content);
webSocket.send(json.toString());
Log.d(TAG, "Message sent: " + content);
} catch (JSONException e) {
Log.e(TAG, "Build message failed: " + e.getMessage());
}
}
public void disconnect() {
if (webSocket != null) {
webSocket.close(1000, "User disconnect");
webSocket = null;
}
isConnected = false;
}
public boolean isConnected() {
return isConnected;
}
public String getRoomId() {
return roomId;
}
}

View File

@ -201,10 +201,62 @@
android:layout_height="wrap_content"
android:layout_marginTop="18dp"
android:text="删除房间"
android:visibility="gone"
app:layout_constraintEnd_toEndOf="parent"
app:layout_constraintStart_toStartOf="parent"
app:layout_constraintTop_toBottomOf="@id/copyKeyButton" />
<!-- 聊天区域 -->
<TextView
android:id="@+id/chatTitle"
android:layout_width="0dp"
android:layout_height="wrap_content"
android:layout_marginTop="18dp"
android:text="💬 直播间聊天"
android:textSize="16sp"
android:textStyle="bold"
app:layout_constraintEnd_toEndOf="parent"
app:layout_constraintStart_toStartOf="parent"
app:layout_constraintTop_toBottomOf="@id/copyKeyButton" />
<androidx.recyclerview.widget.RecyclerView
android:id="@+id/chatRecyclerView"
android:layout_width="0dp"
android:layout_height="200dp"
android:layout_marginTop="8dp"
android:background="#F5F5F5"
android:padding="8dp"
app:layout_constraintEnd_toEndOf="parent"
app:layout_constraintStart_toStartOf="parent"
app:layout_constraintTop_toBottomOf="@id/chatTitle" />
<LinearLayout
android:id="@+id/chatInputLayout"
android:layout_width="0dp"
android:layout_height="wrap_content"
android:layout_marginTop="8dp"
android:orientation="horizontal"
app:layout_constraintEnd_toEndOf="parent"
app:layout_constraintStart_toStartOf="parent"
app:layout_constraintTop_toBottomOf="@id/chatRecyclerView">
<com.google.android.material.textfield.TextInputEditText
android:id="@+id/chatInput"
android:layout_width="0dp"
android:layout_height="wrap_content"
android:layout_weight="1"
android:hint="发送弹幕..."
android:inputType="text"
android:maxLines="1" />
<com.google.android.material.button.MaterialButton
android:id="@+id/sendButton"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_marginStart="8dp"
android:text="发送" />
</LinearLayout>
</androidx.constraintlayout.widget.ConstraintLayout>
</androidx.core.widget.NestedScrollView>

12
live-streaming/Dockerfile Normal file
View File

@ -0,0 +1,12 @@
FROM node:18-alpine
WORKDIR /app
COPY package*.json ./
RUN npm install --production
COPY server ./server
EXPOSE 3001
CMD ["node", "server/index.js"]

View File

@ -0,0 +1,35 @@
version: '3.8'
services:
srs:
image: ossrs/srs:5
container_name: srs-server
ports:
- "${SRS_RTMP_EXPOSE_PORT:-25002}:1935" # RTMP
- "${SRS_HTTP_EXPOSE_PORT:-25003}:8080" # HTTP-FLV/HLS
- "${SRS_API_EXPOSE_PORT:-1985}:1985" # HTTP API
volumes:
- ./docker/srs/srs.conf:/usr/local/srs/conf/srs.conf
restart: unless-stopped
api-server:
build:
context: .
dockerfile: Dockerfile
image: live-streaming-api-server:latest
container_name: api-server
ports:
- "${API_EXPOSE_PORT:-25001}:3001"
environment:
- NODE_ENV=production
- PORT=3001
- EMBEDDED_MEDIA_SERVER=0
- SRS_HOST=srs
- SRS_RTMP_PORT=1935
- SRS_HTTP_PORT=8080
- PUBLIC_SRS_HOST=${PUBLIC_SRS_HOST:-192.168.1.164}
- PUBLIC_SRS_RTMP_PORT=${PUBLIC_SRS_RTMP_PORT:-${SRS_RTMP_EXPOSE_PORT:-25002}}
- PUBLIC_SRS_HTTP_PORT=${PUBLIC_SRS_HTTP_PORT:-${SRS_HTTP_EXPOSE_PORT:-25003}}
depends_on:
- srs
restart: unless-stopped

View File

@ -0,0 +1,91 @@
listen 1935;
max_connections 1000;
daemon off;
srs_log_tank console;
# 全局优化配置
# 减少缓冲区大小,降低延迟
mr_enabled off;
# 启用快速启动
fast_cache 10;
# 优化TCP配置
tcp_nodelay on;
http_server {
enabled on;
listen 8080;
dir ./objs/nginx/html;
# 启用跨域支持
crossdomain on;
}
http_api {
enabled on;
listen 1985;
# 启用跨域支持
crossdomain on;
}
stats {
network 0;
}
vhost __defaultVhost__ {
# RTMP 配置 - 优化延迟
rtmp {
enabled on;
# 减少缓冲区大小,降低延迟
chunk_size 4096;
}
# HLS 配置 - 优化延迟
hls {
enabled on;
hls_path ./objs/nginx/html;
# 减少分片时长,降低延迟
hls_fragment 2;
hls_window 6;
# 启用低延迟模式
hls_dispose 30;
}
# HTTP-FLV 配置 - 低延迟播放
http_remux {
enabled on;
mount [vhost]/[app]/[stream].flv;
# 启用快速启动
fast_cache 10;
}
# 转码配置(可选)
transcode {
enabled off;
}
# 播放配置 - 优化延迟
play {
# 减少GOP缓存
gop_cache off;
# 启用时间校正
time_jitter full;
# 减少队列长度
queue_length 10;
}
# 发布配置 - 优化延迟
publish {
# 减少首帧等待时间
firstpkt_timeout 20000;
# 减少正常包超时
normal_timeout 5000;
}
# HTTP Hooks: 推流/播放事件回调到 Node APINode API 再转发到 Java 后端
http_hooks {
enabled on;
on_publish http://api-server:3001/api/srs/on_publish;
on_unpublish http://api-server:3001/api/srs/on_unpublish;
on_play http://api-server:3001/api/srs/on_play;
on_stop http://api-server:3001/api/srs/on_stop;
}
}

5242
live-streaming/package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,27 @@
{
"name": "live-streaming-system",
"version": "1.0.0",
"description": "基于 SRS 的个人直播系统",
"main": "server/index.js",
"scripts": {
"start": "node server/index.js",
"dev": "nodemon server/index.js",
"test": "jest",
"test:watch": "jest --watch",
"docker:up": "docker-compose up -d",
"docker:down": "docker-compose down"
},
"dependencies": {
"cors": "^2.8.5",
"dotenv": "^16.3.1",
"express": "^4.18.2",
"node-media-server": "2.7.3",
"uuid": "^9.0.1"
},
"devDependencies": {
"fast-check": "^3.14.0",
"jest": "^29.7.0",
"nodemon": "^3.0.2",
"supertest": "^6.3.3"
}
}

View File

@ -0,0 +1 @@
# Server directory

View File

@ -0,0 +1,155 @@
require('dotenv').config();
const express = require('express');
const cors = require('cors');
const NodeMediaServer = require('node-media-server');
const childProcess = require('child_process');
const fs = require('fs');
const path = require('path');
const roomsRouter = require('./routes/rooms');
const srsRouter = require('./routes/srs');
const errorHandler = require('./middleware/errorHandler');
const roomStore = require('./store/roomStore');
const app = express();
const PORT = process.env.PORT || 3001;
const startMediaServer = () => {
const host = process.env.SRS_HOST || 'localhost';
const rtmpPort = Number(process.env.SRS_RTMP_PORT || 1935);
const httpPort = Number(process.env.SRS_HTTP_PORT || 8080);
const rawFfmpegPath = process.env.FFMPEG_PATH;
const embeddedEnabledRaw = process.env.EMBEDDED_MEDIA_SERVER;
const embeddedEnabled = embeddedEnabledRaw == null
? true
: !['0', 'false', 'off', 'no'].includes(String(embeddedEnabledRaw).toLowerCase());
if (!embeddedEnabled) {
console.log('[Media Server] Embedded NodeMediaServer disabled (EMBEDDED_MEDIA_SERVER=0).');
return;
}
let ffmpegPath = rawFfmpegPath;
if (!ffmpegPath) {
try {
if (process.platform === 'win32') {
const out = childProcess.execSync('where ffmpeg', { stdio: ['ignore', 'pipe', 'ignore'] });
const first = String(out || '').split(/\r?\n/).map((s) => s.trim()).filter(Boolean)[0];
if (first) ffmpegPath = first;
} else {
const out = childProcess.execSync('which ffmpeg', { stdio: ['ignore', 'pipe', 'ignore'] });
const first = String(out || '').split(/\r?\n/).map((s) => s.trim()).filter(Boolean)[0];
if (first) ffmpegPath = first;
}
} catch (_) {
ffmpegPath = null;
}
}
if (ffmpegPath) {
const p = String(ffmpegPath).trim().replace(/^"|"$/g, '');
ffmpegPath = p;
try {
if (fs.existsSync(ffmpegPath) && fs.statSync(ffmpegPath).isDirectory()) {
ffmpegPath = path.join(ffmpegPath, 'ffmpeg.exe');
}
} catch (_) {
ffmpegPath = p;
}
if (!fs.existsSync(ffmpegPath)) {
console.warn(`[Media Server] FFMPEG_PATH set but not found: ${ffmpegPath}`);
ffmpegPath = null;
}
}
try {
const nmsConfig = {
rtmp: {
port: rtmpPort,
chunk_size: 60000,
gop_cache: true,
ping: 30,
ping_timeout: 60
},
http: {
port: httpPort,
mediaroot: './media',
allow_origin: '*'
}
};
if (ffmpegPath) {
nmsConfig.trans = {
ffmpeg: ffmpegPath,
tasks: [
{
app: 'live',
hls: true,
hlsFlags: '[hls_time=2:hls_list_size=6:hls_flags=delete_segments]'
}
]
};
}
const nms = new NodeMediaServer(nmsConfig);
nms.on('prePublish', (id, streamPath) => {
const parts = String(streamPath || '').split('/').filter(Boolean);
const streamKey = parts[1];
if (streamKey) roomStore.setLiveStatus(streamKey, true);
});
nms.on('donePublish', (id, streamPath) => {
const parts = String(streamPath || '').split('/').filter(Boolean);
const streamKey = parts[1];
if (streamKey) roomStore.setLiveStatus(streamKey, false);
});
nms.run();
console.log(`Media Server RTMP: rtmp://${host}:${rtmpPort}/live (Stream Key = streamKey)`);
console.log(`Media Server HTTP-FLV: http://${host}:${httpPort}/live/{streamKey}.flv`);
if (ffmpegPath) {
console.log(`Media Server HLS: http://${host}:${httpPort}/live/{streamKey}/index.m3u8`);
console.log(`[Media Server] Using FFmpeg: ${ffmpegPath}`);
} else {
console.log('Media Server HLS disabled (set FFMPEG_PATH to enable HLS)');
}
} catch (e) {
console.error('[Media Server] Failed to start embedded media server:', e.message);
console.error('[Media Server] If ports 1935/8080 are in use, stop the occupying process or change SRS_RTMP_PORT/SRS_HTTP_PORT.');
}
};
startMediaServer();
// 中间件
app.use(cors({
origin: process.env.CLIENT_URL || 'http://localhost:3000',
credentials: true
}));
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
// 路由
app.use('/api/rooms', roomsRouter);
app.use('/api/srs', srsRouter);
// 健康检查
app.get('/health', (req, res) => {
res.json({ status: 'ok', timestamp: new Date().toISOString() });
});
// 错误处理
app.use(errorHandler);
// 启动服务
app.listen(PORT, '0.0.0.0', () => {
console.log(`API 服务运行在 http://localhost:${PORT}`);
console.log(`API 服务也可通过 http://0.0.0.0:${PORT} 访问(用于 Android 模拟器)`);
console.log(`SRS RTMP: rtmp://${process.env.SRS_HOST || 'localhost'}:${process.env.SRS_RTMP_PORT || 1935}/live/{streamKey}`);
console.log(`SRS HTTP: http://${process.env.SRS_HOST || 'localhost'}:${process.env.SRS_HTTP_PORT || 8080}/live/{streamKey}.flv`);
});
module.exports = app;

View File

@ -0,0 +1,15 @@
// 统一错误处理中间件
const errorHandler = (err, req, res, next) => {
console.error('Error:', err.message);
console.error('Stack:', err.stack);
res.status(err.status || 500).json({
success: false,
error: {
code: err.code || 'INTERNAL_ERROR',
message: err.message || '服务器内部错误'
}
});
};
module.exports = errorHandler;

View File

@ -0,0 +1,28 @@
// 验证房间创建请求
const validateRoom = (req, res, next) => {
const { title, streamerName } = req.body;
// 验证标题
if (!title || typeof title !== 'string' || title.trim().length === 0) {
return res.status(400).json({
success: false,
error: { code: 'VALIDATION_ERROR', message: '标题不能为空' }
});
}
// 验证主播名称
if (!streamerName || typeof streamerName !== 'string' || streamerName.trim().length === 0) {
return res.status(400).json({
success: false,
error: { code: 'VALIDATION_ERROR', message: '主播名称不能为空' }
});
}
// 清理输入
req.body.title = title.trim();
req.body.streamerName = streamerName.trim();
next();
};
module.exports = { validateRoom };

View File

@ -0,0 +1,81 @@
const express = require('express');
const router = express.Router();
const roomStore = require('../store/roomStore');
const { validateRoom } = require('../middleware/validate');
const { generateStreamUrls } = require('../utils/streamUrl');
const { getActiveStreamKeys } = require('../utils/srsHttpApi');
const getRequestHost = (req) => {
const hostHeader = req && req.get ? req.get('host') : null;
const fromHeader = hostHeader ? String(hostHeader).split(':')[0] : null;
return req && req.hostname ? req.hostname : fromHeader;
};
// GET /api/rooms - 获取所有房间
router.get('/', async (req, res) => {
const rooms = roomStore.getAll();
const activeStreamKeys = await getActiveStreamKeys({ app: 'live' });
const requestHost = getRequestHost(req);
res.json({
success: true,
data: rooms.map(room => ({
...room,
isLive: activeStreamKeys.size ? activeStreamKeys.has(room.streamKey) : room.isLive,
streamUrls: generateStreamUrls(room.streamKey, requestHost)
}))
});
});
// POST /api/rooms - 创建房间
router.post('/', validateRoom, (req, res) => {
const { title, streamerName } = req.body;
const room = roomStore.create({ title, streamerName });
const requestHost = getRequestHost(req);
res.status(201).json({
success: true,
data: {
...room,
streamUrls: generateStreamUrls(room.streamKey, requestHost)
}
});
});
// GET /api/rooms/:id - 获取单个房间
router.get('/:id', async (req, res) => {
const room = roomStore.getById(req.params.id);
const requestHost = getRequestHost(req);
if (!room) {
return res.status(404).json({
success: false,
error: { code: 'ROOM_NOT_FOUND', message: '房间不存在' }
});
}
res.json({
success: true,
data: {
...room,
isLive: (await getActiveStreamKeys({ app: 'live' })).has(room.streamKey) || room.isLive,
streamUrls: generateStreamUrls(room.streamKey, requestHost)
}
});
});
// DELETE /api/rooms/:id - 删除房间
router.delete('/:id', (req, res) => {
const deleted = roomStore.delete(req.params.id);
if (!deleted) {
return res.status(404).json({
success: false,
error: { code: 'ROOM_NOT_FOUND', message: '房间不存在' }
});
}
res.json({ success: true });
});
module.exports = router;

View File

@ -0,0 +1,97 @@
const express = require('express');
const router = express.Router();
const roomStore = require('../store/roomStore');
const http = require('http');
// Java 后端地址 (host.docker.internal 用于从 Docker 容器访问宿主机)
const JAVA_BACKEND_HOST = process.env.JAVA_BACKEND_HOST || 'host.docker.internal';
const JAVA_BACKEND_PORT = process.env.JAVA_BACKEND_PORT || 8081;
// 转发到 Java 后端
function forwardToJava(path, data) {
const postData = JSON.stringify(data);
const options = {
hostname: JAVA_BACKEND_HOST,
port: JAVA_BACKEND_PORT,
path: path,
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(postData)
},
timeout: 5000
};
const req = http.request(options, (res) => {
console.log(`[SRS->Java] ${path} 响应状态: ${res.statusCode}`);
});
req.on('error', (e) => {
console.error(`[SRS->Java] ${path} 转发失败: ${e.message}`);
});
req.on('timeout', () => {
console.error(`[SRS->Java] ${path} 转发超时`);
req.destroy();
});
req.write(postData);
req.end();
}
// POST /api/srs/on_publish - 推流开始回调
router.post('/on_publish', (req, res) => {
const { app, stream } = req.body;
console.log(`[SRS] 推流开始: app=${app}, stream=${stream}`);
// 更新内存存储
const room = roomStore.setLiveStatus(stream, true);
if (room) {
console.log(`[SRS] 房间 "${room.title}" 开始直播`);
} else {
console.log(`[SRS] 未找到对应房间streamKey=${stream}`);
}
// 转发到 Java 后端更新数据库
forwardToJava('/api/front/live/srs/on_publish', { stream });
// SRS 要求返回 code: 0 表示成功
res.json({ code: 0 });
});
// POST /api/srs/on_unpublish - 推流结束回调
router.post('/on_unpublish', (req, res) => {
const { app, stream } = req.body;
console.log(`[SRS] 推流结束: app=${app}, stream=${stream}`);
// 更新内存存储
const room = roomStore.setLiveStatus(stream, false);
if (room) {
console.log(`[SRS] 房间 "${room.title}" 停止直播`);
}
// 转发到 Java 后端更新数据库
forwardToJava('/api/front/live/srs/on_unpublish', { stream });
res.json({ code: 0 });
});
// POST /api/srs/on_play - 观看回调 (可选)
router.post('/on_play', (req, res) => {
const { app, stream } = req.body;
console.log(`[SRS] 观众进入: app=${app}, stream=${stream}`);
res.json({ code: 0 });
});
// POST /api/srs/on_stop - 停止观看回调 (可选)
router.post('/on_stop', (req, res) => {
const { app, stream } = req.body;
console.log(`[SRS] 观众离开: app=${app}, stream=${stream}`);
res.json({ code: 0 });
});
module.exports = router;

View File

@ -0,0 +1,116 @@
const { v4: uuidv4 } = require('uuid');
const fs = require('fs');
const path = require('path');
// 持久化文件路径
const STORAGE_FILE = path.join(__dirname, '../../data/rooms.json');
// 确保数据目录存在
const ensureDataDir = () => {
const dir = path.dirname(STORAGE_FILE);
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true });
}
};
// 从文件加载房间数据
const loadRooms = () => {
try {
ensureDataDir();
if (fs.existsSync(STORAGE_FILE)) {
const data = fs.readFileSync(STORAGE_FILE, 'utf8');
const roomsArray = JSON.parse(data);
return new Map(roomsArray.map(room => [room.id, room]));
}
} catch (e) {
console.warn('[RoomStore] Failed to load rooms from file:', e.message);
}
return new Map();
};
// 保存房间数据到文件
const saveRooms = (rooms) => {
try {
ensureDataDir();
const roomsArray = Array.from(rooms.values());
fs.writeFileSync(STORAGE_FILE, JSON.stringify(roomsArray, null, 2), 'utf8');
} catch (e) {
console.error('[RoomStore] Failed to save rooms to file:', e.message);
}
};
// 内存存储(从文件加载)
const rooms = loadRooms();
console.log(`[RoomStore] Loaded ${rooms.size} rooms from storage`);
const roomStore = {
// 创建房间
create(data) {
const id = uuidv4();
const room = {
id,
title: data.title,
streamerName: data.streamerName,
streamKey: id,
isLive: false,
viewerCount: 0,
createdAt: new Date().toISOString(),
startedAt: null
};
rooms.set(id, room);
saveRooms(rooms);
return room;
},
// 获取所有房间
getAll() {
return Array.from(rooms.values());
},
// 根据 ID 获取房间
getById(id) {
return rooms.get(id) || null;
},
// 根据 streamKey 获取房间
getByStreamKey(streamKey) {
return rooms.get(streamKey) || null;
},
// 更新房间
update(id, data) {
const room = rooms.get(id);
if (!room) return null;
const updated = { ...room, ...data };
rooms.set(id, updated);
saveRooms(rooms);
return updated;
},
// 设置直播状态
setLiveStatus(streamKey, isLive) {
const room = rooms.get(streamKey);
if (!room) return null;
room.isLive = isLive;
room.startedAt = isLive ? new Date().toISOString() : null;
rooms.set(streamKey, room);
saveRooms(rooms);
return room;
},
// 删除房间
delete(id) {
const result = rooms.delete(id);
if (result) saveRooms(rooms);
return result;
},
// 清空所有房间 (测试用)
clear() {
rooms.clear();
}
};
module.exports = roomStore;

View File

@ -0,0 +1,90 @@
const http = require('http');
const https = require('https');
const requestJson = (url, { timeoutMs = 2000 } = {}) => {
return new Promise((resolve, reject) => {
const u = new URL(url);
const lib = u.protocol === 'https:' ? https : http;
const req = lib.request(
{
protocol: u.protocol,
hostname: u.hostname,
port: u.port,
path: `${u.pathname}${u.search}`,
method: 'GET'
},
(res) => {
let raw = '';
res.setEncoding('utf8');
res.on('data', (chunk) => (raw += chunk));
res.on('end', () => {
if (res.statusCode && (res.statusCode < 200 || res.statusCode >= 300)) {
return reject(new Error(`SRS HTTP API status ${res.statusCode}`));
}
try {
resolve(JSON.parse(raw || '{}'));
} catch (e) {
reject(new Error('Invalid JSON from SRS HTTP API'));
}
});
}
);
req.on('error', reject);
req.setTimeout(timeoutMs, () => {
req.destroy(new Error('SRS HTTP API request timeout'));
});
req.end();
});
};
const normalizeStreamName = (s) => {
if (!s) return null;
if (typeof s === 'string') return s;
return s.stream || s.name || s.id || null;
};
const isPublishActive = (streamObj) => {
const publish = streamObj && streamObj.publish;
if (!publish) return false;
if (typeof publish.active === 'boolean') return publish.active;
return Boolean(publish.cid);
};
let warnedOnce = false;
const getActiveStreamKeys = async ({ app = 'live' } = {}) => {
const host = process.env.SRS_HOST || 'localhost';
const apiPort = process.env.SRS_API_PORT || 1985;
const url = `http://${host}:${apiPort}/api/v1/streams/?count=100`;
try {
const payload = await requestJson(url, { timeoutMs: 1500 });
const streams = payload.streams || payload.data?.streams || [];
const active = new Set();
for (const s of streams) {
if (app && s.app && s.app !== app) continue;
if (!isPublishActive(s)) continue;
const name = normalizeStreamName(s);
if (name) active.add(name);
}
return active;
} catch (e) {
if (!warnedOnce) {
warnedOnce = true;
console.warn(`[SRS] HTTP API unavailable at ${url}`);
console.warn(`[SRS] Error: ${e.message}`);
console.warn(`[SRS] Will fallback to callbacks/in-memory status.`);
}
return new Set();
}
};
module.exports = {
getActiveStreamKeys
};

View File

@ -0,0 +1,26 @@
// 生成流地址
const generateStreamUrls = (streamKey, requestHost) => {
// 优先使用环境变量配置的公网地址
const host = process.env.PUBLIC_SRS_HOST || requestHost || process.env.SRS_HOST || 'localhost';
const rtmpPort = process.env.PUBLIC_SRS_RTMP_PORT || process.env.SRS_RTMP_PORT || 1935;
const httpPort = process.env.PUBLIC_SRS_HTTP_PORT || process.env.SRS_HTTP_PORT || 8080;
const ffmpegPath = process.env.FFMPEG_PATH;
const embeddedEnabledRaw = process.env.EMBEDDED_MEDIA_SERVER;
const embeddedEnabled = embeddedEnabledRaw == null
? true
: !['0', 'false', 'off', 'no'].includes(String(embeddedEnabledRaw).toLowerCase());
return {
// 推流地址 (给主播用) - 完整路径包含 streamKey
rtmp: `rtmp://${host}:${rtmpPort}/live/${streamKey}`,
// 播放地址 (给观众用)
flv: `http://${host}:${httpPort}/live/${streamKey}.flv`,
hls: embeddedEnabled
? (ffmpegPath ? `http://${host}:${httpPort}/live/${streamKey}/index.m3u8` : null)
: `http://${host}:${httpPort}/live/${streamKey}.m3u8`
};
};
module.exports = { generateStreamUrls };