Files
2026-06-01 13:42:38 +08:00

193 lines
6.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import List, Tuple, Union, Optional
class ProtocolFrame:
# 协议常量
FRAME_HEADER = 0xAA
FRAME_TAIL1 = 0x55
FRAME_TAIL2 = 0x55
RESERVED_SIZE = 6
MIN_FRAME_SIZE = 13 # 帧头1 + 功能1 + 长度2 + 预留6 + CRC1 + 包尾2
MAX_DATA_LENGTH = 0xFFFF # 最大数据长度 (2字节能表示的最大值)
@staticmethod
def calculate_crc8(data: bytes) -> bytes:
"""
计算CRC8校验值
Args:
data: 需要计算CRC的数据
Returns:
一个字节的CRC值bytes类型
"""
crc = 0
for byte in data:
crc ^= byte
for _ in range(8):
crc = ((crc << 1) ^ 0x07 if crc & 0x80 else crc << 1) & 0xFF
return bytes([crc])
@classmethod
def pack(cls, function, data: Union[bytes, bytearray, List[int]],
reserved: Optional[Union[bytes, bytearray, List[int]]] = None) -> bytes:
"""
协议打包函数
Args:
function: 功能码 (1字节)
data: 数据块
reserved: 预留字节(6字节可选)
Returns:
打包后的字节数据
"""
# 检查功能码
if function != None:
if not 0 <= function <= 0xFF:
raise ValueError("功能码必须是1字节")
# 转换数据为bytearray
if isinstance(data, list):
data = bytearray(data)
elif isinstance(data, bytes):
data = bytearray(data)
# 检查数据长度
data_length = len(data)
if data_length > cls.MAX_DATA_LENGTH:
raise ValueError(f"数据长度超过最大值 {cls.MAX_DATA_LENGTH}")
# 处理预留字节
if reserved is None:
reserved = bytearray([0] * cls.RESERVED_SIZE)
else:
if isinstance(reserved, list):
reserved = bytearray(reserved)
elif isinstance(reserved, bytes):
reserved = bytearray(reserved)
if len(reserved) != cls.RESERVED_SIZE:
raise ValueError(f"预留字节必须是{cls.RESERVED_SIZE}字节")
# 构建帧
frame = bytearray([cls.FRAME_HEADER]) # 帧头 (1字节)
if function != None:
frame.append(function) # 功能码 (1字节)
data_length+=6
# 数据长度 (2字节大端序)
frame.append((data_length >> 8) & 0xFF) # 高字节
frame.append(data_length & 0xFF) # 低字节
if function != None:
frame.extend(reserved) # 预留字节 (6字节)
frame.extend(data) # 数据块 (变长)
# 计算CRC (从功能码开始到数据块结束)
crc = cls.calculate_crc8(frame[1:]) # 不包含帧头
frame.extend(crc) # CRC校验 (1字节)
# 添加帧尾
frame.extend([cls.FRAME_TAIL1, cls.FRAME_TAIL2]) # 帧尾 (2字节)
return bytes(frame)
@classmethod
def unpack(cls, data: Union[bytes, bytearray]) -> Tuple[int, bytearray, bytearray]:
"""
协议解包函数
Args:
data: 待解析的字节数据
Returns:
(功能码, 数据块, 预留字节)
Raises:
ValueError: 当数据格式不正确时
"""
# 检查数据长度
if len(data) < cls.MIN_FRAME_SIZE:
raise ValueError("数据长度不足")
# 检查帧头
if data[0] != cls.FRAME_HEADER:
raise ValueError("帧头错误")
# 检查帧尾
if data[-2:] != bytes([cls.FRAME_TAIL1, cls.FRAME_TAIL2]):
raise ValueError("帧尾错误")
# 解析基本信息
function = data[1] # 功能码 (1字节)
# 数据长度 (2字节大端序)
data_length = (data[2] << 8) | data[3]
reserved = data[4:10] # 预留字节 (6字节)
# 检查数据长度
expected_length = cls.MIN_FRAME_SIZE + data_length
if len(data) != expected_length:
raise ValueError(f"数据长度不匹配: 期望{expected_length}字节,实际{len(data)}字节")
# 提取数据块
payload = data[10:10 + data_length]
# 验证CRC (从功能码开始到数据块结束)
received_crc = data[-3]
calculated_crc = cls.calculate_crc8(data[1:-3])[0] # 获取字节值
if received_crc != calculated_crc:
raise ValueError(f"CRC校验失败: 期望{calculated_crc:02X},实际{received_crc:02X}")
return function, bytearray(payload), bytearray(reserved)
def print_hex(data: bytes, label: str = ""):
"""打印十六进制数据,并按字节添加空格"""
hex_str = ' '.join([f"{b:02X}" for b in data])
if label:
print(f"{label}: {hex_str}")
else:
print(hex_str)
def print_frame_details(data: bytes):
"""打印帧的详细信息"""
print("帧详细信息:")
print(f"帧头: {data[0]:02X}")
print(f"功能码: {data[1]:02X}")
print(f"数据长度: {data[2]:02X} {data[3]:02X} ({(data[2] << 8) | data[3]}字节)")
print(f"预留字节: {' '.join([f'{b:02X}' for b in data[4:10]])}")
data_length = (data[2] << 8) | data[3]
print(f"数据块: {' '.join([f'{b:02X}' for b in data[10:10 + data_length]])}")
print(f"CRC校验: {data[-3]:02X}")
print(f"帧尾: {data[-2]:02X} {data[-1]:02X}")
# 使用示例
def example_usage():
try:
# 示例1简单数据打包
function_code = 0x01
data = [0x1]
packed_data = ProtocolFrame.pack(function_code, data)
print_hex(packed_data, "示例1 - 完整帧")
print_frame_details(packed_data)
print()
# 示例3解包验证
function, payload, reserved = ProtocolFrame.unpack(packed_data)
print("解包结果:")
print(f"功能码: 0x{function:02X}")
print_hex(payload, "数据块")
print_hex(reserved, "预留字节")
except ValueError as e:
print(f"错误: {e}")
if __name__ == "__main__":
example_usage()