smart-home/analyze_official_csv.py

118 lines
3.8 KiB
Python
Raw Normal View History

2026-02-26 09:16:34 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
分析官方APP的CSV文件提取真实的脉冲时长
"""
import csv
import statistics
def parse_csv(filename):
"""解析CSV文件提取脉冲时长"""
times = []
states = []
with open(filename, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
time_str = row['Time'].replace(' ms', '')
time_us = float(time_str) * 1000 # 转换为微秒
state = int(row['Ch01'])
times.append(time_us)
states.append(state)
return times, states
def calculate_pulses(times, states):
"""计算脉冲时长"""
pulses = []
# 跳过第一个时间戳通常是0或很大的初始值
# 从第二个开始计算差值
for i in range(2, len(times)):
duration = times[i] - times[i-1]
# 只保留合理的脉冲时长100μs - 20000μs
if 100 < duration < 20000:
pulses.append(int(duration))
return pulses
def analyze_pulses(pulses):
"""分析脉冲时长,找出引导码、数据码等"""
print(f"总脉冲数: {len(pulses)}")
print(f"\n前20个脉冲:")
for i, pulse in enumerate(pulses[:20]):
print(f" [{i}]: {pulse}μs")
# 统计脉冲时长分布
pulse_counts = {}
for pulse in pulses:
# 四舍五入到最近的50μs
rounded = round(pulse / 50) * 50
pulse_counts[rounded] = pulse_counts.get(rounded, 0) + 1
print(f"\n脉冲时长分布(出现次数 > 5:")
for duration in sorted(pulse_counts.keys()):
count = pulse_counts[duration]
if count > 5:
print(f" {duration}μs: {count}")
# 找出最常见的脉冲时长
common_pulses = sorted(pulse_counts.items(), key=lambda x: x[1], reverse=True)[:10]
print(f"\n最常见的10个脉冲时长:")
for duration, count in common_pulses:
print(f" {duration}μs: {count}")
# 分析引导码前2个脉冲
if len(pulses) >= 2:
print(f"\n引导码分析:")
print(f" 引导码高: {pulses[0]}μs")
print(f" 引导码低: {pulses[1]}μs")
# 分析数据码(跳过引导码)
data_pulses = pulses[2:-1] # 跳过引导码和停止码
# 找出高电平脉冲(偶数位置)
high_pulses = [data_pulses[i] for i in range(0, len(data_pulses), 2)]
# 找出低电平脉冲(奇数位置)
low_pulses = [data_pulses[i] for i in range(1, len(data_pulses), 2)]
print(f"\n数据码分析:")
print(f" 数据高电平: 平均{int(statistics.mean(high_pulses))}μs, "
f"中位数{int(statistics.median(high_pulses))}μs")
# 低电平有两种0和长1
low_pulses_sorted = sorted(low_pulses)
mid_point = len(low_pulses_sorted) // 2
short_lows = low_pulses_sorted[:mid_point]
long_lows = low_pulses_sorted[mid_point:]
print(f" 数据0低电平: 平均{int(statistics.mean(short_lows))}μs, "
f"中位数{int(statistics.median(short_lows))}μs")
print(f" 数据1低电平: 平均{int(statistics.mean(long_lows))}μs, "
f"中位数{int(statistics.median(long_lows))}μs")
# 停止码
if len(pulses) > 0:
print(f"\n停止码: {pulses[-1]}μs")
def main():
files = [
'正确的csv文档/官方app制冷.csv',
'正确的csv文档/官方app制热.csv',
'正确的csv文档/官方app送风模式.csv',
'正确的csv文档/官方app抽湿.csv',
]
for filename in files:
print(f"\n{'='*60}")
print(f"分析文件: {filename}")
print(f"{'='*60}")
times, states = parse_csv(filename)
pulses = calculate_pulses(times, states)
analyze_pulses(pulses)
if __name__ == '__main__':
main()