Files
bci_algo/Device/SunnyLinker.py

815 lines
32 KiB
Python
Raw Permalink Normal View History

2026-06-05 09:34:29 +08:00
# -*-coding:utf-8 -*-
'''
SunnyLinker的通讯驱动
'''
import ast
import socket
import threading
import time
import datetime
from typing import Dict
from collections import deque
import numpy as np
from threading import Thread, Event
import serial
from scipy import signal
from serial.serialutil import SerialException
from Device.protocol import ProtocolFrame
from PubLibrary.InifileHelper import IniRead
class RingBuffer:
def __init__(self, n_chan, n_points):
self.n_chan = n_chan
self.n_points = n_points
self.buffer = np.zeros((n_chan, n_points))
self.currentPtr = 0
self.readPtr = 0
self.nUpdate = 0
self.rawData = np.zeros((n_chan, 1))
## append buffer and update current pointer
def appendBuffer(self, data):
if self.nUpdate == self.n_points:
raise Exception("Buffer is full")
n = data.shape[1]
# 计算可以写入的元素数量
write_count = min(self.n_points - self.nUpdate, n)
# 写入新数据
self.buffer[:, np.mod(np.arange(self.currentPtr, self.currentPtr + write_count), self.n_points)] = data[:,:write_count]
# 更新结束指针
self.currentPtr = (self.currentPtr + write_count) % self.n_points
# 更新大小
self.nUpdate += write_count
## get data from buffer
def getData(self, count=50):
# 确保不会尝试读取超过缓冲区当前大小的数据
count = min(count, self.nUpdate)
# 计算读取结束后的下一个位置
next_read_ptr = (self.readPtr + count) % self.n_points
if self.readPtr + count <= self.n_points:
# 情况 1不环绕数据是连续的
end_index = next_read_ptr if next_read_ptr != 0 else self.n_points
data = self.buffer[:, self.readPtr:end_index]
else:
# 情况 2发生环绕数据被分成两部分
# 第一部分:从 readPtr 到缓冲区末尾
part1 = self.buffer[:, self.readPtr:]
# 第二部分:从缓冲区开头到 (count - part1.shape[1]) 个点
part2 = self.buffer[:, :next_read_ptr]
# 将两部分在列方向上拼接
data = np.concatenate((part1, part2), axis=1)
# 更新读指针
self.readPtr = next_read_ptr
# 更新大小
self.nUpdate -= count
return data
# reset buffer
def resetAllPara(self):
self.nUpdate = 0
self.currentPtr = 0
self.readPtr = 0 # add by lizhenhua 清空读指针
self.buffer = np.zeros((self.n_chan, self.n_points)) # add by lizhenhua 清空环形缓冲区
class SunnyLinker64(Thread, ):
serial_port = str(IniRead('system', 'Serial_port'))
t_buffer = 10
n_chan = 64
srate = 250
win_len = 10
win_step = 1
ring_buffer = 5
receiveData = b''
toUv=True#转为uV
RingBufferLock = threading.Lock()
# 单例模式
_instance = None
_initialized = False # 检查是否已经初始化
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(SunnyLinker64, cls).__new__(cls)
return cls._instance
def __init__(self, host='127.0.0.1', port=7878, srate=250, n_chan=64,method = 'tcp'):
if SunnyLinker64._initialized:
return
Thread.__init__(self)
self.daemon = True
self.host = host
self.port = port
self.srate = srate
self.n_chan = n_chan
self.method = method #传输方式,'tcp'表示tcp传输,'serial'表示串口传输
self.__ringBuffer = RingBuffer(self.n_chan + 2,
int(np.round(self.t_buffer * self.srate)))
self.energy = 0 # 电量
self.status_code = 0 # 与采集设备通信的状态码0为异常1为正常
self.gain_value = 6 # 增益倍数
self.interval_inited = False #ssmvep或mi时间窗是否初始化
# 设置初始化标志为True防止重复初始化
SunnyLinker64._initialized = True
# --- 新增:用于心跳检测 ---
self.last_called = 0 # 初始化为0
self.last_called_lock = threading.Lock() # 保护 last_called 的访问
def reset_state(self):
"""清空采集器状态和缓存数据"""
with self.RingBufferLock:
self.__ringBuffer.resetAllPara()
self.count_events = {}
self.epoch_finished = False
self.pack_contain_event = False
self.event_inner_idx = -1
self.interval_inited = False
def interval_init(self,decoder_class):
if decoder_class == 'ssmvep':
interval_epoch = ast.literal_eval(IniRead('system', 'SSMVEP_IntervalEpoch'))
self.interval_epoch = [int(i * self.srate) for i in interval_epoch] # epoch截取信息
self.train_epoch = [int(self.interval_epoch[0]),
int(self.interval_epoch[1] + 0.1 * self.srate)] # 训练样本epoch
self.latency = (self.interval_epoch[
1] + 0.1 * self.srate) // 5 # 提取epoch的延迟标记5代表每次解包得到的5位采样点;0.1表示比实际需要的长度多取0.1,会被截掉
self.train_latency = (self.train_epoch[1] + 0.1 * self.srate) // 5
elif decoder_class == 'mi':
interval_epoch = ast.literal_eval(IniRead('system', 'MI_IntervalEpoch'))
self.interval_epoch = [int(i * self.srate) for i in interval_epoch] # epoch截取信息
self.train_epoch = self.interval_epoch.copy()
self.latency = (self.interval_epoch[1]) // 5 # 提取epoch的延迟标记5代表每次解包得到的5位采样点;
self.train_latency = self.latency
print('时间窗:', (interval_epoch))
self.count_events: Dict[str, int] = {} # 表示包延迟的计数信息
self.event_inner_idx = -1 # event在5位数据包内部的idx
self.epoch_finished = False # 接收epoch是否完整
self.pack_contain_event = False # 当前包是否含有event
self.predict_event = 99
self.events = [1, 2, self.predict_event]
if getattr(self, 'serial', None) and self.serial.is_open:
self.serial.close()
self.serial = serial.Serial(self.serial_port, 460800, timeout=1) # 连接同步器串口
self.interval_inited = True
def set_sampleRate(self,sampleRate_Code=0x00):
'''
设置采样率
:param sampleRate_Code: 0x00:250Hz,0x01:500Hz,0x02:1000Hz,0x03:2000Hz
'''
function_code = 0x02
gain_code = 0x06
sampleRate_Code = [gain_code,sampleRate_Code]
packed_data = ProtocolFrame.pack(function_code, sampleRate_Code)
if self.method == 'tcp':
self.sock.send(packed_data)
def push_trigger(self,label):
'''
数据打标
@param label:标签类别
'''
function_code = None
label = [label]
packed_data = ProtocolFrame.pack(function_code, label)
if self.method == 'tcp' and hasattr(self,'serial'):
print('发送:', label, datetime.datetime.now().strftime('%H:%M:%S.%f')[:-3])
self.serial.write(packed_data)
def Impedance(self, On):
'''
阻抗检测开关
:param On:True为开启False为关闭
:return: 组好的协议帧
'''
function_code = 0x01
if On:
data = [0x1]
self.gain_value = 6
else:
data = [0x0]
self.gain_value = 6
packed_data = ProtocolFrame.pack(function_code, data)
if self.method == 'tcp':
self.sock.send(packed_data)
def connect(self):
try:
if self.method == 'serial':
# 开启com口波特率115200超时5
self.sock = serial.Serial(self.host, self.port, timeout=5)
self.sock.flushInput() # 清空缓冲区
count = self.sock.inWaiting() # 获取串口缓冲区数据
while not count:
count = self.sock.inWaiting() # 获取串口缓冲区数据
# # 接收和存储数据
data = (self.sock.read(count))
self.receiveData = self.receiveData + data # 将接收数据存储在字符串中
elif self.method == 'tcp':
# 重连前关闭旧 socket避免资源泄漏
if hasattr(self, 'sock') and self.sock:
try:
self.sock.close()
except Exception:
pass
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, int(self.port)))
self.set_sampleRate(0x00) #设置250Hz采样率
return True
except Exception as e:
print("请打开头环")
print(e)
return False
print("connected")
return True
def extract_packet(self, packet):
# 存储一个点的八通道数据
dataList = []
# 存储116个点的八通道数据
dataMatrix = []
for j in range(5):
for i in range(self.n_chan):
if not self.toUv:#原始数据直接输出
val = (packet[194 * j + 25 + i * 3] << 16) | (packet[194 * j + 25 + 1 + i * 3] << 8) | packet[
194 * j + 25 + 2 + i * 3]
else:#转为uV
val = (packet[194 * j + 25 + i * 3] << 16) | (packet[194 * j + 25 + 1 + i * 3] << 8) | packet[
194 * j + 25 + 2 + i * 3]
if val < 8388608:
val = val * 4.5 / self.gain_value / 8388608 * 1000000;
else:
val = (val - 16777216) * 4.5 / self.gain_value / 8388608 * 1000000;
dataList.append(val)
#同步触发源
val = packet[194 * j + 25 + (i+1) * 3]
dataList.append(val)
#同步触发序号
val = packet[194 * j + 25 + (i+1) * 3+1]
dataList.append(val)
# 将数据矩阵进行拼接
if len(dataMatrix) == 0:
dataMatrix = np.asmatrix(dataList)
else:
dataMatrix = np.concatenate((dataMatrix, np.asmatrix(dataList)), axis=0)
dataList.clear()
return np.transpose(dataMatrix)
def run(self):
self.running = True
self.PackageLength = 998
# 尝试连接循环,断开后自动重连
while self.running:
if self.connect():
break
print(f"无法连接到 {self.host}:{self.port}15秒后重试...")
time.sleep(15)
# 启动心跳检测线程
threading.Thread(target=self.heartbeat_checker, daemon=True).start()
while self.running:
try:
if self.method == 'serial':
count = self.sock.inWaiting() # 获取串口缓冲区数据
if count:
# 接收和存储数据
data = (self.sock.read(count))
self.receiveData = self.receiveData + data # 将接收数据存储在字符串中
elif self.method == 'tcp':
data = self.sock.recv(600)
if not data:
break
self.receiveData += data
with self.last_called_lock:
self.last_called = time.time()
self.status_code = 1 # 收到数据,标记为正常
if len(self.receiveData) >= self.PackageLength and self.receiveData.rfind(
b'\x55\x55') >= self.PackageLength - 2:
index = self.receiveData.index(b'\xaa')
self.receiveData = self.receiveData[index:]
if len(self.receiveData) >= self.PackageLength:
onepackage = self.receiveData[:self.PackageLength]
if onepackage[7] != 0:
self.energy = onepackage[7] # 电量
self.receiveData = self.receiveData[self.PackageLength:]
dataMatrix = self.extract_packet(onepackage)
try:
with self.RingBufferLock:
if self.interval_inited:
self.epoch_finished = self.detect_event(dataMatrix)
if self.pack_contain_event:
self.__ringBuffer.resetAllPara() # 检测到当前pack含有event清除ringbuffer中之前的数据
self.__ringBuffer.appendBuffer(dataMatrix)
# self.plotBuffer.appendBuffer(dataMatrix)
if self.epoch_finished:
time.sleep(0.005)
print('epoch_finished: ', datetime.datetime.now().strftime('%H:%M:%S.%f')[:-3])
else:
self.__ringBuffer.appendBuffer(dataMatrix)
except Exception as e:
print("锁:写入异常",e)
# self.RingBufferLock.release()
except ConnectionResetError:
self.status_code = 0 # 状态异常
print("Connection was reset by the peer. 正在重新连接...")
self.sock.close()
# 退出循环后run() 开头的重连循环会自动接管
break
# 如果 running=True重连循环会接管不会执行到这里
# 检测是否含有标签
def detect_event(self, samples):
self.pack_contain_event = False
events = np.array(samples[-2])[0].tolist()
for idx, event in enumerate(events):
if int(event) in self.events:
new_key = "".join(
[
str(event),
datetime.datetime.now().strftime("%Y-%m-%d \
-%H-%M-%S"),
]
)
if event == self.predict_event:
self.count_events[new_key] = self.latency + 1
else:
self.count_events[new_key] = self.train_latency + 1
self.event_inner_idx = idx
self.pack_contain_event = True
drop_items = []
for key, value in self.count_events.items():
value = value - 1
if value == 0:
drop_items.append(key)
self.count_events[key] = value
for key in drop_items:
del self.count_events[key]
if drop_items:
return True
return False
# --- 新增:心跳检测线程 ---
def heartbeat_checker(self):
"""
定期检查是否在最近2秒内收到 eegData
如果超过2秒未收到则设置 status_code = 0
"""
while self.running:
time.sleep(0.5) # 每0.5秒检查一次
with self.last_called_lock:
now = time.time()
# 只有收到过一次数据后才开始判断超时
if self.last_called > 0 and (now - self.last_called) > 30:
if self.status_code != 0:
print("EEG data timeout: disconnected")
self.status_code = 0
def getDataViaSSVEP(self,count):
'''
ssvep的视觉通道共8个通道
@param count: 每通道读取的数值数量
@return: 返回最新的数值
'''
data=self.getData(count)
# PO5,POZ,PO6,O2,PO8,OZ,O1,PO7 64是event导联
rows_to_extract = [13, 3, 2, 46, 9, 54, 47, 55,64]
row_to_select=np.array(rows_to_extract)
data=data[row_to_select,:]
return data
def get_MIData(self):
'''
取出当前所有数值
:return:
'''
data = self.getData(self.__ringBuffer.nUpdate)
#MI选取导联FC3,FC1,FCZ,FC2,FC4,C5,C3,C1,CZ,C2,C4,C6,CP3,CP1,CP2,CP4,P3,P1,PZ,P2,P4,event1,event2
rows_to_extract = [8, 15, 12, 14, 18, 23, 16,59,50,58,17,45,29,11,10,19,20,61,51,60,21,64,65]
row_to_select = np.array(rows_to_extract)
data = data[row_to_select,:]
return data
def get_SSMVEPData(self):
'''
取出当前所有数值
:return:
'''
data = self.getData(self.__ringBuffer.nUpdate)
# PO5,POZ,PO6,O2,PO8,OZ,O1,PO7 64是event导联
rows_to_extract = [13, 3, 2, 46, 9, 54, 47, 55, 64,65]
row_to_select = np.array(rows_to_extract)
data = data[row_to_select, :]
return data
def get_concentrateData(self,count):
'''
@param count: 每通道读取的数值数量
@return: 返回最新的数值
'''
data=self.getData(count)
rows_to_extract = [0, 1]
row_to_select = np.array(rows_to_extract)
data = data[row_to_select, :]
return data
def get_blinkData(self,count):
'''
@param count: 每通道读取的数值数量
@return: 返回最新的数值
'''
data=self.getData(count)
rows_to_extract = [0,1]
row_to_select = np.array(rows_to_extract)
data = data[row_to_select, :]
return data
def getImpedance(self, data,decoder_class):
'''
获取阻抗值已经放大100倍单位是kΩ
@param data: 准备计算的通道数据每通道200个值注意不要把信号打标的通道传进来
@return: 返回各个通道的阻抗值
'''
impedanceList = []
for channelindex in range(data.shape[0]):
if len(data[channelindex]) > 0:
data_list = []
# 设计陷波滤波器去除50Hz成分
is50filter = True
if is50filter:
b, a = signal.iirnotch(50, 30, self.srate) # 30是带宽1000是采样频率
data_list = signal.lfilter(b, a, data[channelindex].tolist())
else:
data_list.extend(data[channelindex].tolist())
data_list = data_list[-1000:]
# 执行FFT
fft_result = np.fft.fft(data_list)
fft_magnitude = np.abs(fft_result / len(data_list)) # 归一化FFT结果
freqs = np.fft.fftfreq(len(data_list), d=1 / self.srate) # 频率轴
# y_amp_modified = np.concatenate(([fft_magnitude[0] / len(t[0].tolist())],
# fft_magnitude[1:-1] * 2 / len(t[0].tolist()),
# [fft_magnitude[-1] / len(t[0].tolist())]))
# 找到幅值最大的频率成分的索引忽略直流分量即索引0
max_index = np.argmax(fft_magnitude[1:])
# 获取最大幅值的频率索引加上1因为索引0是直流分量
freq_index = max_index + 1
# 获取最大幅值
max_magnitude = fft_magnitude[freq_index]
# 阻抗
import math
result = math.sqrt(2) * math.pi * max_magnitude / 6 / 4
result *= 0.44 * 100 # 统一放大100倍
impedanceList.append(int(result))
# print(max_magnitude, result)
else:
impedanceList.append(0)
impedances = np.array(impedanceList)
if decoder_class in ('mi', 'ma'):
impedances = impedances[np.array([8, 15, 12, 14, 18, 23, 16, 59, 50, 58, 17, 45, 29, 11, 10, 19, 20, 61, 51, 60, 21])]
elif decoder_class == 'blink':
impedances = impedances[np.array([0, 1])]
elif decoder_class == 'concentration':
impedances = impedances[np.array([0, 1])]
else:
impedances = impedances[np.array([13, 3, 2, 46, 9, 54, 47, 55])]
return impedances
def getData(self,count):
'''
获取最新的数据
@param count: 每通道返回的最数值数目
@return: 所有通道的最新count个数值
'''
data=None
try:
with self.RingBufferLock:
data = self.__ringBuffer.getData(count)
except:
print("锁:读取异常")
# self.RingBufferLock.release()
return data
def GetDataLenCount(self):
'''
获取最新缓存中每个通道的数量
@return:
'''
return self.__ringBuffer.nUpdate
def ResetAll(self):
'''
清空缓存
@return:
'''
with self.RingBufferLock:
self.__ringBuffer.resetAllPara()
def stop(self):
self.running = False
class SunnyLinker8(Thread, ):
receiveData = ''
t_buffer = 10
n_chan = 9
srate = 1000
receiveData = b''
toUv=False#转为uV
RingBufferLock = threading.Lock()
def __init__(self, host, port, srate=1000, n_chan=9,method = 'tcp'):
Thread.__init__(self)
self.daemon = True
self.host = host
self.port = port
self.srate = srate
self.n_chan = n_chan
self.method = method #传输方式,'tcp'表示tcp传输,'serial'表示串口传输
self.__ringBuffer = RingBuffer(self.n_chan + 2,
int(np.round(self.t_buffer * self.srate)))
self.energy = 0 #电量
self.status_code = 0 #与采集设备通信的状态码0为异常1为正常
self.gain_value = 6 # 增益倍数
def push_trigger(self,label):
'''
数据打标
@param label:标签类别
'''
function_code = None
label = [label]
packed_data = ProtocolFrame.pack(function_code, label)
if self.method == 'tcp':
self.sock.send(packed_data)
elif self.method == 'serial':
self.sock.write(packed_data)
def Impedance(self, On):
'''
阻抗检测开关
:param On:True为开启False为关闭
:return: 组好的协议帧
'''
function_code = None
if On:
data = [0xA1]
self.gain_value = 24
else:
data = [0xA0]
self.gain_value = 6
packed_data = ProtocolFrame.pack(function_code, data)
if self.method == 'tcp':
self.sock.send(packed_data)
elif self.method == 'serial':
self.sock.write(packed_data)
def connect(self):
try:
if self.method == 'serial':
# 开启com口波特率115200超时5
self.sock = serial.Serial(self.host, self.port, timeout=5)
self.sock.flushInput() # 清空缓冲区
count = self.sock.inWaiting() # 获取串口缓冲区数据
while not count:
count = self.sock.inWaiting() # 获取串口缓冲区数据
# # 接收和存储数据
data = (self.sock.read(count))
self.receiveData = self.receiveData + data # 将接收数据存储在字符串中
print("connected")
elif self.method == 'tcp':
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, int(self.port)))
print("connected")
except Exception as e:
print("请打开头环")
print(e)
print("connected")
def extract_packet(self, packet):
# 存储一个点的八通道数据
dataList = []
# 存储116个点的八通道数据
dataMatrix = []
# index = (packet[1] << 24) | (packet[2] << 16) | (packet[3] << 8) | packet[4]
# print(index)
for j in range(5):
for i in range(self.n_chan):
if not self.toUv:#原始数据直接输出
val = (packet[26 * j + 25 + i * 3] << 16) | (packet[26 * j + 25 + 1 + i * 3] << 8) | packet[
26 * j + 25 + 2 + i * 3]
else:#转为uV
val = (packet[26 * j + 25 + i * 3] << 16) | (packet[26 * j + 25 + 1 + i * 3] << 8) | packet[
26 * j + 25 + 2 + i * 3]
if val < 8388608:
val = val * 4.5 / self.gain_value / 8388608 * 1000000;
else:
val = (val - 16777216) * 4.5 / self.gain_value / 8388608 * 1000000;
dataList.append(val)
#同步触发源
val = packet[26 * j + 25 + (i+1) * 3]
dataList.append(val)
#同步触发序号
val = packet[26 * j + 25 + (i+1) * 3+1]
dataList.append(val)
# 将数据矩阵进行拼接
if len(dataMatrix) == 0:
dataMatrix = np.asmatrix(dataList)
else:
dataMatrix = np.concatenate((dataMatrix, np.asmatrix(dataList)), axis=0)
dataList.clear()
return np.transpose(dataMatrix)
def run(self):
self.connect()
self.running = True
self.PackageLength = 158
start_time = time.time()
try:
while self.running:
if self.method == 'serial':
end_time = time.time()
if end_time-start_time > 2: #超过2s未收到数据
self.status_code = 0 #状态异常
count = self.sock.inWaiting() # 获取串口缓冲区数据
if count:
start_time = time.time()
self.status_code = 1 # 收到数据,状态正常
# 接收和存储数据
data = (self.sock.read(count))
self.receiveData = self.receiveData + data # 将接收数据存储在字符串中
elif self.method == 'tcp':
data = self.sock.recv(100)
if not data:
break
self.receiveData += data
if len(self.receiveData) >= self.PackageLength and self.receiveData.rfind(
b'\x55\x55') >= self.PackageLength - 2:
index = self.receiveData.index(b'\xaa')
self.receiveData = self.receiveData[index:]
if len(self.receiveData) >= self.PackageLength:
onepackage = self.receiveData[:self.PackageLength]
if onepackage[7] != 0:
self.energy = onepackage[7] # 电量
self.receiveData = self.receiveData[self.PackageLength:]
dataMatrix = self.extract_packet(onepackage)
try:
with self.RingBufferLock:
self.__ringBuffer.appendBuffer(dataMatrix)
except:
print("锁:写入异常")
self.sock.close()
except ConnectionResetError:
self.status_code = 0 # 状态异常
print("Connection was reset by the peer.")
except SerialException as Se:
self.status_code = 0
print('串口通信异常!请检查适配器')
def process_packet(self):
if self.circular_buffer.buffer_length > 158:
packet = self.circular_buffer.extract_packet()
if packet:
# Here you would parse the packet according to the protocol
# print("Received packet:%s,index:%s", len(packet),str(integer_value))
return packet
else:
print("Received Nothing")
return None
def getDataViaSSVEP(self,count):
'''
ssvep的视觉通道共8个通道
@param count: 每通道读取的数值数量
@return: 返回最新的数值
'''
data=self.getData(count)
data=data[:8,:]
return data
def getImpedance(self, data):
'''
获取阻抗值已经放大100倍单位是kΩ
@param data: 准备计算的通道数据每通道200个值注意不要把信号打标的通道传进来
@return: 返回各个通道的阻抗值
'''
impedanceList = []
for channelindex in range(data.shape[0]):
if len(data[channelindex]) > 0:
data_list = []
# 设计陷波滤波器去除50Hz成分
is50filter = True
if is50filter:
b, a = signal.iirnotch(50, 30, self.srate) # 30是带宽1000是采样频率
data_list = signal.lfilter(b, a, data[channelindex].tolist())
else:
data_list.extend(data[channelindex].tolist())
data_list = data_list[-1000:]
# 执行FFT
fft_result = np.fft.fft(data_list)
fft_magnitude = np.abs(fft_result / len(data_list)) # 归一化FFT结果
freqs = np.fft.fftfreq(len(data_list), d=1 / self.srate) # 频率轴
# y_amp_modified = np.concatenate(([fft_magnitude[0] / len(t[0].tolist())],
# fft_magnitude[1:-1] * 2 / len(t[0].tolist()),
# [fft_magnitude[-1] / len(t[0].tolist())]))
# 找到幅值最大的频率成分的索引忽略直流分量即索引0
max_index = np.argmax(fft_magnitude[1:])
# 获取最大幅值的频率索引加上1因为索引0是直流分量
freq_index = max_index + 1
# 获取最大幅值
max_magnitude = fft_magnitude[freq_index]
# 阻抗
import math
result = math.sqrt(2) * math.pi * max_magnitude / 6 / 4
result *= 0.44 * 100 # 统一放大100倍
impedanceList.append(int(result))
# print(max_magnitude, result)
else:
impedanceList.append(0)
# impedances = ":".join(map(str, impedanceList))
impedances = np.array(impedanceList)
impedances = impedances[:8]
return impedances
def getData(self,count):
'''
获取最新的数据
@param count: 每通道返回的最数值数目
@return: 所有通道的最新count个数值
'''
data=None
try:
with self.RingBufferLock:
data = self.__ringBuffer.getData(count)
except:
print("锁:读取异常")
# self.RingBufferLock.release()
return data
def GetDataLenCount(self):
'''
获取最新缓存中每个通道的数量
@return:
'''
return self.__ringBuffer.nUpdate
def ResetAll(self):
'''
清空缓存
@return:
'''
with self.RingBufferLock:
self.__ringBuffer.resetAllPara()
def stop(self):
self.running = False
if __name__ == "__main__":
# Usage
Linker = SunnyLinker64('127.0.0.1', 5086, 1000, 65)
Linker.start()
try:
while True:
time.sleep(0.005)
if(Linker.count()>0):
# print(Linker.ringBuffer.nUpdate)
t = Linker.getData()
print(t.shape[1], Linker.count())
# Linker.ringBuffer.nUpdate=0
# time.sleep(0.2)
except KeyboardInterrupt:
Linker.stop()