#include "sdrpp_server_client.h"
// NOTE(review): the four angle-bracket include targets below, and the template
// arguments used in this file, were not legible in the reviewed copy and were
// reconstructed from how the code uses them - confirm against the build.
#include <volk/volk.h>
#include <cstring>
#include <utils/flog.h>
#include <core.h>

using namespace std::chrono_literals;

namespace server {
    /**
     * Set up the packet buffers, the decompression/DSP chain and the receive
     * thread, then request the initial UI from the server.
     * @param sock Connected socket to the server.
     * @param out Stream that receives the baseband samples.
     * @throws std::runtime_error if the initial UI request fails.
     */
    Client::Client(std::shared_ptr<net::Socket> sock, dsp::stream<dsp::complex_t>* out) {
        this->sock = sock;
        output = out;

        // Allocate buffers
        rbuffer = new uint8_t[SERVER_MAX_PACKET_SIZE];
        sbuffer = new uint8_t[SERVER_MAX_PACKET_SIZE];

        // Initialize headers: both buffers are laid out as
        // [PacketHeader][CommandHeader][command data] (or [PacketHeader][packet data])
        r_pkt_hdr = reinterpret_cast<PacketHeader*>(rbuffer);
        r_pkt_data = &rbuffer[sizeof(PacketHeader)];
        r_cmd_hdr = reinterpret_cast<CommandHeader*>(r_pkt_data);
        r_cmd_data = &rbuffer[sizeof(PacketHeader) + sizeof(CommandHeader)];

        s_pkt_hdr = reinterpret_cast<PacketHeader*>(sbuffer);
        s_pkt_data = &sbuffer[sizeof(PacketHeader)];
        s_cmd_hdr = reinterpret_cast<CommandHeader*>(s_pkt_data);
        s_cmd_data = &sbuffer[sizeof(PacketHeader) + sizeof(CommandHeader)];

        // Initialize decompressor
        dctx = ZSTD_createDCtx();

        // Initialize DSP
        decompIn.setBufferSize(STREAM_BUFFER_SIZE * sizeof(dsp::complex_t) + 8);
        decompIn.clearWriteStop();
        decomp.init(&decompIn);
        link.init(&decomp.out, output);
        decomp.start();
        link.start();

        // Start worker thread
        workerThread = std::thread(&Client::worker, this);

        // Ask for a UI
        int res = getUI();
        if (res < 0) {
            // Close client (also joins the worker thread)
            close();

            // The destructor does not run when a constructor throws, so release
            // here what ~Client() would otherwise have released
            ZSTD_freeDCtx(dctx);
            delete[] rbuffer;
            delete[] sbuffer;

            // Throw error
            switch (res) {
            case CONN_ERR_TIMEOUT:
                throw std::runtime_error("Timed out");
            case CONN_ERR_BUSY:
                throw std::runtime_error("Server busy");
            default:
                throw std::runtime_error("Unknown error");
            }
        }
    }

    /// Close the connection and release the decompressor and packet buffers.
    Client::~Client() {
        close();
        ZSTD_freeDCtx(dctx);
        delete[] rbuffer;
        delete[] sbuffer;
    }

    /**
     * Draw the server-provided menu and, if drawing reported a changed element,
     * send that change to the server as a UI action.
     */
    void Client::showMenu() {
        std::string diffId = "";
        SmGui::DrawListElem diffValue;
        bool syncRequired = false;
        {
            std::lock_guard lck(dlMtx);
            dl.draw(diffId, diffValue, syncRequired);
        }

        // Nothing changed, nothing to send
        if (diffId.empty()) { return; }

        // Save ID
        SmGui::DrawListElem elemId;
        elemId.type = SmGui::DRAW_LIST_ELEM_TYPE_STRING;
        elemId.str = diffId;

        // s_cmd_data starts after both headers inside sbuffer, so only this many
        // bytes are actually available for the command payload
        const int capacity = static_cast<int>(SERVER_MAX_PACKET_SIZE - sizeof(PacketHeader) - sizeof(CommandHeader));

        // Encode packet: [syncRequired][element id][element value]
        int size = 0;
        s_cmd_data[size++] = syncRequired;
        size += SmGui::DrawList::storeItem(elemId, &s_cmd_data[size], capacity - size);
        size += SmGui::DrawList::storeItem(diffValue, &s_cmd_data[size], capacity - size);

        // Send
        if (syncRequired) {
            flog::warn("Action requires resync");
            // Register the waiter before sending so the ack cannot be missed
            auto waiter = awaitCommandAck(COMMAND_UI_ACTION);
            sendCommand(COMMAND_UI_ACTION, size);
            if (waiter->await(PROTOCOL_TIMEOUT_MS)) {
                std::lock_guard lck(dlMtx);
                dl.load(r_cmd_data, r_pkt_hdr->size - sizeof(PacketHeader) - sizeof(CommandHeader));
            }
            else {
                flog::error("Timeout out after asking for UI");
            }
            waiter->handled();
            flog::warn("Resync done");
        }
        else {
            flog::warn("Action does not require resync");
            sendCommand(COMMAND_UI_ACTION, size);
        }
    }

    /**
     * Ask the server to tune to a frequency and wait for the acknowledgement
     * (or for PROTOCOL_TIMEOUT_MS to elapse).
     * @param freq Frequency value sent to the server.
     */
    void Client::setFrequency(double freq) {
        if (!isOpen()) { return; }

        // Copy the value byte-wise instead of writing through a cast pointer
        memcpy(s_cmd_data, &freq, sizeof(double));

        // Register the waiter before sending, like getUI() and showMenu() do;
        // otherwise an ack that arrives quickly has no waiter to notify
        auto waiter = awaitCommandAck(COMMAND_SET_FREQUENCY);
        sendCommand(COMMAND_SET_FREQUENCY, sizeof(double));
        waiter->await(PROTOCOL_TIMEOUT_MS);
        waiter->handled();
    }

    /// @return The last sample rate announced by the server.
    double Client::getSampleRate() {
        return currentSampleRate;
    }

    /// Tell the server which PCM sample type to stream. No-op when disconnected.
    void Client::setSampleType(dsp::compression::PCMType type) {
        if (!isOpen()) { return; }
        s_cmd_data[0] = type;
        sendCommand(COMMAND_SET_SAMPLE_TYPE, 1);
    }

    /// Enable or disable baseband compression on the server. No-op when disconnected.
    void Client::setCompression(bool enabled) {
        if (!isOpen()) { return; }
        s_cmd_data[0] = enabled;
        sendCommand(COMMAND_SET_COMPRESSION, 1);
    }

    /// Send the start command, then refresh the UI. No-op when disconnected.
    void Client::start() {
        if (!isOpen()) { return; }
        sendCommand(COMMAND_START, 0);
        getUI();
    }

    /// Send the stop command, then refresh the UI. No-op when disconnected.
    void Client::stop() {
        if (!isOpen()) { return; }
        sendCommand(COMMAND_STOP, 0);
        getUI();
    }

    /// Close the socket, join the worker thread and stop the DSP blocks.
    void Client::close() {
        // Stop worker: unblock a pending decompIn.swap(), then close the socket
        // so the pending recv() fails
        decompIn.stopWriter();
        if (sock) { sock->close(); }
        if (workerThread.joinable()) { workerThread.join(); }
        decompIn.clearWriteStop();

        // Stop DSP
        decomp.stop();
        link.stop();
    }

    /// @return true if a socket is held and it reports being open.
    bool Client::isOpen() {
        return sock && sock->isOpen();
    }

    /**
     * Receive loop run on workerThread: reads one packet at a time into rbuffer
     * and dispatches it by packet type. Returns when a receive fails, when a
     * packet declares an impossible size, or when decompIn.swap() fails.
     */
    void Client::worker() {
        // Byte capacity of decompIn.writeBuf: the size configured in the
        // constructor and the destination size handed to ZSTD below
        const size_t decompCapacity = STREAM_BUFFER_SIZE * sizeof(dsp::complex_t) + 8;
        const size_t cmdPacketMinSize = sizeof(PacketHeader) + sizeof(CommandHeader);

        while (true) {
            // Receive header
            if (sock->recv(rbuffer, sizeof(PacketHeader), true) <= 0) { break; }

            // The size field comes from the network. Too small would underflow
            // the payload length, too large would overrun rbuffer. The stream
            // cannot be resynchronized after that, so stop receiving.
            const size_t pktSize = r_pkt_hdr->size;
            if (pktSize < sizeof(PacketHeader) || pktSize > static_cast<size_t>(SERVER_MAX_PACKET_SIZE)) {
                flog::error("Received packet with invalid size");
                break;
            }
            const size_t payloadSize = pktSize - sizeof(PacketHeader);

            // Receive remaining data
            if (sock->recv(&rbuffer[sizeof(PacketHeader)], payloadSize, true, PROTOCOL_TIMEOUT_MS) <= 0) { break; }

            // Increment data counter
            bytes += r_pkt_hdr->size;

            // Decode packet
            if (r_pkt_hdr->type == PACKET_TYPE_COMMAND) {
                // Without a full command header, r_cmd_hdr would be stale data
                if (pktSize < cmdPacketMinSize) {
                    flog::error("Received truncated command packet");
                    continue;
                }

                // TODO: Move to command handler
                if (r_cmd_hdr->cmd == COMMAND_SET_SAMPLERATE && pktSize == cmdPacketMinSize + sizeof(double)) {
                    double sampleRate;
                    memcpy(&sampleRate, r_cmd_data, sizeof(double));
                    currentSampleRate = sampleRate;
                    core::setInputSampleRate(currentSampleRate);
                }
                else if (r_cmd_hdr->cmd == COMMAND_DISCONNECT) {
                    flog::error("Asked to disconnect by the server");
                    serverBusy = true;

                    // Cancel waiters (collected first so the map is not
                    // modified while it is being iterated)
                    std::vector<PacketWaiter*> toBeRemoved;
                    for (auto& [waiter, cmd] : commandAckWaiters) {
                        waiter->cancel();
                        toBeRemoved.push_back(waiter);
                    }

                    // Remove handled waiters
                    for (auto& waiter : toBeRemoved) {
                        commandAckWaiters.erase(waiter);
                        delete waiter;
                    }
                }
            }
            else if (r_pkt_hdr->type == PACKET_TYPE_COMMAND_ACK) {
                // Without a full command header, r_cmd_hdr would be stale data
                if (pktSize < cmdPacketMinSize) {
                    flog::error("Received truncated command ack packet");
                    continue;
                }

                // Notify waiters registered for this command
                std::vector<PacketWaiter*> toBeRemoved;
                for (auto& [waiter, cmd] : commandAckWaiters) {
                    if (cmd != r_cmd_hdr->cmd) { continue; }
                    waiter->notify();
                    toBeRemoved.push_back(waiter);
                }

                // Remove handled waiters
                for (auto& waiter : toBeRemoved) {
                    commandAckWaiters.erase(waiter);
                    delete waiter;
                }
            }
            else if (r_pkt_hdr->type == PACKET_TYPE_BASEBAND) {
                // A packet may be larger than the stream buffer, drop it rather
                // than write past the end of writeBuf
                if (payloadSize > decompCapacity) {
                    flog::error("Baseband packet larger than stream buffer");
                    continue;
                }
                memcpy(decompIn.writeBuf, &rbuffer[sizeof(PacketHeader)], payloadSize);
                if (!decompIn.swap(payloadSize)) { break; }
            }
            else if (r_pkt_hdr->type == PACKET_TYPE_BASEBAND_COMPRESSED) {
                size_t outCount = ZSTD_decompressDCtx(dctx, decompIn.writeBuf, decompCapacity, r_pkt_data, payloadSize);

                // On failure zstd returns an error code, not a byte count
                if (ZSTD_isError(outCount)) {
                    flog::error("Failed to decompress baseband packet");
                    continue;
                }
                if (outCount) {
                    if (!decompIn.swap(outCount)) { break; }
                }
            }
            else if (r_pkt_hdr->type == PACKET_TYPE_ERROR) {
                flog::error("SDR++ Server Error: {0}", rbuffer[sizeof(PacketHeader)]);
            }
            else {
                flog::error("Invalid packet type: {0}", r_pkt_hdr->type);
            }
        }
    }

    /**
     * Request the UI draw list from the server and load it into dl.
     * @return 0 on success, -1 if the client is not open, CONN_ERR_BUSY if the
     *         server asked us to disconnect, CONN_ERR_TIMEOUT otherwise.
     */
    int Client::getUI() {
        if (!isOpen()) { return -1; }

        // Register the waiter before sending so the ack cannot be missed
        auto waiter = awaitCommandAck(COMMAND_GET_UI);
        sendCommand(COMMAND_GET_UI, 0);
        if (waiter->await(PROTOCOL_TIMEOUT_MS)) {
            std::lock_guard lck(dlMtx);
            dl.load(r_cmd_data, r_pkt_hdr->size - sizeof(PacketHeader) - sizeof(CommandHeader));
        }
        else {
            if (!serverBusy) { flog::error("Timeout out after asking for UI"); }
            waiter->handled();
            return serverBusy ? CONN_ERR_BUSY : CONN_ERR_TIMEOUT;
        }
        waiter->handled();
        return 0;
    }

    /**
     * Fill in the packet header in sbuffer and send header plus payload.
     * @param type Packet type to write into the header.
     * @param len Number of payload bytes already placed after the header.
     */
    void Client::sendPacket(PacketType type, int len) {
        s_pkt_hdr->type = type;
        s_pkt_hdr->size = sizeof(PacketHeader) + len;
        sock->send(sbuffer, s_pkt_hdr->size);
    }

    /**
     * Send a command packet.
     * @param cmd Command identifier.
     * @param len Number of argument bytes already written to s_cmd_data.
     */
    void Client::sendCommand(Command cmd, int len) {
        s_cmd_hdr->cmd = cmd;
        sendPacket(PACKET_TYPE_COMMAND, sizeof(CommandHeader) + len);
    }

    /**
     * Send a command acknowledgement packet.
     * @param cmd Command being acknowledged.
     * @param len Number of data bytes already written to s_cmd_data.
     */
    void Client::sendCommandAck(Command cmd, int len) {
        s_cmd_hdr->cmd = cmd;
        sendPacket(PACKET_TYPE_COMMAND_ACK, sizeof(CommandHeader) + len);
    }

    /**
     * Register a waiter that the worker notifies when an ack for cmd arrives.
     * The worker deletes the waiter once it has been notified or cancelled.
     * NOTE(review): commandAckWaiters is modified here and iterated in worker()
     * with no lock visible in this file - confirm callers cannot race the worker.
     * @param cmd Command whose acknowledgement is awaited.
     * @return The newly registered waiter.
     */
    PacketWaiter* Client::awaitCommandAck(Command cmd) {
        PacketWaiter* waiter = new PacketWaiter;
        commandAckWaiters[waiter] = cmd;
        return waiter;
    }

    /**
     * Copy a block of samples into the client's output stream.
     * @param data Samples to copy.
     * @param count Number of samples in data.
     * @param ctx Pointer to the Client that owns the output stream.
     */
    void Client::dHandler(dsp::complex_t* data, int count, void* ctx) {
        Client* _this = static_cast<Client*>(ctx);
        memcpy(_this->output->writeBuf, data, count * sizeof(dsp::complex_t));
        _this->output->swap(count);
    }

    /**
     * Connect to a server and create a client for it.
     * @param host Server host passed to net::connect.
     * @param port Server port passed to net::connect.
     * @param out Stream that receives the baseband samples.
     * @return The connected client.
     */
    std::shared_ptr<Client> connect(std::string host, uint16_t port, dsp::stream<dsp::complex_t>* out) {
        return std::make_shared<Client>(net::connect(host, port), out);
    }
}