diff --git a/ZeroMQClient_mock.py b/ZeroMQClient_mock.py new file mode 100644 index 0000000..a3ef081 --- /dev/null +++ b/ZeroMQClient_mock.py @@ -0,0 +1,166 @@ +import zmq +import time +import json +import os +import threading + +def receive_messages(socket, stop_event): + """ + 后台线程函数,用于持续接收服务器消息 + + Args: + socket (zmq.Socket): ZeroMQ套接字 + stop_event (threading.Event): 停止事件,用于通知线程退出 + """ + print("开始持续接收服务器数据...") + print("-" * 50) + + while not stop_event.is_set(): + try: + # 设置接收超时为1秒,避免阻塞 + socket.setsockopt(zmq.RCVTIMEO, 1000) + # 接收服务器的消息 + frames = socket.recv_multipart() + + # DEALER 套接字接收消息格式:[身份标识, 空帧, 消息内容] + # 使用frames[-1]获取最后一帧,无论中间有多少空帧 + if len(frames) >= 2: + message = frames[-1].decode('utf-8') + + # 尝试解析为JSON格式 + try: + json_message = json.loads(message) + # 检查消息长度 + json_str = str(json_message) + if len(json_str) > 100: + print(f"收到服务器数据 (JSON): {json_str[:100]}...") + else: + print(f"收到服务器数据 (JSON): {json_message}") + except json.JSONDecodeError: + # 检查消息长度 + if len(message) > 100: + print(f"收到服务器数据 (原始): {message[:100]}...") + else: + print(f"收到服务器数据 (原始): {message}") + else: + print(f"收到服务器数据 (格式异常): {frames}") + + except zmq.Again: + # 接收超时,继续循环 + continue + except Exception as e: + print(f"接收消息时发生错误: {e}") + # 短暂暂停后继续接收 + time.sleep(1) + + print("接收线程已停止。") + +def zero_mq_client(server_address="tcp://192.168.254.101:8099"): + """ + ZeroMQ客户端函数,用于与服务器通信 + + Args: + server_address (str): 服务器地址,格式为"tcp://IP:端口" + """ + # 创建 ZeroMQ 上下文 + context = zmq.Context() + + # 创建 DEALER 套接字 + socket = context.socket(zmq.DEALER) + + # 生成唯一的身份标识 + identity = str('wdd').encode('utf-8') + socket.setsockopt(zmq.IDENTITY, identity) + + try: + # 连接到服务器 + print(f"连接到服务器 {server_address}...") + socket.connect(server_address) + + # 定义消息集 + message_set = [ + {"method": "sync", "params": 1}, + {"method": "decoderClass", "params": "mi"}, + {"method": "decoderClass", "params": "ssvep"}, + {"method": "decoderClass", "params": "ssmvep"}, + {"method": "decoderClass", "params": "blink"}, + {"method": "decoderClass", "params": "concentration"}, + {"method": "train", "params": 0}, + {"method": "train", "params": 1}, + {"method": "rest", "params": 0}, + {"method": "predict", "params": 1}, + {"method": "getReport", "params": 0} + ] + + # 打印消息集 + print("消息集:") + for i, msg in enumerate(message_set): + print(f"[{i}] {msg}") + print("-" * 50) + + # 创建停止事件 + stop_event = threading.Event() + + # 启动接收线程 + receive_thread = threading.Thread(target=receive_messages, args=(socket, stop_event)) + receive_thread.daemon = True # 设置为守护线程,主线程退出时自动退出 + receive_thread.start() + + # 主线程处理控制台输入 + print("输入消息序号发送对应消息,输入'q'退出程序:") + while True: + try: + # 获取用户输入 + user_input = input("请输入消息序号: ") + + # 检查是否退出 + if user_input.lower() == 'q': + print("正在退出程序...") + break + + # 尝试转换为整数 + msg_index = int(user_input) + + # 检查序号是否有效 + if 0 <= msg_index < len(message_set): + # 获取对应的消息 + selected_message = message_set[msg_index] + + # 将消息转换为 JSON 字符串 + json_message = json.dumps(selected_message) + + # 打印发送信息 + print(f"\n发送消息 (大小: {len(json_message)} 字节)...") + print(f"消息方法: {selected_message['method']}") + print(f"参数值: {selected_message['params']}") + + # DEALER 套接字发送消息,包含身份标识和空帧 + socket.send_multipart([identity, json_message.encode('utf-8')]) + print("消息发送完成!") + print("-" * 50) + else: + print(f"无效的消息序号,请输入 0-{len(message_set)-1} 之间的数字。") + print("消息集:") + for i, msg in enumerate(message_set): + print(f"[{i}] {msg}") + print("-" * 50) + + except ValueError: + print("请输入有效的数字或'q'退出。") + except Exception as e: + print(f"处理输入时发生错误: {e}") + + except KeyboardInterrupt: + print("\n程序被手动终止。") + finally: + # 停止接收线程 + stop_event.set() + # 等待接收线程停止 + time.sleep(1) + # 关闭套接字和上下文 + socket.close() + context.term() + print("客户端已关闭。") + +if __name__ == "__main__": + zero_mq_client() \ No newline at end of file