From fdddc814c7fc532a97c3cd5091971357234578d1 Mon Sep 17 00:00:00 2001 From: lizhao Date: Mon, 8 Jun 2026 17:13:25 +0800 Subject: [PATCH] fitler buffer with lock --- Zmq/filterProcess.py | 79 ++++++++++++++++++++++---------------------- 1 file changed, 39 insertions(+), 40 deletions(-) diff --git a/Zmq/filterProcess.py b/Zmq/filterProcess.py index 5d25eb8..1486fa1 100644 --- a/Zmq/filterProcess.py +++ b/Zmq/filterProcess.py @@ -17,34 +17,39 @@ class FilterRingBuffer: """ self.n_chan = n_chan self.n_points = n_points - self.buffer = np.zeros((n_chan, n_points), dtype=np.float64) - self.current_ptr = 0 # 写入指针:指向下一个要写入的位置 - self.total_samples = 0 # 已写入总点数 - self.lock = threading.Lock() # 线程安全锁 + self.current_ptr = 0 + self.total_samples = 0 + self.lock = threading.Lock() # 仅保护元数据 def appendBuffer(self, data): """ 追加数据到缓存(与paradigmRingBuffer接口一致) :param data: 输入数据,shape=(n_chan, n_samples) """ + n = data.shape[1] + if n == 0: + return + + # -------- 第一步:仅加锁读取/更新元数据(持锁极短)-------- with self.lock: - n = data.shape[1] - if n == 0: - return - - # 环形写入逻辑:指针到末尾则绕回 - write_end = self.current_ptr + n - if write_end <= self.n_points: - self.buffer[:, self.current_ptr:write_end] = data - else: - split = self.n_points - self.current_ptr - self.buffer[:, self.current_ptr:] = data[:, :split] - self.buffer[:, :write_end - self.n_points] = data[:, split:] - - # 更新指针(取模保证环形)和计数(不超过缓存总长度) - self.current_ptr = write_end % self.n_points - self.total_samples = min(self.total_samples + n, self.n_points) + old_ptr = self.current_ptr + new_ptr = (old_ptr + n) % self.n_points + new_total = min(self.total_samples + n, self.n_points) + + # -------- 第二步:数组写入(耗时操作,移出锁外)-------- + write_end = old_ptr + n + if write_end <= self.n_points: + self.buffer[:, old_ptr:write_end] = data + else: + split = self.n_points - old_ptr + self.buffer[:, old_ptr:] = data[:, :split] + self.buffer[:, :write_end - self.n_points] = data[:, split:] + + # -------- 第三步:再次加锁更新最终元数据 -------- + with self.lock: + self.current_ptr = new_ptr + self.total_samples = new_total def getData(self, count): """ @@ -53,39 +58,33 @@ class FilterRingBuffer: :param count: 读取点数 :return: np.ndarray, shape=(n_chan, count) """ - # with self.lock: - count = min(count, self.total_samples) - if count == 0: - return np.zeros((self.n_chan, 0)) + # -------- 第一步:加锁获取最新元数据(持锁极短)-------- + with self.lock: + count = min(count, self.total_samples) + if count == 0: + return np.zeros((self.n_chan, 0)) + end = self.current_ptr + start = end - count - # 环形读取:end是当前写入指针(最新数据的下一位),start是end - count - end = self.current_ptr - start = end - count if start >= 0: - return self.buffer[:, start:end].copy() + res = self.buffer[:, start:end].copy() else: - # 跨环形边界:前半部分从缓存末尾取,后半部分从开头取 - part1 = self.buffer[:, start:] # start为负,等价于n_points + start + part1 = self.buffer[:, start:] part2 = self.buffer[:, :end] - return np.concatenate((part1, part2), axis=1) + res = np.concatenate((part1, part2), axis=1).copy() + return res def get_latest_n_points(self, n): - """ - 扩展方法:获取最新的n个点(不移动读指针,用于滑动窗口) - :param n: 点数 - :return: np.ndarray, shape=(n_chan, n) | None(数据不足时) - """ - if self.total_samples < n: - return None + with self.lock: + if self.total_samples < n: + return None return self.getData(n) def GetDataLenCount(self): - """获取当前缓存总点数(兼容原有接口)""" with self.lock: return self.total_samples def resetAllPara(self): - """重置所有缓存和指针(兼容原有接口)""" with self.lock: self.buffer.fill(0.0) self.current_ptr = 0