Compare commits
2 Commits
31d91d6cc7
...
67587f354b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
67587f354b | ||
|
|
d5ef2311a1 |
26
datamock.py
26
datamock.py
@@ -1,6 +1,7 @@
|
|||||||
import zmq
|
import zmq
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import time
|
import time
|
||||||
|
import threading
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
# ========== 参数配置 ==========
|
# ========== 参数配置 ==========
|
||||||
@@ -65,6 +66,28 @@ def main():
|
|||||||
sock.connect(SERVER_ADDR)
|
sock.connect(SERVER_ADDR)
|
||||||
print(f"[{datetime.now().strftime('%H:%M:%S')}] ZMQ Dealer 连接到 {SERVER_ADDR}")
|
print(f"[{datetime.now().strftime('%H:%M:%S')}] ZMQ Dealer 连接到 {SERVER_ADDR}")
|
||||||
|
|
||||||
|
# 后台消费线程:持续 recv 从 ROUTER 返回的数据,避免 server 发送队列积压
|
||||||
|
recv_count = [0]
|
||||||
|
stop_recv = threading.Event()
|
||||||
|
|
||||||
|
def consumer_thread():
|
||||||
|
"""消费线程:阻塞 recv,丢弃收到的数据,仅用于清空 ROUTER 发送队列"""
|
||||||
|
while not stop_recv.is_set():
|
||||||
|
try:
|
||||||
|
frames = sock.recv_multipart(zmq.NOBLOCK)
|
||||||
|
recv_count[0] += 1
|
||||||
|
# 收到的格式: [identity, '', filtered_data_bytes]
|
||||||
|
if recv_count[0] % 500 == 0:
|
||||||
|
print(f"[{datetime.now().strftime('%H:%M:%S')}] 消费线程已丢弃 {recv_count[0]} 帧滤波数据")
|
||||||
|
except zmq.Again:
|
||||||
|
time.sleep(0.01)
|
||||||
|
except zmq.error.Again: # 兼容旧版
|
||||||
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
consumer = threading.Thread(target=consumer_thread, daemon=True)
|
||||||
|
consumer.start()
|
||||||
|
print(f"[{datetime.now().strftime('%H:%M:%S')}] 消费线程已启动(daemon)")
|
||||||
|
|
||||||
global_sample_idx = 0 # 全局采样点计数器
|
global_sample_idx = 0 # 全局采样点计数器
|
||||||
label_type = 1 # 当前标签类型: 1 或 2
|
label_type = 1 # 当前标签类型: 1 或 2
|
||||||
label1_count = 0 # label=1 的序号计数器
|
label1_count = 0 # label=1 的序号计数器
|
||||||
@@ -109,6 +132,7 @@ def main():
|
|||||||
# 发送: multipart 3帧 [identity, '', data]
|
# 发送: multipart 3帧 [identity, '', data]
|
||||||
# 使用标准格式(3帧),ROUTER 会自动附加 ZMQ 分配的客户端身份
|
# 使用标准格式(3帧),ROUTER 会自动附加 ZMQ 分配的客户端身份
|
||||||
sock.send_multipart([
|
sock.send_multipart([
|
||||||
|
b'',
|
||||||
packet.tobytes()
|
packet.tobytes()
|
||||||
])
|
])
|
||||||
|
|
||||||
@@ -129,6 +153,8 @@ def main():
|
|||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] 停止发送,共发送 {packet_count} 包")
|
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] 停止发送,共发送 {packet_count} 包")
|
||||||
finally:
|
finally:
|
||||||
|
stop_recv.set()
|
||||||
|
consumer.join(timeout=2)
|
||||||
sock.close()
|
sock.close()
|
||||||
ctx.term()
|
ctx.term()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user