From 7a37901fb29cc0b3b07bf54568a003e20bfbfe72 Mon Sep 17 00:00:00 2001 From: Patrick Balsiger Date: Sun, 28 Sep 2025 21:23:05 +0200 Subject: [PATCH] feat(streaming): suppress WS echo via origin tagging Inject _origin=ws: into JSON payloads on inbound WS messages and strip it on broadcast while skipping the origin client. Documents behavior in StreamingAPI.md. --- docs/StreamingAPI.md | 9 ++++++ include/spore/core/ApiServer.h | 1 + src/spore/core/ApiServer.cpp | 59 ++++++++++++++++++++++++++-------- 3 files changed, 55 insertions(+), 14 deletions(-) diff --git a/docs/StreamingAPI.md b/docs/StreamingAPI.md index aeb3fb2..0ab22ba 100644 --- a/docs/StreamingAPI.md +++ b/docs/StreamingAPI.md @@ -35,6 +35,15 @@ Notes: - The device accepts `payload` as a string or a JSON object/array. Objects are serialized into a string before dispatching to local subscribers to keep a consistent downstream contract. - A minimal ack `{ "ok": true }` is sent after a valid inbound message. +#### Echo suppression (origin tagging) + +- To prevent the sender from receiving an immediate echo of its own message, the server injects a private field into JSON payloads: + - `_origin: "ws:"` +- When re-broadcasting local events to WebSocket clients, the server: + - Strips the `_origin` field from the outgoing payload + - Skips the originating `clientId` so only other clients receive the message + - If a payload is not valid JSON (plain string), no origin tag is injected and the message may be echoed + ### Event Bus Integration - The WebSocket registers an `onAny` subscriber to `NodeContext` so that all local events are mirrored to clients. diff --git a/include/spore/core/ApiServer.h b/include/spore/core/ApiServer.h index 68629d6..dcf2eb0 100644 --- a/include/spore/core/ApiServer.h +++ b/include/spore/core/ApiServer.h @@ -45,6 +45,7 @@ private: TaskManager& taskManager; std::vector> services; std::vector endpoints; // Single source of truth for endpoints + std::vector wsClients; // Internal helpers void registerEndpoint(const String& uri, int method, diff --git a/src/spore/core/ApiServer.cpp b/src/spore/core/ApiServer.cpp index cda6be9..5397ce3 100644 --- a/src/spore/core/ApiServer.cpp +++ b/src/spore/core/ApiServer.cpp @@ -110,8 +110,6 @@ void ApiServer::setupWebSocket() { DeserializationError err = deserializeJson(doc, (const char*)data, len); if (!err) { LOG_INFO("API", "Received event: " + String(doc["event"].as())); - String raw; raw.reserve(len + 1); for (size_t i = 0; i < len; ++i) raw += (char)data[i]; - LOG_INFO("API", "Received raw WS data: " + raw); String eventName = doc["event"].as(); String payloadStr; if (doc["payload"].is()) { @@ -122,8 +120,19 @@ void ApiServer::setupWebSocket() { } // Allow empty payload; services may treat it as defaults if (eventName.length() > 0) { + // Inject origin tag into payload JSON if possible + String enriched = payloadStr; + if (payloadStr.length() > 0) { + JsonDocument pd; + if (!deserializeJson(pd, payloadStr)) { + pd["_origin"] = String("ws:") + String(client->id()); + String tmp; serializeJson(pd, tmp); enriched = tmp; + } else { + // If payload is plain string, leave as-is (no origin) + } + } std::string ev = eventName.c_str(); - ctx.fire(ev, &payloadStr); + ctx.fire(ev, &enriched); // Acknowledge client->text("{\"ok\":true}"); } else { @@ -135,23 +144,45 @@ void ApiServer::setupWebSocket() { } } else if (type == WS_EVT_CONNECT) { client->text("{\"hello\":\"ws connected\"}"); + wsClients.push_back(client); } else if (type == WS_EVT_DISCONNECT) { - // No-op + wsClients.erase(std::remove(wsClients.begin(), wsClients.end(), client), wsClients.end()); } }); // Subscribe to all local events and forward to websocket clients ctx.onAny([this](const std::string& event, void* dataPtr) { - String* payloadStr = static_cast(dataPtr); - JsonDocument doc; - doc["event"] = event.c_str(); - if (payloadStr) { - doc["payload"] = *payloadStr; - } else { - doc["payload"] = ""; + String* payloadStrPtr = static_cast(dataPtr); + String payloadStr = payloadStrPtr ? *payloadStrPtr : String(""); + + // Extract and strip origin if present + String origin; + String cleanedPayload = payloadStr; + if (payloadStr.length() > 0) { + JsonDocument pd; + if (!deserializeJson(pd, payloadStr)) { + if (pd["_origin"].is()) { + origin = pd["_origin"].as(); + pd.remove("_origin"); + String tmp; serializeJson(pd, tmp); cleanedPayload = tmp; + } + } + } + + JsonDocument outDoc; + outDoc["event"] = event.c_str(); + outDoc["payload"] = cleanedPayload; + String out; serializeJson(outDoc, out); + + if (origin.startsWith("ws:")) { + uint32_t originId = (uint32_t)origin.substring(3).toInt(); + for (auto* c : wsClients) { + if (c && c->id() != originId) { + c->text(out); + } + } + } else { + ws.textAll(out); } - String out; - serializeJson(doc, out); - ws.textAll(out); }); }