This commit is contained in:
2026-06-09 19:14:50 +08:00
parent 88f1d744f6
commit 3ac9fe8050
5 changed files with 339 additions and 114 deletions

View File

@@ -24,7 +24,7 @@ constexpr int k8ChLeadCount = 8;
constexpr int k64ChLeadCount = 64;
constexpr std::uint8_t kAlgorithmChannelCount = 64;
constexpr std::size_t kAlgorithmSampleByteCount =
static_cast<std::size_t>(XYPARSER_FRAME_DATA_COLUMN_COUNT) * sizeof(double);
static_cast<std::size_t>(XYPARSER_MAX_CHANNELS) * sizeof(double);
struct AlgorithmSample {
std::array<double, XYPARSER_MAX_CHANNELS> channel_values_uv{};
@@ -582,13 +582,13 @@ int XYParser_FeedAlgorithmData(XYParserHandle handle,
while (context->algorithm_byte_cache.size() >= kAlgorithmSampleByteCount) {
AlgorithmSample sample{};
std::array<double, XYPARSER_FRAME_DATA_COLUMN_COUNT> row{};
std::array<double, XYPARSER_MAX_CHANNELS> row{};
std::memcpy(row.data(), context->algorithm_byte_cache.data(), kAlgorithmSampleByteCount);
for (std::size_t channel_index = 0; channel_index < XYPARSER_MAX_CHANNELS; ++channel_index) {
sample.channel_values_uv[channel_index] = row[channel_index];
}
sample.trigger_type = static_cast<std::uint8_t>(row[XYPARSER_FRAME_DATA_TRIGGER_TYPE_INDEX]);
sample.trigger_index = static_cast<std::uint8_t>(row[XYPARSER_FRAME_DATA_TRIGGER_INDEX_INDEX]);
sample.trigger_type = 0;
sample.trigger_index = 0;
context->welch_processor.PushSample(sample.channel_values_uv);
context->algorithm_sample_cache.push_back(sample);
context->algorithm_byte_cache.erase(

View File

@@ -296,6 +296,7 @@ XYPARSER_API int XYParser_ConvertSampleFramesToAlgorithmData(const XYParserFrame
double* output_data);
// 向算法数据缓存输入字节流,并按每 5 个采样组装为帧,同时驱动 Welch 计算。
// 算法回包格式为每个采样点 64 个 double仅包含 64 路通道值,不包含 trigger_type/trigger_index。
// @param handle 目标解析器句柄。
// @param data 输入的算法数据字节流。
// @param size 输入数据的字节数。

View File

@@ -15,6 +15,8 @@
// 匿名命名空间,包含测试辅助代码
namespace {
constexpr int kAlgorithmReturnColumnCount = XYPARSER_MAX_CHANNELS;
/// ParserGuard 类RAII 封装,确保解析器资源自动释放
/// 当对象生命周期结束时自动调用 XYParser_DestroyParser 释放资源
class ParserGuard {
@@ -199,9 +201,9 @@ std::vector<double> BuildAlgorithmDataForSingleChannel(int sample_rate,
double secondary_amplitude = 0.0)
{
std::vector<double> algorithm_data(
static_cast<std::size_t>(sample_rate * XYPARSER_FRAME_DATA_COLUMN_COUNT), 0.0);
static_cast<std::size_t>(sample_rate * kAlgorithmReturnColumnCount), 0.0);
for (int sample = 0; sample < sample_rate; ++sample) {
const std::size_t sample_offset = static_cast<std::size_t>(sample) * XYPARSER_FRAME_DATA_COLUMN_COUNT;
const std::size_t sample_offset = static_cast<std::size_t>(sample) * kAlgorithmReturnColumnCount;
algorithm_data[sample_offset] = static_cast<double>(BuildCombinedSineRawValue(sample,
sample_rate,
primary_frequency_hz,
@@ -219,9 +221,9 @@ std::vector<double> BuildAlgorithmDataForTwoChannels(int sample_rate,
double channel1_amplitude)
{
std::vector<double> algorithm_data(
static_cast<std::size_t>(sample_rate * XYPARSER_FRAME_DATA_COLUMN_COUNT), 0.0);
static_cast<std::size_t>(sample_rate * kAlgorithmReturnColumnCount), 0.0);
for (int sample = 0; sample < sample_rate; ++sample) {
const std::size_t sample_offset = static_cast<std::size_t>(sample) * XYPARSER_FRAME_DATA_COLUMN_COUNT;
const std::size_t sample_offset = static_cast<std::size_t>(sample) * kAlgorithmReturnColumnCount;
algorithm_data[sample_offset] = static_cast<double>(
BuildSineRawValue(sample, sample_rate, channel0_frequency_hz, channel0_amplitude));
algorithm_data[sample_offset + 1] = static_cast<double>(
@@ -645,15 +647,13 @@ TEST(XYParserApiTests, FeedAlgorithmDataCachesSamplesBuildsFramesAndFlushesTailS
ASSERT_NE(parser.get(), nullptr);
constexpr int kTotalSamples = 7;
constexpr int kColumnCount = XYPARSER_FRAME_DATA_COLUMN_COUNT;
constexpr int kColumnCount = kAlgorithmReturnColumnCount;
std::array<double, static_cast<std::size_t>(kTotalSamples) * kColumnCount> input_data{};
for (int sample_index = 0; sample_index < kTotalSamples; ++sample_index) {
const std::size_t row_offset = static_cast<std::size_t>(sample_index) * kColumnCount;
input_data[row_offset + 0] = 1000.0 + sample_index;
input_data[row_offset + 10] = 2000.0 + sample_index;
input_data[row_offset + 63] = 3000.0 + sample_index;
input_data[row_offset + XYPARSER_FRAME_DATA_TRIGGER_TYPE_INDEX] = 10.0 + sample_index;
input_data[row_offset + XYPARSER_FRAME_DATA_TRIGGER_INDEX_INDEX] = 20.0 + sample_index;
}
const std::uint8_t* input_bytes = reinterpret_cast<const std::uint8_t*>(input_data.data());
const std::size_t first_chunk_size = 3 * static_cast<std::size_t>(kColumnCount) * sizeof(double) + sizeof(double);
@@ -681,8 +681,8 @@ TEST(XYParserApiTests, FeedAlgorithmDataCachesSamplesBuildsFramesAndFlushesTailS
EXPECT_DOUBLE_EQ(summaries[0].channel_values_uv[0][0], 1000.0);
EXPECT_DOUBLE_EQ(summaries[0].channel_values_uv[2][10], 2002.0);
EXPECT_DOUBLE_EQ(summaries[0].channel_values_uv[4][63], 3004.0);
EXPECT_EQ(summaries[0].sample_trigger_types[0], 10U);
EXPECT_EQ(summaries[0].sample_trigger_indices[4], 24U);
EXPECT_EQ(summaries[0].sample_trigger_types[0], 0U);
EXPECT_EQ(summaries[0].sample_trigger_indices[4], 0U);
XYParserFrameSummary tail_summary{};
ASSERT_EQ(XYParser_FlushAlgorithmData(parser.get(), &tail_summary), 1);
@@ -691,8 +691,8 @@ TEST(XYParserApiTests, FeedAlgorithmDataCachesSamplesBuildsFramesAndFlushesTailS
EXPECT_EQ(tail_summary.sample_count, 2U);
EXPECT_DOUBLE_EQ(tail_summary.channel_values_uv[0][0], 1005.0);
EXPECT_DOUBLE_EQ(tail_summary.channel_values_uv[1][63], 3006.0);
EXPECT_EQ(tail_summary.sample_trigger_types[0], 15U);
EXPECT_EQ(tail_summary.sample_trigger_indices[1], 26U);
EXPECT_EQ(tail_summary.sample_trigger_types[0], 0U);
EXPECT_EQ(tail_summary.sample_trigger_indices[1], 0U);
EXPECT_DOUBLE_EQ(tail_summary.channel_values_uv[2][0], 0.0);
EXPECT_EQ(tail_summary.sample_trigger_types[2], 0U);
@@ -1108,9 +1108,9 @@ TEST(XYParserApiTests, WelchReturnsOneResultAfterOneSecondFromAlgorithmData)
XYParser_SetSampleRate(parser.get(), 10);
XYParser_SetWelchDetection(parser.get(), 1);
std::vector<double> algorithm_data(static_cast<std::size_t>(10 * XYPARSER_FRAME_DATA_COLUMN_COUNT), 0.0);
std::vector<double> algorithm_data(static_cast<std::size_t>(10 * kAlgorithmReturnColumnCount), 0.0);
for (int sample = 0; sample < 10; ++sample) {
const std::size_t sample_offset = static_cast<std::size_t>(sample) * XYPARSER_FRAME_DATA_COLUMN_COUNT;
const std::size_t sample_offset = static_cast<std::size_t>(sample) * kAlgorithmReturnColumnCount;
algorithm_data[sample_offset] = static_cast<double>(BuildSineRawValue(sample, 10, 2.0, 1000000.0));
}
@@ -1370,7 +1370,7 @@ TEST(XYParserApiTests, WelchResetClearsHalfWindowAfterDisableEnable)
const std::vector<double> full_data = BuildAlgorithmDataForSingleChannel(10, 2.0, 1000000.0);
const std::size_t half_row_count =
static_cast<std::size_t>(XYPARSER_SAMPLES_PER_FRAME) * XYPARSER_FRAME_DATA_COLUMN_COUNT;
static_cast<std::size_t>(XYPARSER_SAMPLES_PER_FRAME) * kAlgorithmReturnColumnCount;
const std::vector<double> first_half(full_data.begin(), full_data.begin() + static_cast<std::ptrdiff_t>(half_row_count));
const std::vector<double> second_half(full_data.begin() + static_cast<std::ptrdiff_t>(half_row_count), full_data.end());
@@ -1657,9 +1657,9 @@ TEST(XYParserApiTests, WelchDisabledDoesNotProduceResultsFromAlgorithmData)
XYParser_SetSampleRate(parser.get(), 10);
std::vector<double> algorithm_data(static_cast<std::size_t>(10 * XYPARSER_FRAME_DATA_COLUMN_COUNT), 0.0);
std::vector<double> algorithm_data(static_cast<std::size_t>(10 * kAlgorithmReturnColumnCount), 0.0);
for (int sample = 0; sample < 10; ++sample) {
const std::size_t sample_offset = static_cast<std::size_t>(sample) * XYPARSER_FRAME_DATA_COLUMN_COUNT;
const std::size_t sample_offset = static_cast<std::size_t>(sample) * kAlgorithmReturnColumnCount;
algorithm_data[sample_offset] = static_cast<double>(BuildSineRawValue(sample, 10, 2.0, 1000000.0));
}

View File

@@ -401,48 +401,51 @@ public:
ResponseMode mode,
std::vector<std::uint8_t>& response)
{
response.resize(request_size);
if (mode == ResponseMode::Echo) {
std::memcpy(response.data(), request, request_size);
return request_size;
}
if (mode == ResponseMode::Zero) {
std::fill(response.begin(), response.end(), 0);
return request_size;
}
if ((request_size % sizeof(double)) != 0) {
response.clear();
return 0;
}
const std::size_t value_count = request_size / sizeof(double);
if ((value_count % static_cast<std::size_t>(XYPARSER_FRAME_DATA_COLUMN_COUNT)) != 0) {
response.clear();
return 0;
}
const std::size_t sample_count =
value_count / static_cast<std::size_t>(XYPARSER_FRAME_DATA_COLUMN_COUNT);
const std::size_t response_value_count =
sample_count * static_cast<std::size_t>(XYPARSER_MAX_CHANNELS);
const std::size_t response_size = response_value_count * sizeof(double);
response.resize(response_size);
const auto* input_values = reinterpret_cast<const double*>(request);
auto* output_values = reinterpret_cast<double*>(response.data());
for (std::size_t sample_index = 0; sample_index < XYPARSER_SAMPLES_PER_FRAME; ++sample_index) {
const std::size_t sample_offset =
sample_index * static_cast<std::size_t>(XYPARSER_FRAME_DATA_COLUMN_COUNT);
for (std::size_t channel_index = 0; channel_index < XYPARSER_MAX_CHANNELS; ++channel_index) {
const std::size_t value_index = sample_offset + channel_index;
if (value_index >= value_count) {
response.clear();
return 0;
}
output_values[value_index] = channels_[channel_index].Process(input_values[value_index]);
}
if (mode == ResponseMode::Zero) {
std::fill(response.begin(), response.end(), 0);
return response_size;
}
const std::size_t trigger_type_index = sample_offset + XYPARSER_FRAME_DATA_TRIGGER_TYPE_INDEX;
const std::size_t trigger_index_index = sample_offset + XYPARSER_FRAME_DATA_TRIGGER_INDEX_INDEX;
if (trigger_index_index >= value_count) {
for (std::size_t sample_index = 0; sample_index < sample_count; ++sample_index) {
const std::size_t input_sample_offset =
sample_index * static_cast<std::size_t>(XYPARSER_FRAME_DATA_COLUMN_COUNT);
const std::size_t output_sample_offset =
sample_index * static_cast<std::size_t>(XYPARSER_MAX_CHANNELS);
for (std::size_t channel_index = 0; channel_index < XYPARSER_MAX_CHANNELS; ++channel_index) {
const std::size_t input_value_index = input_sample_offset + channel_index;
const std::size_t output_value_index = output_sample_offset + channel_index;
if (input_value_index >= value_count || output_value_index >= response_value_count) {
response.clear();
return 0;
}
output_values[trigger_type_index] = input_values[trigger_type_index];
output_values[trigger_index_index] = input_values[trigger_index_index];
output_values[output_value_index] =
(mode == ResponseMode::Echo)
? input_values[input_value_index]
: channels_[channel_index].Process(input_values[input_value_index]);
}
}
return request_size;
return response_size;
}
private:
@@ -633,14 +636,46 @@ public:
return -1;
}
std::vector<std::uint8_t> payload_frame;
if (!ReceiveFrame(payload_frame, has_more)) {
std::vector<std::uint8_t> second_frame;
if (!ReceiveFrame(second_frame, has_more)) {
timed_out = (last_error_text_ == "Resource temporarily unavailable");
return timed_out ? 0 : -1;
}
if (has_more) {
last_error_text_ = "unexpected multipart payload";
return -1;
std::vector<std::uint8_t> payload_frame;
if (!has_more) {
payload_frame = std::move(second_frame);
} else if (second_frame.empty()) {
if (!ReceiveFrame(payload_frame, has_more)) {
timed_out = (last_error_text_ == "Resource temporarily unavailable");
return timed_out ? 0 : -1;
}
if (has_more) {
last_error_text_ = "unexpected multipart payload";
return -1;
}
} else {
std::vector<std::uint8_t> third_frame;
if (!ReceiveFrame(third_frame, has_more)) {
timed_out = (last_error_text_ == "Resource temporarily unavailable");
return timed_out ? 0 : -1;
}
if (!has_more) {
payload_frame = std::move(third_frame);
} else {
if (!third_frame.empty()) {
last_error_text_ = "unexpected non-empty delimiter frame";
return -1;
}
if (!ReceiveFrame(payload_frame, has_more)) {
timed_out = (last_error_text_ == "Resource temporarily unavailable");
return timed_out ? 0 : -1;
}
if (has_more) {
last_error_text_ = "unexpected multipart payload";
return -1;
}
}
}
if (payload_frame.size() > static_cast<std::size_t>(capacity)) {
last_error_text_ = "payload too large";
@@ -670,6 +705,12 @@ public:
return false;
}
const int sent_empty = zmq_send(socket_, nullptr, 0, ZMQ_SNDMORE);
if (sent_empty != 0) {
last_error_text_ = zmq_strerror(zmq_errno());
return false;
}
const int sent_payload = zmq_send(socket_, data, size, 0);
if (sent_payload != size) {
last_error_text_ = zmq_strerror(zmq_errno());

View File

@@ -58,9 +58,9 @@ struct DemoOptions {
DemoMode mode = kDemoMode;
std::string tcp_host = kDefaultTcpHost;
int tcp_port = 5086;
std::string algorithm_host = "127.0.0.1";
std::string algorithm_host = "192.168.254.102";
int algorithm_port = 8100;
int algorithm_timeout_ms = 200;
int algorithm_timeout_ms = 1000;
std::wstring data_com_port = kDefaultDataComPort;
std::wstring trigger_com_port = kDefaultTriggerComPort;
int serial_baud_rate = 460800;
@@ -282,8 +282,33 @@ public:
return false;
}
const int sent = zmq_send(socket_, data, size, 0);
if (sent != static_cast<int>(size)) {
const Clock::time_point now = Clock::now();
std::cout << "[AlgorithmZmqTx] payloadBytes=" << size;
if (last_send_time_.has_value()) {
const double delta_ms =
std::chrono::duration<double, std::milli>(now - *last_send_time_).count();
std::cout << " deltaMs=" << delta_ms;
}
std::cout << std::endl;
last_send_time_ = now;
const int sent_identity = zmq_send(socket_,
identity_.data(),
static_cast<int>(identity_.size()),
ZMQ_SNDMORE);
if (sent_identity != static_cast<int>(identity_.size())) {
last_error_text_ = zmq_strerror(zmq_errno());
return false;
}
const int sent_empty = zmq_send(socket_, nullptr, 0, ZMQ_SNDMORE);
if (sent_empty != 0) {
last_error_text_ = zmq_strerror(zmq_errno());
return false;
}
const int sent_payload = zmq_send(socket_, data, size, 0);
if (sent_payload != static_cast<int>(size)) {
last_error_text_ = zmq_strerror(zmq_errno());
return false;
}
@@ -299,20 +324,94 @@ public:
return -1;
}
const int received = zmq_recv(socket_, buffer, capacity, 0);
if (received >= 0) {
last_error_text_.clear();
return received;
std::vector<std::uint8_t> first_frame;
bool has_more = false;
if (!ReceiveFrame(first_frame, has_more, 0)) {
const int error = zmq_errno();
last_error_text_ = zmq_strerror(error);
if (error == EAGAIN) {
timed_out = true;
return 0;
}
return -1;
}
const int error = zmq_errno();
last_error_text_ = zmq_strerror(error);
if (error == EAGAIN) {
timed_out = true;
return 0;
std::vector<std::uint8_t> payload_frame;
if (has_more) {
if (!first_frame.empty()) {
last_error_text_ = "unexpected non-empty delimiter frame";
return -1;
}
bool payload_has_more = false;
if (!ReceiveFrame(payload_frame, payload_has_more, 0)) {
const int error = zmq_errno();
last_error_text_ = zmq_strerror(error);
if (error == EAGAIN) {
timed_out = true;
return 0;
}
return -1;
}
if (payload_has_more) {
last_error_text_ = "unexpected multipart payload";
return -1;
}
} else {
payload_frame = std::move(first_frame);
}
return -1;
if (payload_frame.size() > static_cast<std::size_t>(capacity)) {
last_error_text_ = "payload too large";
return -1;
}
std::memcpy(buffer, payload_frame.data(), payload_frame.size());
last_error_text_.clear();
return static_cast<int>(payload_frame.size());
}
int ReceiveNonBlocking(std::vector<std::uint8_t>& payload, bool& no_message)
{
no_message = false;
if (socket_ == nullptr) {
last_error_text_ = "socket not open";
return -1;
}
std::vector<std::uint8_t> first_frame;
bool has_more = false;
if (!ReceiveFrame(first_frame, has_more, ZMQ_DONTWAIT)) {
const int error = zmq_errno();
last_error_text_ = zmq_strerror(error);
if (error == EAGAIN) {
no_message = true;
return 0;
}
return -1;
}
std::vector<std::uint8_t> payload_frame;
if (has_more) {
if (!first_frame.empty()) {
last_error_text_ = "unexpected non-empty delimiter frame";
return -1;
}
bool payload_has_more = false;
if (!ReceiveFrame(payload_frame, payload_has_more, 0)) {
last_error_text_ = zmq_strerror(zmq_errno());
return -1;
}
if (payload_has_more) {
last_error_text_ = "unexpected multipart payload";
return -1;
}
} else {
payload_frame = std::move(first_frame);
}
payload = std::move(payload_frame);
last_error_text_.clear();
return static_cast<int>(payload.size());
}
const std::string& last_error() const { return last_error_text_; }
@@ -329,14 +428,38 @@ public:
zmq_ctx_term(context_);
context_ = nullptr;
}
last_send_time_.reset();
}
private:
bool ReceiveFrame(std::vector<std::uint8_t>& frame, bool& has_more, int flags)
{
zmq_msg_t message;
zmq_msg_init(&message);
const int received = zmq_msg_recv(&message, socket_, flags);
if (received < 0) {
zmq_msg_close(&message);
return false;
}
const auto* data = static_cast<const std::uint8_t*>(zmq_msg_data(&message));
const std::size_t size = zmq_msg_size(&message);
frame.assign(data, data + size);
int more = 0;
size_t more_size = sizeof(more);
zmq_getsockopt(socket_, ZMQ_RCVMORE, &more, &more_size);
has_more = (more != 0);
zmq_msg_close(&message);
return true;
}
void* context_ = nullptr;
void* socket_ = nullptr;
std::string last_error_text_;
std::string configured_remote_endpoint_;
std::string identity_;
std::optional<Clock::time_point> last_send_time_;
};
class SerialPort {
@@ -781,56 +904,45 @@ bool RunAlgorithmZmqRoundTrip(XYParserHandle parser,
++stats.sent_packets;
stats.sent_bytes += payload_size;
std::array<std::uint8_t, XYPARSER_FRAME_ALGORITHM_VALUE_COUNT * sizeof(double)> response_buffer{};
bool timed_out = false;
const int received = zmq_client.Receive(response_buffer.data(),
static_cast<int>(response_buffer.size()),
timed_out);
if (timed_out) {
++stats.timeout_count;
std::cerr << "Receive algorithm ZMQ payload timed out"
<< " remote=" << zmq_client.configured_remote_endpoint()
<< " zmqError=" << zmq_client.last_error()
<< std::endl;
PrintZmqStats(stats);
return false;
}
if (received <= 0) {
std::cerr << "Receive algorithm ZMQ payload failed"
<< " remote=" << zmq_client.configured_remote_endpoint()
<< " zmqError=" << zmq_client.last_error()
<< std::endl;
PrintZmqStats(stats);
return false;
}
if (received % static_cast<int>(sizeof(double)) != 0) {
++stats.invalid_payload_count;
std::cerr << "Algorithm ZMQ payload size is not aligned to double" << std::endl;
PrintZmqStats(stats);
return false;
}
if (static_cast<std::size_t>(received) != payload_size) {
++stats.invalid_payload_count;
std::cerr << "Algorithm ZMQ payload size mismatch: expectedBytes=" << payload_size
<< " expectedDoubles=" << XYPARSER_FRAME_ALGORITHM_VALUE_COUNT
<< " actualBytes=" << received
<< " actualDoubles=" << (received / static_cast<int>(sizeof(double)))
<< std::endl;
PrintZmqStats(stats);
return false;
}
++stats.received_packets;
stats.received_bytes += static_cast<std::size_t>(received);
bool received_any_response = false;
while (true) {
std::vector<std::uint8_t> response_payload;
bool no_message = false;
const int received = zmq_client.ReceiveNonBlocking(response_payload, no_message);
if (no_message) {
break;
}
if (received <= 0) {
std::cerr << "Receive algorithm ZMQ payload failed"
<< " remote=" << zmq_client.configured_remote_endpoint()
<< " zmqError=" << zmq_client.last_error()
<< std::endl;
PrintZmqStats(stats);
return false;
}
if (received % static_cast<int>(sizeof(double)) != 0) {
++stats.invalid_payload_count;
std::cerr << "Algorithm ZMQ payload size is not aligned to double" << std::endl;
PrintZmqStats(stats);
return false;
}
if (stats.sent_packets <= 3 || (stats.sent_packets % 100) == 0) {
PrintZmqStats(stats);
received_any_response = true;
++stats.received_packets;
stats.received_bytes += static_cast<std::size_t>(received);
std::cout << "[AlgorithmZmqRx] payloadBytes=" << received
<< " doubles=" << (received / static_cast<int>(sizeof(double)))
<< std::endl;
XYParser_FeedAlgorithmData(parser,
response_payload.data(),
response_payload.size(),
nullptr,
0);
}
XYParser_FeedAlgorithmData(parser,
response_buffer.data(),
static_cast<std::size_t>(received),
nullptr,
0);
if (received_any_response || stats.sent_packets <= 3 || (stats.sent_packets % 100) == 0) {
PrintZmqStats(stats);
}
return true;
}
@@ -1046,7 +1158,7 @@ void PrintTriggerEvents(const XYParserFrameSummary& summary)
int Run64Workflow(const DemoOptions& options)
{
constexpr auto kImpedanceDuration = std::chrono::seconds(10);
constexpr auto kImpedanceDuration = std::chrono::seconds(0);
constexpr int kImpedanceSampleRate = 250;
constexpr int kImpedanceGain = 24;
constexpr int kNormalSampleRate = 250;
@@ -1062,17 +1174,38 @@ int Run64Workflow(const DemoOptions& options)
std::cerr << "Connect 64ch TCP failed: " << options.tcp_host << ':' << options.tcp_port << std::endl;
return 1;
}
bool tcp_connected = true;
const auto close_data_tcp = [&]() {
if (!tcp_connected) {
return;
}
std::cout << "Close 64ch TCP connection" << std::endl;
data_client.Close();
tcp_connected = false;
};
SerialPort trigger_port;
const bool has_trigger_port = !options.trigger_com_port.empty();
bool trigger_serial_open = false;
const auto close_trigger_serial = [&]() {
if (!trigger_serial_open) {
return;
}
std::cout << "Close trigger serial" << std::endl;
trigger_port.Close();
trigger_serial_open = false;
};
if (has_trigger_port && !trigger_port.Open(options.trigger_com_port, options.serial_baud_rate)) {
std::cerr << "Open trigger serial failed: " << Narrow(options.trigger_com_port) << std::endl;
close_data_tcp();
return 1;
}
trigger_serial_open = has_trigger_port;
ParserHandleGuard parser(XYParser_CreateParser(64));
if (parser.get() == nullptr) {
std::cerr << "Create 64ch parser failed" << std::endl;
close_data_tcp();
return 1;
}
@@ -1084,16 +1217,32 @@ int Run64Workflow(const DemoOptions& options)
if (!Send64ImpedanceSwitch(data_client, true)) {
std::cerr << "Send 64ch impedance open command failed" << std::endl;
close_data_tcp();
return 1;
}
if (!Send64GainAndSampleRate(data_client, kImpedanceGain, kImpedanceSampleRate)) {
std::cerr << "Send 64ch impedance gain/sample-rate command failed" << std::endl;
close_data_tcp();
return 1;
}
XYParser_SetSampleRate(parser.get(), kImpedanceSampleRate);
XYParser_SetImpedanceDetection(parser.get(), 1);
ZmqDuplexClient algorithm_zmq;
bool algorithm_zmq_open = false;
const auto close_algorithm_zmq = [&]() {
if (!algorithm_zmq_open) {
return;
}
std::cout << "Close algorithm ZMQ" << std::endl;
algorithm_zmq.Close();
algorithm_zmq_open = false;
};
const auto close_all = [&]() {
close_algorithm_zmq();
close_trigger_serial();
close_data_tcp();
};
if (!algorithm_zmq.Open(options.algorithm_host,
options.algorithm_port,
options.algorithm_timeout_ms)) {
@@ -1101,8 +1250,10 @@ int Run64Workflow(const DemoOptions& options)
<< " remote=" << BuildZmqTcpEndpoint(options.algorithm_host, options.algorithm_port)
<< " zmqError=" << algorithm_zmq.last_error()
<< std::endl;
close_all();
return 1;
}
algorithm_zmq_open = true;
std::cout << "Algorithm ZMQ enabled: identity=" << algorithm_zmq.identity()
<< " algorithmHost=" << options.algorithm_host
<< " algorithmPort=" << options.algorithm_port
@@ -1130,6 +1281,7 @@ int Run64Workflow(const DemoOptions& options)
std::cerr << "Send "
<< (next_trigger_type == XYPARSER_TRIGGER_TRAIN_0 ? "TRAIN_0" : "TRAIN_1")
<< " failed" << std::endl;
close_all();
return 1;
}
next_trigger_type = (next_trigger_type == XYPARSER_TRIGGER_TRAIN_0)
@@ -1161,10 +1313,12 @@ int Run64Workflow(const DemoOptions& options)
if (Clock::now() >= impedance_end_time) {
if (!Send64GainAndSampleRate(data_client, kNormalGain, kNormalSampleRate)) {
std::cerr << "Send 64ch normal gain/sample-rate command failed" << std::endl;
close_all();
return 1;
}
if (!Send64ImpedanceSwitch(data_client, false)) {
std::cerr << "Send 64ch impedance close command failed" << std::endl;
close_all();
return 1;
}
XYParser_SetSampleRate(parser.get(), kNormalSampleRate);
@@ -1189,6 +1343,7 @@ int Run64Workflow(const DemoOptions& options)
DrainWelch(parser.get());
}
close_all();
return 0;
}
@@ -1206,10 +1361,20 @@ int Run8Workflow(const DemoOptions& options)
std::cerr << "Open 8ch serial failed: " << Narrow(options.data_com_port) << std::endl;
return 1;
}
bool data_serial_open = true;
const auto close_data_serial = [&]() {
if (!data_serial_open) {
return;
}
std::cout << "Close 8ch data serial" << std::endl;
data_port.Close();
data_serial_open = false;
};
ParserHandleGuard parser(XYParser_CreateParser(8));
if (parser.get() == nullptr) {
std::cerr << "Create 8ch parser failed" << std::endl;
close_data_serial();
return 1;
}
@@ -1221,10 +1386,24 @@ int Run8Workflow(const DemoOptions& options)
if (!Send8ImpedanceSwitch(data_port, true)) {
std::cerr << "Send 8ch impedance open command failed" << std::endl;
close_data_serial();
return 1;
}
ZmqDuplexClient algorithm_zmq;
bool algorithm_zmq_open = false;
const auto close_algorithm_zmq = [&]() {
if (!algorithm_zmq_open) {
return;
}
std::cout << "Close algorithm ZMQ" << std::endl;
algorithm_zmq.Close();
algorithm_zmq_open = false;
};
const auto close_all = [&]() {
close_algorithm_zmq();
close_data_serial();
};
if (!algorithm_zmq.Open(options.algorithm_host,
options.algorithm_port,
options.algorithm_timeout_ms)) {
@@ -1232,8 +1411,10 @@ int Run8Workflow(const DemoOptions& options)
<< " remote=" << BuildZmqTcpEndpoint(options.algorithm_host, options.algorithm_port)
<< " zmqError=" << algorithm_zmq.last_error()
<< std::endl;
close_all();
return 1;
}
algorithm_zmq_open = true;
std::cout << "Algorithm ZMQ enabled: identity=" << algorithm_zmq.identity()
<< " algorithmHost=" << options.algorithm_host
<< " algorithmPort=" << options.algorithm_port
@@ -1270,6 +1451,7 @@ int Run8Workflow(const DemoOptions& options)
if (Clock::now() >= impedance_end_time) {
if (!Send8ImpedanceSwitch(data_port, false)) {
std::cerr << "Send 8ch impedance close command failed" << std::endl;
close_all();
return 1;
}
XYParser_SetImpedanceDetection(parser.get(), 0);
@@ -1292,6 +1474,7 @@ int Run8Workflow(const DemoOptions& options)
DrainWelch(parser.get());
}
close_all();
return 0;
}