v1
This commit is contained in:
@@ -217,47 +217,48 @@ class zmqServer(threading.Thread):
|
||||
处理8100端口原始脑电二进制数据
|
||||
固定格式:上位机发送 (5,66) float32 二维数组字节流(已转换为微伏物理量)→ 转置为 (66,5) 写入双缓冲区
|
||||
"""
|
||||
# 1. 校验ZMQ消息帧完整性
|
||||
if len(frames) < 3:
|
||||
print(f"[ERROR] 无效数据帧:长度不足3帧,实际长度={len(frames)}")
|
||||
# 1. 校验ZMQ消息帧完整性(ROUTER接收DEALER消息的帧格式:[客户端ID, 发送方ID, 空帧, 数据帧])
|
||||
if len(frames) < 4: # 至少需要4帧
|
||||
algo_log(f"Invalid data frame: 帧数量不足,期望≥4,实际{len(frames)}", level="ERROR")
|
||||
return
|
||||
|
||||
ident, _, data_bytes = frames[:3]
|
||||
# 2. 正确解析帧(适配DEALER→ROUTER的帧格式)
|
||||
client_ident, sender_ident, empty_sep, data_bytes = frames[:4]
|
||||
if empty_sep != b'': # 校验空分隔帧
|
||||
algo_log(f"Invalid frame separator: 期望空字节,实际{empty_sep}", level="ERROR")
|
||||
return
|
||||
|
||||
# 2. 客户端管理(单客户端场景,自动更新最新身份)
|
||||
if ident not in self.data_clients:
|
||||
self.data_clients.add(ident)
|
||||
self.current_data_client = ident # 保存唯一客户端身份,用于后续回复滤波结果
|
||||
print(f"[INFO] 新数据客户端连接成功:{ident}")
|
||||
# 3. 客户端管理(单客户端场景,自动更新最新身份)
|
||||
if client_ident not in self.data_clients:
|
||||
self.data_clients.add(client_ident)
|
||||
self.current_data_client = client_ident # 保存唯一客户端身份,用于后续回复滤波结果
|
||||
print(f"[INFO] 新数据客户端连接成功:{client_ident}")
|
||||
|
||||
try:
|
||||
# 3. 精确长度校验(核心:固定(5,66) float32 = 5*66*4=1320字节,与int32字节数相同)
|
||||
# 4. 精确长度校验(核心:固定(5,66) float32 = 5*66*4=1320字节)
|
||||
EXPECTED_BYTES = self.device_info['frame_points'] * self.device_info['channel_nums'] * 4 # 每个float32占4字节
|
||||
if len(data_bytes) != EXPECTED_BYTES:
|
||||
print(f"[ERROR] 数据长度错误:期望{EXPECTED_BYTES}字节,实际{len(data_bytes)}字节")
|
||||
algo_log(f"[ERROR] 数据长度错误:期望{EXPECTED_BYTES}字节,实际{len(data_bytes)}字节", level="ERROR")
|
||||
return
|
||||
|
||||
# 4. 零拷贝二进制解析 + 维度转换
|
||||
# 步骤:字节流 → (330,) float32数组 → (5,66) 原始格式 → 转置为 (66,5) 缓冲区标准格式
|
||||
# 5. 零拷贝二进制解析 + 维度转换
|
||||
|
||||
data_np = np.frombuffer(data_bytes, dtype=np.float32)
|
||||
# 重塑为上位机原始维度
|
||||
data_np = data_np.reshape(self.device_info['frame_points'], self.device_info['channel_nums'])
|
||||
# 转置为(通道数, 采样点数)标准格式,转换为float64保证滤波运算精度
|
||||
data_np = data_np.T.astype(np.float64)
|
||||
|
||||
# 5. 同时写入双环形缓冲区(方法名与现有类保持一致:appendBuffer)
|
||||
# 注意:上位机已发送微伏物理量,无需再乘以增益系数
|
||||
# 6. 写入缓冲区
|
||||
self.paradigmBuffer.appendBuffer(data_np)
|
||||
self.filterBuffer.appendBuffer(data_np)
|
||||
|
||||
# 生产环境必须注释!每秒50次打印会导致CPU占用飙升30%以上
|
||||
algo_log(f"数据写入成功:shape={data_np.shape}, 范围=[{data_np.min():.2f}, {data_np.max():.2f}] μV", level="DEBUG", record_once=True)
|
||||
algo_log(f"数据写入成功:shape={data_np.shape}, 范围=[{data_np.min():.2f}, {data_np.max():.2f}] μV", level="DEBUG")
|
||||
|
||||
except Exception as e:
|
||||
algo_log(f"数据处理失败:{str(e)}", level="ERROR")
|
||||
# 调试阶段临时打开,生产环境务必注释
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
if IniRead('system', 'algo_log_level', 'INFO') == 'DEBUG':
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
|
||||
|
||||
def _process_send_queue(self):
|
||||
"""处理发送队列,向所有命令客户端广播消息"""
|
||||
|
||||
Reference in New Issue
Block a user