Files
Depression_TMS/algorithm_V0/datacollect/protocol.py

193 lines
6.0 KiB
Python
Raw Normal View History

2026-06-01 13:18:36 +08:00
from typing import List, Tuple, Union, Optional
class ProtocolFrame:
# 协议常量
FRAME_HEADER = 0xAA
FRAME_TAIL1 = 0x55
FRAME_TAIL2 = 0x55
RESERVED_SIZE = 6
MIN_FRAME_SIZE = 13 # 帧头1 + 功能1 + 长度2 + 预留6 + CRC1 + 包尾2
MAX_DATA_LENGTH = 0xFFFF # 最大数据长度 (2字节能表示的最大值)
@staticmethod
def calculate_crc8(data: bytes) -> bytes:
"""
计算CRC8校验值
Args:
data: 需要计算CRC的数据
Returns:
一个字节的CRC值bytes类型
"""
crc = 0
for byte in data:
crc ^= byte
for _ in range(8):
crc = ((crc << 1) ^ 0x07 if crc & 0x80 else crc << 1) & 0xFF
return bytes([crc])
@classmethod
def pack(cls, function, data: Union[bytes, bytearray, List[int]],
reserved: Optional[Union[bytes, bytearray, List[int]]] = None) -> bytes:
"""
协议打包函数
Args:
function: 功能码 (1字节)
data: 数据块
reserved: 预留字节(6字节可选)
Returns:
打包后的字节数据
"""
# 检查功能码
if function != None:
if not 0 <= function <= 0xFF:
raise ValueError("功能码必须是1字节")
# 转换数据为bytearray
if isinstance(data, list):
data = bytearray(data)
elif isinstance(data, bytes):
data = bytearray(data)
# 检查数据长度
data_length = len(data)
if data_length > cls.MAX_DATA_LENGTH:
raise ValueError(f"数据长度超过最大值 {cls.MAX_DATA_LENGTH}")
# 处理预留字节
if reserved is None:
reserved = bytearray([0] * cls.RESERVED_SIZE)
else:
if isinstance(reserved, list):
reserved = bytearray(reserved)
elif isinstance(reserved, bytes):
reserved = bytearray(reserved)
if len(reserved) != cls.RESERVED_SIZE:
raise ValueError(f"预留字节必须是{cls.RESERVED_SIZE}字节")
# 构建帧
frame = bytearray([cls.FRAME_HEADER]) # 帧头 (1字节)
if function != None:
frame.append(function) # 功能码 (1字节)
data_length+=6
# 数据长度 (2字节大端序)
frame.append((data_length >> 8) & 0xFF) # 高字节
frame.append(data_length & 0xFF) # 低字节
if function != None:
frame.extend(reserved) # 预留字节 (6字节)
frame.extend(data) # 数据块 (变长)
# 计算CRC (从功能码开始到数据块结束)
crc = cls.calculate_crc8(frame[1:]) # 不包含帧头
frame.extend(crc) # CRC校验 (1字节)
# 添加帧尾
frame.extend([cls.FRAME_TAIL1, cls.FRAME_TAIL2]) # 帧尾 (2字节)
return bytes(frame)
@classmethod
def unpack(cls, data: Union[bytes, bytearray]) -> Tuple[int, bytearray, bytearray]:
"""
协议解包函数
Args:
data: 待解析的字节数据
Returns:
(功能码, 数据块, 预留字节)
Raises:
ValueError: 当数据格式不正确时
"""
# 检查数据长度
if len(data) < cls.MIN_FRAME_SIZE:
raise ValueError("数据长度不足")
# 检查帧头
if data[0] != cls.FRAME_HEADER:
raise ValueError("帧头错误")
# 检查帧尾
if data[-2:] != bytes([cls.FRAME_TAIL1, cls.FRAME_TAIL2]):
raise ValueError("帧尾错误")
# 解析基本信息
function = data[1] # 功能码 (1字节)
# 数据长度 (2字节大端序)
data_length = (data[2] << 8) | data[3]
reserved = data[4:10] # 预留字节 (6字节)
# 检查数据长度
expected_length = cls.MIN_FRAME_SIZE + data_length
if len(data) != expected_length:
raise ValueError(f"数据长度不匹配: 期望{expected_length}字节,实际{len(data)}字节")
# 提取数据块
payload = data[10:10 + data_length]
# 验证CRC (从功能码开始到数据块结束)
received_crc = data[-3]
calculated_crc = cls.calculate_crc8(data[1:-3])[0] # 获取字节值
if received_crc != calculated_crc:
raise ValueError(f"CRC校验失败: 期望{calculated_crc:02X},实际{received_crc:02X}")
return function, bytearray(payload), bytearray(reserved)
def print_hex(data: bytes, label: str = ""):
"""打印十六进制数据,并按字节添加空格"""
hex_str = ' '.join([f"{b:02X}" for b in data])
if label:
print(f"{label}: {hex_str}")
else:
print(hex_str)
def print_frame_details(data: bytes):
"""打印帧的详细信息"""
print("帧详细信息:")
print(f"帧头: {data[0]:02X}")
print(f"功能码: {data[1]:02X}")
print(f"数据长度: {data[2]:02X} {data[3]:02X} ({(data[2] << 8) | data[3]}字节)")
print(f"预留字节: {' '.join([f'{b:02X}' for b in data[4:10]])}")
data_length = (data[2] << 8) | data[3]
print(f"数据块: {' '.join([f'{b:02X}' for b in data[10:10 + data_length]])}")
print(f"CRC校验: {data[-3]:02X}")
print(f"帧尾: {data[-2]:02X} {data[-1]:02X}")
# 使用示例
def example_usage():
try:
# 示例1简单数据打包
function_code = 0x01
data = [0x1]
packed_data = ProtocolFrame.pack(function_code, data)
print_hex(packed_data, "示例1 - 完整帧")
print_frame_details(packed_data)
print()
# 示例3解包验证
function, payload, reserved = ProtocolFrame.unpack(packed_data)
print("解包结果:")
print(f"功能码: 0x{function:02X}")
print_hex(payload, "数据块")
print_hex(reserved, "预留字节")
except ValueError as e:
print(f"错误: {e}")
if __name__ == "__main__":
example_usage()