smart-home/analyze_official_csv.py
2026-02-26 09:16:34 +08:00

118 lines
3.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
分析官方APP的CSV文件提取真实的脉冲时长
"""
import csv
import statistics
def parse_csv(filename):
"""解析CSV文件提取脉冲时长"""
times = []
states = []
with open(filename, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
time_str = row['Time'].replace(' ms', '')
time_us = float(time_str) * 1000 # 转换为微秒
state = int(row['Ch01'])
times.append(time_us)
states.append(state)
return times, states
def calculate_pulses(times, states):
"""计算脉冲时长"""
pulses = []
# 跳过第一个时间戳通常是0或很大的初始值
# 从第二个开始计算差值
for i in range(2, len(times)):
duration = times[i] - times[i-1]
# 只保留合理的脉冲时长100μs - 20000μs
if 100 < duration < 20000:
pulses.append(int(duration))
return pulses
def analyze_pulses(pulses):
"""分析脉冲时长,找出引导码、数据码等"""
print(f"总脉冲数: {len(pulses)}")
print(f"\n前20个脉冲:")
for i, pulse in enumerate(pulses[:20]):
print(f" [{i}]: {pulse}μs")
# 统计脉冲时长分布
pulse_counts = {}
for pulse in pulses:
# 四舍五入到最近的50μs
rounded = round(pulse / 50) * 50
pulse_counts[rounded] = pulse_counts.get(rounded, 0) + 1
print(f"\n脉冲时长分布(出现次数 > 5:")
for duration in sorted(pulse_counts.keys()):
count = pulse_counts[duration]
if count > 5:
print(f" {duration}μs: {count}")
# 找出最常见的脉冲时长
common_pulses = sorted(pulse_counts.items(), key=lambda x: x[1], reverse=True)[:10]
print(f"\n最常见的10个脉冲时长:")
for duration, count in common_pulses:
print(f" {duration}μs: {count}")
# 分析引导码前2个脉冲
if len(pulses) >= 2:
print(f"\n引导码分析:")
print(f" 引导码高: {pulses[0]}μs")
print(f" 引导码低: {pulses[1]}μs")
# 分析数据码(跳过引导码)
data_pulses = pulses[2:-1] # 跳过引导码和停止码
# 找出高电平脉冲(偶数位置)
high_pulses = [data_pulses[i] for i in range(0, len(data_pulses), 2)]
# 找出低电平脉冲(奇数位置)
low_pulses = [data_pulses[i] for i in range(1, len(data_pulses), 2)]
print(f"\n数据码分析:")
print(f" 数据高电平: 平均{int(statistics.mean(high_pulses))}μs, "
f"中位数{int(statistics.median(high_pulses))}μs")
# 低电平有两种0和长1
low_pulses_sorted = sorted(low_pulses)
mid_point = len(low_pulses_sorted) // 2
short_lows = low_pulses_sorted[:mid_point]
long_lows = low_pulses_sorted[mid_point:]
print(f" 数据0低电平: 平均{int(statistics.mean(short_lows))}μs, "
f"中位数{int(statistics.median(short_lows))}μs")
print(f" 数据1低电平: 平均{int(statistics.mean(long_lows))}μs, "
f"中位数{int(statistics.median(long_lows))}μs")
# 停止码
if len(pulses) > 0:
print(f"\n停止码: {pulses[-1]}μs")
def main():
files = [
'正确的csv文档/官方app制冷.csv',
'正确的csv文档/官方app制热.csv',
'正确的csv文档/官方app送风模式.csv',
'正确的csv文档/官方app抽湿.csv',
]
for filename in files:
print(f"\n{'='*60}")
print(f"分析文件: {filename}")
print(f"{'='*60}")
times, states = parse_csv(filename)
pulses = calculate_pulses(times, states)
analyze_pulses(pulses)
if __name__ == '__main__':
main()