diff --git a/datamock.py b/datamock.py index 7ad703d..b7e8a6d 100644 --- a/datamock.py +++ b/datamock.py @@ -1,6 +1,7 @@ import zmq import numpy as np import time +import threading from datetime import datetime # ========== 参数配置 ========== @@ -65,6 +66,28 @@ def main(): sock.connect(SERVER_ADDR) print(f"[{datetime.now().strftime('%H:%M:%S')}] ZMQ Dealer 连接到 {SERVER_ADDR}") + # 后台消费线程:持续 recv 从 ROUTER 返回的数据,避免 server 发送队列积压 + recv_count = [0] + stop_recv = threading.Event() + + def consumer_thread(): + """消费线程:阻塞 recv,丢弃收到的数据,仅用于清空 ROUTER 发送队列""" + while not stop_recv.is_set(): + try: + frames = sock.recv_multipart(zmq.NOBLOCK) + recv_count[0] += 1 + # 收到的格式: [identity, '', filtered_data_bytes] + if recv_count[0] % 500 == 0: + print(f"[{datetime.now().strftime('%H:%M:%S')}] 消费线程已丢弃 {recv_count[0]} 帧滤波数据") + except zmq.Again: + time.sleep(0.01) + except zmq.error.Again: # 兼容旧版 + time.sleep(0.01) + + consumer = threading.Thread(target=consumer_thread, daemon=True) + consumer.start() + print(f"[{datetime.now().strftime('%H:%M:%S')}] 消费线程已启动(daemon)") + global_sample_idx = 0 # 全局采样点计数器 label_type = 1 # 当前标签类型: 1 或 2 label1_count = 0 # label=1 的序号计数器 @@ -109,6 +132,7 @@ def main(): # 发送: multipart 3帧 [identity, '', data] # 使用标准格式(3帧),ROUTER 会自动附加 ZMQ 分配的客户端身份 sock.send_multipart([ + b'', packet.tobytes() ]) @@ -129,6 +153,8 @@ def main(): except KeyboardInterrupt: print(f"\n[{datetime.now().strftime('%H:%M:%S')}] 停止发送,共发送 {packet_count} 包") finally: + stop_recv.set() + consumer.join(timeout=2) sock.close() ctx.term()