Files
2026-06-01 13:42:38 +08:00

149 lines
7.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import numpy as np
import zmq
import threading
import json
import queue
from Device.SunnyLinker import SunnyLinker64
class zmqServer(threading.Thread):
    """Daemon thread that exchanges JSON messages with clients over a ZeroMQ ROUTER socket.

    Incoming messages are multipart frames ``[identity, delimiter, payload]`` where
    the payload is a UTF-8 JSON object ``{"method": ..., "params": ...}``.  Every
    identity that sends a message is remembered, and messages queued through
    :meth:`broadcast_message` are sent to all remembered identities.

    Requests only update the public flag attributes set in ``__init__`` (and, for
    "train"/"predict", push a trigger through ``self.sunnyLinker``); nothing in this
    class reads those flags back, so they are presumably polled by other components.
    """

    # Timeout handed to Poller.poll(), in milliseconds.
    POLL_TIMEOUT_MS = 10
    # Methods whose params are not echoed to the log; a "<Base64 Image Data>"
    # placeholder is printed instead.
    # NOTE(review): the original list named 'single_trial_plot' twice; a second,
    # different method name may have been intended - confirm.
    _IMAGE_METHODS = ('single_trial_plot', 'miReport')

    def __init__(self, host='0.0.0.0', port=8099):
        """Create the ROUTER socket and bind it to ``tcp://host:port``.

        Args:
            host: Interface to bind to.
            port: TCP port to bind to.
        """
        super().__init__()
        self.host = host
        self.port = port
        self.running = False
        self.get_Impedance = False  # whether impedance values should be returned
        self.open_Impedance = None  # whether impedance checking is switched on
        self.StartDecode = False  # False = decoding stopped, True = decoding started
        self.StartTrain = False  # False = not in training, True = in training
        self.state_mode = None  # last state requested: 'sync', 'train', 'predict' or 'rest'
        self.currentLabel = -1  # training label last announced by the stimulus side
        self.IsExitApp = False  # set to True when a "predict" request with params 2 arrives
        self.getReport = False  # a training report has been requested
        self.daemon = True
        # Create the ZeroMQ context.
        self.context = zmq.Context()
        # ROUTER socket: every received message is prefixed with the sender identity.
        self.socket = self.context.socket(zmq.ROUTER)
        self.socket.bind(f"tcp://{self.host}:{self.port}")
        self.targetFreqs = []
        self.changeTarget = False  # the target frequencies have changed
        # Original author's note: SunnyLinker64 is a singleton that is already
        # instantiated in the Decoder - TODO confirm that the None arguments are ignored.
        self.sunnyLinker = SunnyLinker64(None, None, None, None, None)
        self.labels = [0x01, 0x02, 0x03]
        self.decoder_switch = False  # the decoder class has changed
        self.decoder_class = None  # decoder kind: 'ssvep', 'ssmvep', 'mi'
        # Client management (e.g. Unity, other listeners).
        self.clients = set()  # identities of the clients seen so far
        # Outgoing messages are handed over through this queue so that only the
        # server thread ever touches the socket.
        self.send_queue = queue.Queue()

    def broadcast_message(self, method, params):
        """Put message into queue to be sent to all connected clients."""
        self.send_queue.put((method, params))

    def run(self):
        """Serve until ``self.running`` becomes False, then release the socket and context."""
        self.running = True
        print(f"Server is running on {self.host}:{self.port}")
        # Poll instead of blocking in recv so the send queue keeps being drained.
        poller = zmq.Poller()
        poller.register(self.socket, zmq.POLLIN)
        try:
            while self.running:
                # 1. Send queued broadcasts.
                self._flush_send_queue()
                # 2. Receive and process at most one command.
                ready = dict(poller.poll(self.POLL_TIMEOUT_MS))
                if ready.get(self.socket) == zmq.POLLIN:
                    self._handle_frames(self.socket.recv_multipart())
        except Exception as e:
            print(f"An socket error occurred: {e}")
        finally:
            self.running = False
            # The socket is closed here, in the thread that used it.
            self._close_resources()
            print("Server socket and context closed.")

    def _flush_send_queue(self):
        """Send every queued broadcast to all known clients."""
        while True:
            try:
                method, params = self.send_queue.get_nowait()
            except queue.Empty:
                return
            if not self.clients:
                # No client has registered yet: the message is discarded.
                continue
            try:
                msg = {'method': method, 'params': params}
                msg_bytes = json.dumps(msg).encode('utf-8')
                if method in self._IMAGE_METHODS:
                    print(f"{{'method': '{method}', 'params': <Base64 Image Data>}}")
                else:
                    print(f"Sending message: {msg}")
            except Exception as e:
                # Best effort: one unserialisable message must not stop the server.
                print(f"Error preparing broadcast: {e}")
                continue
            for client_id in list(self.clients):
                try:
                    # Frames: [identity, empty delimiter, JSON payload].
                    self.socket.send_multipart([client_id, b'', msg_bytes])
                except Exception as e:
                    print(f"Error sending to client {client_id}: {e}")

    def _handle_frames(self, frames):
        """Register the sender of one multipart message and dispatch its JSON payload."""
        if len(frames) < 3:
            return
        ident, _, message_bytes = frames[:3]
        if ident not in self.clients:
            self.clients.add(ident)
            print(f"New Client Detected: {ident}")
        try:
            message = json.loads(message_bytes.decode('utf-8'))
        except ValueError:
            # Covers both invalid UTF-8 and invalid JSON; the message is ignored.
            return
        print(f"Received request: {message}")
        self._handle_request(message)

    def _handle_request(self, message):
        """Apply one decoded request to the server state.

        Malformed requests are reported with ``print`` and ignored, so that a bad
        client message cannot terminate the server loop.
        """
        if not isinstance(message, dict):
            print('Request must be a JSON object')
            return
        method = message.get("method")
        params = message.get("params")
        if method == "sync":
            self.state_mode = 'sync'
        elif method == "targetFreqs":
            if not isinstance(params, list):
                print('targetFreqs must be a list')
                return
            if params != self.targetFreqs:
                self.targetFreqs = params
                self.changeTarget = True
        elif method == "decoderClass":
            if not isinstance(params, str):
                print('decoderClass must be a str')
                return
            if params != self.decoder_class:
                self.decoder_class = params
                self.decoder_switch = True
        elif method == "getReport":
            self.getReport = True
        elif method == "train":
            # Resolve the trigger first so that an invalid label leaves the state untouched.
            try:
                trigger = self.labels[params]
            except (IndexError, TypeError):
                print(f"Invalid train label: {params!r}")
                return
            self.state_mode = 'train'
            self.StartTrain = True
            self.currentLabel = params  # label currently shown by the stimulus side
            self.sunnyLinker.push_trigger(trigger)
        elif method == "predict":
            self.state_mode = 'predict'
            if params == 1:  # start decoding
                self.StartDecode = True
                self.sunnyLinker.push_trigger(0x63)
            elif params == 2:  # exit: flag the application and end the server loop
                self.IsExitApp = True
                self.running = False
        elif method == "rest":
            self.state_mode = 'rest'
        elif method == "impedance":
            if params == 1:
                self.open_Impedance = True  # switch impedance checking on
                self.get_Impedance = True  # start returning impedance values
            elif params == 2:
                self.open_Impedance = False  # switch impedance checking off
                self.get_Impedance = False  # stop returning impedance values

    def _close_resources(self):
        """Close the socket and terminate the context, skipping whatever is already closed."""
        if not self.socket.closed:
            self.socket.close()
        if not self.context.closed:
            self.context.term()

    def stop(self):
        """Shut the server down explicitly.

        While the server thread is alive it is only asked to leave its loop and is
        then joined; ``run`` closes the socket in its own thread.  If the thread is
        not running, the resources are released here.
        """
        self.running = False
        if self.is_alive() and threading.current_thread() is not self:
            self.join()
        else:
            self._close_resources()
        print("Server closed explicitly.")
if __name__ == '__main__':
    # The server thread is a daemon, so the main thread has to stay alive:
    # otherwise the interpreter exits right after start() and the server dies with it.
    server = zmqServer()
    server.start()
    try:
        # Wait in short slices so a KeyboardInterrupt can be delivered between waits.
        while server.is_alive():
            server.join(timeout=0.5)
    except KeyboardInterrupt:
        # Ask the loop to exit; run() releases the socket and context in its own thread.
        server.running = False
        server.join()