Files
Depression_TMS/algorithm_V0/datacollect/eegParser_scipy_package.py
2026-06-01 13:18:36 +08:00

207 lines
9.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import sys
import threading
import time
import numpy as np
import pandas as pd
from SunnyLinker import SunnyLinker64
from zmqServer import zmqServer
from zmqClient import zmqClient
from scipy.io import savemat
class Parser_main(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.Running = True
self.fs = 250 # 采样率
self.energy = 0 # 电量
self.status_code = 0 # 与采集设备通信的状态码0为异常1为正常
self.n_chan = 64
self.dataBuffer = []
self.file_num = 0#保存文件序号
self.subject_id = None #受试者ID
self.session_id = None #Session ID
self.last_print_time = None
def connect(self):
self.thread_data_server = SunnyLinker64('127.0.0.1', 7878, 250, 64,
method='tcp')
self.thread_data_server.toUv = True
self.thread_data_server.start()
self.zmqServer = zmqServer()
self.zmqServer.start()
self.zmqClient = zmqClient('127.0.0.1', 8088)
self.zmqClient.connect()
def run(self):
while self.Running:
# 同步信息
if self.zmqServer.state_mode == 'sync':
self.zmqClient.send_to_all('sync', self.zmqClient.state)
self.zmqServer.state_mode = 'rest'
# 状态异常,报告上位机
if self.status_code != self.thread_data_server.status_code:
self.status_code = self.thread_data_server.status_code
self.zmqClient.send_to_all('status_code', int(self.status_code))
# 返回电量
if self.energy != self.thread_data_server.energy:
self.energy = self.thread_data_server.energy
self.zmqClient.send_to_all('energy', int(self.energy))
# 更新文件序号
if self.subject_id != self.zmqServer.subject_id or self.session_id != self.zmqServer.session_id:
self.subject_id = self.zmqServer.subject_id
self.session_id = self.zmqServer.session_id
self.file_num = 0 #从零开始计数
if self.zmqServer.open_Impedance == True: # 开启阻抗检测功能,仅运行一次
self.thread_data_server.Impedance(True)
self.zmqServer.open_Impedance = -1
elif self.zmqServer.open_Impedance == False:
self.thread_data_server.Impedance(False)
self.zmqServer.open_Impedance = -1
if self.zmqServer.get_Impedance: # 返回阻抗值
if self.thread_data_server.GetDataLenCount() > self.fs:
Impe_data = self.thread_data_server.getData(self.fs)
# 计算阻抗
imps = self.thread_data_server.getImpedance(Impe_data, self.n_chan)
self.zmqClient.send_to_all('impedance', imps.tolist())
else:
pass
if self.thread_data_server.GetDataLenCount() < 50:
time.sleep(0.01)
continue
if self.zmqServer.get_Impedance == False: # 非阻抗检测状态
data = self.thread_data_server.getData(50)
data = data[:self.n_chan, :]
if self.zmqServer.mat_generate:
# 检测是否需要重置缓冲区(第二次发送 matGenerate 时清空旧数据)
if self.zmqServer.reset_mat_buffer:
self.dataBuffer = []
self.last_print_time = None
self.zmqServer.reset_mat_buffer = False
print('[INFO] 数据缓冲区已重置,从头开始采集')
self.dataBuffer.append(data)
if len(self.dataBuffer) % 50 == 0:
current_time = time.time()
if self.last_print_time is not None:
elapsed_time = current_time - self.last_print_time
# 2500个点 = 50个数据块 * 50个采样点/数据块
actual_fs = 2500 / elapsed_time
print(f"接收 2500 个采样点耗时: {elapsed_time:.4f} 秒, 折合实际采样率: {actual_fs:.2f} Hz")
else:
print("开始计时...")
self.last_print_time = current_time
print('数据保存进度: {}/{}'.format(len(self.dataBuffer),int(self.zmqServer.save_win*self.fs//50)))
if len(self.dataBuffer) >= int(self.zmqServer.save_win*self.fs//50): #5分钟*60秒*250Hz / 50
self.zmqServer.mat_generate = False
matData = np.hstack(self.dataBuffer[:int(self.zmqServer.save_win*self.fs//50)])
self.dataBuffer = []
self.last_print_time = None # 重置计时器以备下次使用
self.pack2mat(matData,self.subject_id,self.session_id)
def pack2mat(self,data,subject_id,session_id):
#EEG数据
Data = data.T
#通道名称
channel_names = np.array(
['AIN1', 'AIN2', 'AIN3', 'AIN4', 'AIN5', 'AIN6', 'AIN7', 'AIN8', 'AIN9', 'AIN10', 'AIN11', 'AIN12',
'AIN13', 'AIN14', 'AIN15', 'AIN16', 'AIN17', 'AIN18', 'AIN19', 'AIN20', 'AIN21', 'AIN22', 'AIN23',
'AIN24', 'AIN25', 'AIN26', 'AIN27', 'AIN28', 'AIN29', 'AIN30', 'AIN31', 'AIN32', 'AIN33', 'AIN34',
'AIN35', 'AIN36', 'AIN37', 'AIN38', 'AIN39', 'AIN40', 'AIN41', 'AIN42', 'AIN43', 'AIN44', 'AIN45',
'AIN46', 'AIN47', 'AIN48', 'AIN49', 'AIN50', 'AIN51', 'AIN52', 'AIN53', 'AIN54', 'AIN55', 'AIN56',
'AIN57', 'AIN58', 'AIN59', 'AIN60', 'AIN61', 'AIN62', 'AIN63', 'AIN64'], dtype=object)
#采样率
sample_rate = self.fs
#通道数量
node_number = Data.shape[1]
# 时间轴
t = np.linspace(0, self.zmqServer.save_win, Data.shape[0])
t = t.reshape(len(t), 1)
#电极名称
electrode_name = np.array(['FP1', 'FP2', 'PO6', 'POZ', 'F3', 'F4', 'FPZ', 'AF4', 'FC3', 'PO8', 'CP2', 'CP1',
'FCZ', 'PO5', 'FC2', 'FC1', 'C3', 'C4', 'FC4', 'CP4', 'P3', 'P4', 'F5', 'C5', 'F6',
'PO4', 'CP6', 'CP5', 'PO3', 'CP3', 'FC6', 'FC5', 'CB1', 'CB2', 'P5', 'AF7', 'A1','T7',
'FT7', 'TP7', 'FT8', 'AF8', 'F8', 'F7', 'P6', 'C6', 'O2', 'O1', 'T8', 'P7', 'CZ','PZ',
'P8', 'FZ', 'OZ', 'PO7', 'TP8', 'AF3', 'C2', 'C1', 'P2', 'P1', 'F2', 'F1'],
dtype=object)
#电极三维坐标
electrode_xyz = self.read_ch_pos()
electrode_xyz.update({'A1': [-0.095, 0, -0.005]})
electrode_xyz = {key: electrode_xyz[key] for key in electrode_name}
electrode_xyz = np.array(list(electrode_xyz.values()))
#电极坐标所属的坐标系
electrode_coord_system = '10-20 spherical model'
#受试者ID
Subject_id = subject_id
#Session ID
Session_id = session_id
#参考电极方案
ref = 'CPZ'
#数据采集开始时间
start_time = 0
meta_struct = {
'subject_id': Subject_id,
'session_id': Session_id,
'ref': ref,
'start_time': start_time
}
eeg_struct = {
'data': Data,
'chn': channel_names,
'sample_rate': sample_rate,
'node_number': node_number,
't': t,
'electrode_name': electrode_name,
'electrode_xyz': electrode_xyz,
'electrode_coord_system': electrode_coord_system,
'meta': meta_struct,
}
fileDir = os.path.join('EEGfiles/',Subject_id,Session_id)
os.makedirs(fileDir,exist_ok=True)
filePath = os.path.join(fileDir,'eeg_data{}.mat'.format(self.file_num))
# 保存到 .mat 文件,顶层变量名为 'eeg'
savemat(filePath, {'eeg': eeg_struct})
print('EEGfile saved at {}'.format(filePath))
self.zmqClient.send_to_all('filePath', filePath)
self.file_num += 1
def read_ch_pos(self,file_path=r'xy_64.xlsx'):
"""
将电极位置信息转换为Dict
参数:
file_path: 电极位置存储文件, 必须包含'channel', 'x', 'y', 'z'
"""
if getattr(sys, 'frozen', False):
script_dir = sys._MEIPASS
else:
script_dir = os.path.dirname(os.path.abspath(__file__))
file_path = os.path.join(script_dir, file_path)
df = pd.read_excel(file_path)
# 确保列名正确
if not all(col in df.columns for col in ['channel', 'x', 'y', 'z']):
raise ValueError("DataFrame必须包含'channel', 'x', 'y', 'z'")
# 创建电极位置字典
ch_pos = {}
for _, row in df.iterrows():
ch_pos[row['channel']] = [row['x'], row['y'], row['z']]
return ch_pos
def stop(self):
'''
停止运行
@return:
'''
self.zmqServer.stop()
self.Running=False