From b26ae2ce3c246e5bf6f8cf4619c75f88c34c41d6 Mon Sep 17 00:00:00 2001 From: Ivey Song Date: Fri, 12 Jun 2026 11:32:39 +0800 Subject: [PATCH] =?UTF-8?q?beta=20psd=20=E7=8B=AC=E7=AB=8B=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Decoder.py | 2 +- Zmq/filterProcess.py | 110 +++++++++++++++++++++++++++++++++---------- 2 files changed, 87 insertions(+), 25 deletions(-) diff --git a/Decoder.py b/Decoder.py index d5e7805..486446f 100644 --- a/Decoder.py +++ b/Decoder.py @@ -63,7 +63,7 @@ class Decoder_main(threading.Thread): # 注册滤波结果回调(示例:打印数据形状) self.sliding_filter.filter_result_callback = self.zmqServer.send_filtered_data # 注册 beta_psd 广播回调,每秒通过 8099 端口发送给上位机 - self.sliding_filter.beta_broadcast_callback = lambda v: self.zmqServer.broadcast_message('beta_psd', v) + self.sliding_filter.set_beta_broadcast_callback(lambda v: self.zmqServer.broadcast_message('beta_psd', v)) def is_valid_signal(self, data, threshold=1e5): # 判断当前信号是否为有效信号 # data: (chans, samples) diff --git a/Zmq/filterProcess.py b/Zmq/filterProcess.py index a829233..cc0971a 100644 --- a/Zmq/filterProcess.py +++ b/Zmq/filterProcess.py @@ -5,6 +5,7 @@ import numpy as np import time import threading +import queue from scipy import signal from logs.log import algo_log import sys @@ -93,7 +94,74 @@ class FilterRingBuffer: self.has_new_data = False # 重置时清空新数据标记 # ----------------------------------------------------------------------------- -# 2. 独立滑动滤波类(仅负责滤波业务逻辑,不关心缓存实现) +# 2. 独立 Beta PSD 计算线程(避免阻塞滤波主循环的 200ms 定时) +# ----------------------------------------------------------------------------- +class BetaPsdCalculator(threading.Thread): + """独立的 Beta PSD 计算线程,使用队列与滤波主线程解耦""" + + def __init__(self, fs=250, window_size=750): + super().__init__(daemon=True) + self.fs = fs + self.window_size = window_size + self._beta_calc = Beta_Calculate(Threshold_value_low=0, Threshold_value_high=0, fs=fs) + self._input_queue = queue.Queue(maxsize=2) + self._running = threading.Event() + self._running.set() + self._latest_beta = None + self._beta_lock = threading.Lock() + self.beta_broadcast_callback = None + + def push_data(self, data): + """供外部调用的线程安全数据推送接口""" + try: + self._input_queue.put_nowait(data) + except queue.Full: + try: + self._input_queue.get_nowait() + except queue.Empty: + pass + try: + self._input_queue.put_nowait(data) + except queue.Full: + pass + + def get_latest_beta(self): + """获取最新的 beta 值(线程安全)""" + with self._beta_lock: + return self._latest_beta + + def run(self): + while self._running.is_set(): + try: + data = self._input_queue.get(timeout=1.5) + if data is None: + break + try: + beta_psd, _, _ = self._beta_calc.calculate_all( + data, fs=self.fs, nperseg=min(self.window_size, data.shape[1]) + ) + with self._beta_lock: + self._latest_beta = round(float(beta_psd), 3) + if self.beta_broadcast_callback is not None: + self.beta_broadcast_callback(self._latest_beta) + except Exception as e: + algo_log(f"Beta PSD 计算异常: {e}", level='error') + except queue.Empty: + pass + + def stop(self): + """停止计算线程""" + self._running.clear() + try: + self._input_queue.put_nowait(None) + except queue.Full: + pass + if self.is_alive(): + self.join(timeout=2) + + +# ----------------------------------------------------------------------------- +# 3. 独立滑动滤波类(仅负责滤波业务逻辑,不关心缓存实现) # ----------------------------------------------------------------------------- class SlidingFilter(threading.Thread): def __init__( @@ -121,11 +189,7 @@ class SlidingFilter(threading.Thread): self.running.set() # 滤波结果回调(外部可注册,获取滤波后的数据) self.filter_result_callback = None - # beta_psd 广播回调(外部注册,用于走 zmqServer 8099 端口发送) - self.beta_broadcast_callback = None - # beta 计算器(Fp1/Fp2 通道,索引 0/1) - self._beta_calc = Beta_Calculate(Threshold_value_low=0, Threshold_value_high=0, fs=srate) # beta 每秒触发计数(200ms步长,5次 = 1s) self._beta_step_counter = 0 self._beta_steps_per_second = max(1, int(round(1.0 / step_sec))) # 5 @@ -133,11 +197,23 @@ class SlidingFilter(threading.Thread): # 预计算滤波器系数(仅执行一次) self._init_filters() + # 独立的 Beta 计算线程(避免阻塞滤波主循环) + self._beta_thread = BetaPsdCalculator(fs=srate, window_size=self.window_size) + + def start(self): + """同时启动 Beta 计算线程和滤波主线程""" + self._beta_thread.start() + super().start() + + def set_beta_broadcast_callback(self, callback): + """注册 Beta PSD 广播回调函数""" + self._beta_thread.beta_broadcast_callback = callback + def _init_filters(self): """预计算所有滤波器系数(仅执行一次)""" # 50Hz工频陷波(Q=30,工业标准) self.b_notch, self.a_notch = signal.iirnotch(50, 30, self.srate) - # 8~30Hz带通FIR(65阶,线性相位) + # 0.5~45Hz带通FIR(65阶,线性相位) self.b_bp = signal.firwin( numtaps=65, cutoff=[0.5/(self.srate/2), 45/(self.srate/2)], @@ -193,16 +269,8 @@ class SlidingFilter(threading.Thread): self._beta_step_counter += 1 if self._beta_step_counter >= self._beta_steps_per_second: self._beta_step_counter = 0 - try: - # 直接使用已滤波的完整3s数据的前两通道(Fp1/Fp2) - filter_betadata = filtered_full[:2, :] # shape (2, 750) - beta_psd, _, _ = self._beta_calc.calculate_all( - filter_betadata, fs=self.srate, nperseg=min(self.window_size, filter_betadata.shape[1]) - ) - if self.beta_broadcast_callback is not None: - self.beta_broadcast_callback(round(float(beta_psd), 3)) - except Exception as be: - algo_log(f"beta_psd计算异常: {be}", level='error') + # 仅推送数据到队列,不阻塞等待计算完成 + self._beta_thread.push_data(filtered_full[:2, :].copy()) if self.filter_result_callback is not None: self.filter_result_callback(filtered_data[:64, :]) @@ -214,17 +282,11 @@ class SlidingFilter(threading.Thread): self.filter_result_callback = callback def stop(self): - """停止滤波线程(安全版)""" - # 1. 先设置停止标志(Event.clear()是线程安全的) + """停止滤波线程和 Beta 计算线程""" + self._beta_thread.stop() self.running.clear() - - # 2. 核心修复:只有线程已启动且正在运行时才调用join if self.is_alive(): - # 等待线程正常退出,最多1秒 self.join(timeout=1) - # 超时未退出时打印警告,便于排查问题 if self.is_alive(): algo_log("警告:滤波线程在1秒内未正常退出,可能存在阻塞操作", level="WARNING") - - # 3. 无论线程是否启动,都打印停止日志 algo_log("滤波线程已停止")