smart-home/ziyuan/test_thermal_response.py

67 lines
2.0 KiB
Python
Raw Normal View History

2026-02-26 09:16:34 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
模拟ESP32发送16宫格热成像数据响应
用于测试APP端的UDP接收功能
"""
import socket
import json
import time
import threading
def create_mock_thermal_data():
"""创建模拟的16宫格热成像数据"""
import random
grid = []
for i in range(16):
# 生成25-45度的随机温度
temp = round(25 + random.random() * 20, 1)
grid.append(temp)
return {
"type": "thermal_grid",
"timestamp": int(time.time() * 1000),
"grid": grid
}
def udp_server():
"""UDP服务器监听APP请求并响应"""
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.bind(('0.0.0.0', 8899)) # 监听8899端口
print("模拟ESP32 UDP服务器启动监听端口8899...")
print("等待APP端连接...")
while True:
try:
# 接收请求
data, addr = server_socket.recvfrom(1024)
command = data.decode('utf-8').strip()
print(f"收到来自 {addr} 的命令: {command}")
if command == "GET_THERMAL_GRID":
# 创建模拟数据
thermal_data = create_mock_thermal_data()
response = json.dumps(thermal_data)
# 发送响应
server_socket.sendto(response.encode('utf-8'), addr)
print(f"已发送16宫格数据到 {addr}: {len(response)} 字节")
print(f"数据预览: {thermal_data['grid'][:4]}...")
else:
# 其他命令的简单响应
response = f"OK:{command}"
server_socket.sendto(response.encode('utf-8'), addr)
print(f"已响应: {response}")
except Exception as e:
print(f"服务器错误: {e}")
if __name__ == "__main__":
try:
udp_server()
except KeyboardInterrupt:
print("\n服务器已停止")