74 lines
2.2 KiB
Python
74 lines
2.2 KiB
Python
|
|
import numpy as np
|
|||
|
|
from scipy.signal import welch
|
|||
|
|
from collections import deque
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
class Beta_Calculate():
    """Sliding-window band-sum estimator based on Welch's PSD.

    Chunks of samples are pushed through ``queueOpt``; once ``win_len``
    chunks have been buffered they are concatenated and the summed PSD in
    the theta, alpha and beta bands is computed. Only the beta value is
    returned by ``queueOpt``.

    NOTE(review): attribute names suggest the input is EEG laid out as
    (channels, samples); the code itself only requires that the last axis
    is the sample axis -- confirm against the caller.
    """

    # Frequency bands in Hz. Both edges are inclusive (see ``band_psd``), so
    # a frequency bin lying exactly on a shared edge (8 Hz, 13 Hz) is counted
    # in both neighbouring bands.
    THETA_BAND = (4, 8)
    ALPHA_BAND = (8, 13)
    BETA_BAND = (13, 30)

    def __init__(self, Threshold_value_low, Threshold_value_high, fs=250, win_len=5, config=None):
        """Store the thresholds and sampling rate and create the chunk queue.

        Args:
            Threshold_value_low: Stored as-is; not used inside this class.
            Threshold_value_high: Stored as-is; not used inside this class.
            fs: Sampling rate passed to the PSD computation by ``queueOpt``.
            win_len: Number of chunks the queue holds before a result is
                produced; older chunks are dropped automatically.
            config: Accepted for interface compatibility; currently unused.
        """
        self.Threshold_value_low = Threshold_value_low
        self.Threshold_value_high = Threshold_value_high
        self.fs = fs
        self.beta_result = []
        self.eegQueue = deque(maxlen=win_len)

    def calculate_all(self, data, fs, nperseg=1000):
        """Return the summed PSD in the beta, alpha and theta bands.

        The mean along the last axis is removed first, then the PSD is
        estimated and the bins of each band are summed over every leading
        axis as well, giving one scalar per band. The three values are also
        printed.

        Args:
            data: Array-like whose last axis is the sample axis.
            fs: Sampling rate in Hz.
            nperseg: Requested Welch segment length.

        Returns:
            Tuple ``(beta_psd, alpha_psd, theta_psd)``.
        """
        data = np.asarray(data)
        # Non-in-place subtraction: the caller's array is left untouched and
        # integer input is promoted to float.
        data = data - np.mean(data, axis=-1, keepdims=True)
        freqs, psd = self.compute_psd_multichannel(data, fs, nperseg)

        beta_psd, alpha_psd, theta_psd = (
            np.sum(self.band_psd(freqs, psd, band))
            for band in (self.BETA_BAND, self.ALPHA_BAND, self.THETA_BAND)
        )

        print(f"[功率] β={beta_psd:.2f} | α={alpha_psd:.2f} | θ={theta_psd:.2f}")

        return beta_psd, alpha_psd, theta_psd

    def compute_psd_multichannel(self, data, fs=250, nperseg=1000, noverlap=500):
        """Estimate the PSD along the last axis with ``scipy.signal.welch``.

        Args:
            data: Array-like whose last axis is the sample axis.
            fs: Sampling rate in Hz.
            nperseg: Requested segment length; clamped to the number of
                available samples.
            noverlap: Requested overlap between segments. When it is not
                smaller than the effective segment length, half of that
                length is used instead.

        Returns:
            Tuple ``(freqs, psd)``. For a zero-length signal ``freqs`` is an
            empty array and ``psd`` keeps the leading axes of ``data`` with
            an empty last axis.
        """
        data = np.asarray(data)
        nperseg = min(nperseg, data.shape[-1])

        if nperseg == 0:
            # Nothing to analyse; keep the leading axes so band sums still work.
            return np.array([]), np.zeros(data.shape[:-1] + (0,))

        if noverlap >= nperseg:
            # welch requires noverlap < nperseg.
            noverlap = nperseg // 2

        freqs, psd = welch(data, fs=fs, nperseg=nperseg, noverlap=noverlap, axis=-1)
        return freqs, psd

    def band_psd(self, freqs, psd, band):
        """Sum the PSD bins whose frequency lies in ``band`` (inclusive).

        Args:
            freqs: 1-D array of bin frequencies.
            psd: PSD array whose last axis matches ``freqs``; any number of
                leading axes is accepted.
            band: ``(low, high)`` frequency limits, both inclusive.

        Returns:
            The sum over the selected bins along the last axis.
        """
        in_band = np.logical_and(freqs >= band[0], freqs <= band[1])
        # Ellipsis indexing keeps this valid for 1-D as well as N-D PSDs.
        return np.sum(psd[..., in_band], axis=-1)

    def reset_queue(self):
        """Discard every buffered chunk."""
        self.eegQueue.clear()

    def queueOpt(self, data):
        """Buffer one chunk and, once the queue is full, return the beta sum.

        Args:
            data: Array-like chunk of samples, or ``None``.

        Returns:
            The beta-band value from ``calculate_all`` when the queue holds
            ``win_len`` chunks; ``None`` when ``data`` is ``None`` or empty,
            or while the queue is still filling.
        """
        if data is None:
            return None
        data = np.asarray(data)
        if data.size == 0:
            return None

        # A bounded deque drops its oldest element on append when full.
        self.eegQueue.append(data)

        # ``not self.eegQueue`` covers a zero-capacity queue, which is always
        # "full" yet has nothing to stack.
        if not self.eegQueue or len(self.eegQueue) != self.eegQueue.maxlen:
            return None

        eeg_window = np.hstack(list(self.eegQueue))
        if eeg_window.size == 0:
            return None

        # Not in-place: an in-place ``-=`` on an integer buffer raises a
        # casting error because the mean is floating point.
        eeg_window = eeg_window - np.mean(eeg_window, axis=-1, keepdims=True)

        beta_psd, _alpha_psd, _theta_psd = self.calculate_all(
            eeg_window, fs=self.fs, nperseg=1000
        )
        return beta_psd
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
|