feature/streaming #10

Merged
master merged 6 commits from feature/streaming into main 2025-09-29 06:29:12 +02:00
3 changed files with 55 additions and 14 deletions
Showing only changes of commit 7a37901fb2 - Show all commits

View File

@@ -35,6 +35,15 @@ Notes:
- The device accepts `payload` as a string or a JSON object/array. Objects are serialized into a string before dispatching to local subscribers to keep a consistent downstream contract. - The device accepts `payload` as a string or a JSON object/array. Objects are serialized into a string before dispatching to local subscribers to keep a consistent downstream contract.
- A minimal ack `{ "ok": true }` is sent after a valid inbound message. - A minimal ack `{ "ok": true }` is sent after a valid inbound message.
#### Echo suppression (origin tagging)
- To prevent the sender from receiving an immediate echo of its own message, the server injects a private field into JSON payloads:
- `_origin: "ws:<clientId>"`
- When re-broadcasting local events to WebSocket clients, the server:
- Strips the `_origin` field from the outgoing payload
- Skips the originating `clientId` so only other clients receive the message
- If a payload is not valid JSON (plain string), no origin tag is injected and the message may be echoed
### Event Bus Integration ### Event Bus Integration
- The WebSocket registers an `onAny` subscriber to `NodeContext` so that all local events are mirrored to clients. - The WebSocket registers an `onAny` subscriber to `NodeContext` so that all local events are mirrored to clients.

View File

@@ -45,6 +45,7 @@ private:
TaskManager& taskManager; TaskManager& taskManager;
std::vector<std::reference_wrapper<Service>> services; std::vector<std::reference_wrapper<Service>> services;
std::vector<EndpointInfo> endpoints; // Single source of truth for endpoints std::vector<EndpointInfo> endpoints; // Single source of truth for endpoints
std::vector<AsyncWebSocketClient*> wsClients;
// Internal helpers // Internal helpers
void registerEndpoint(const String& uri, int method, void registerEndpoint(const String& uri, int method,

View File

@@ -110,8 +110,6 @@ void ApiServer::setupWebSocket() {
DeserializationError err = deserializeJson(doc, (const char*)data, len); DeserializationError err = deserializeJson(doc, (const char*)data, len);
if (!err) { if (!err) {
LOG_INFO("API", "Received event: " + String(doc["event"].as<String>())); LOG_INFO("API", "Received event: " + String(doc["event"].as<String>()));
String raw; raw.reserve(len + 1); for (size_t i = 0; i < len; ++i) raw += (char)data[i];
LOG_INFO("API", "Received raw WS data: " + raw);
String eventName = doc["event"].as<String>(); String eventName = doc["event"].as<String>();
String payloadStr; String payloadStr;
if (doc["payload"].is<const char*>()) { if (doc["payload"].is<const char*>()) {
@@ -122,8 +120,19 @@ void ApiServer::setupWebSocket() {
} }
// Allow empty payload; services may treat it as defaults // Allow empty payload; services may treat it as defaults
if (eventName.length() > 0) { if (eventName.length() > 0) {
// Inject origin tag into payload JSON if possible
String enriched = payloadStr;
if (payloadStr.length() > 0) {
JsonDocument pd;
if (!deserializeJson(pd, payloadStr)) {
pd["_origin"] = String("ws:") + String(client->id());
String tmp; serializeJson(pd, tmp); enriched = tmp;
} else {
// If payload is plain string, leave as-is (no origin)
}
}
std::string ev = eventName.c_str(); std::string ev = eventName.c_str();
ctx.fire(ev, &payloadStr); ctx.fire(ev, &enriched);
// Acknowledge // Acknowledge
client->text("{\"ok\":true}"); client->text("{\"ok\":true}");
} else { } else {
@@ -135,23 +144,45 @@ void ApiServer::setupWebSocket() {
} }
} else if (type == WS_EVT_CONNECT) { } else if (type == WS_EVT_CONNECT) {
client->text("{\"hello\":\"ws connected\"}"); client->text("{\"hello\":\"ws connected\"}");
wsClients.push_back(client);
} else if (type == WS_EVT_DISCONNECT) { } else if (type == WS_EVT_DISCONNECT) {
// No-op wsClients.erase(std::remove(wsClients.begin(), wsClients.end(), client), wsClients.end());
} }
}); });
// Subscribe to all local events and forward to websocket clients // Subscribe to all local events and forward to websocket clients
ctx.onAny([this](const std::string& event, void* dataPtr) { ctx.onAny([this](const std::string& event, void* dataPtr) {
String* payloadStr = static_cast<String*>(dataPtr); String* payloadStrPtr = static_cast<String*>(dataPtr);
JsonDocument doc; String payloadStr = payloadStrPtr ? *payloadStrPtr : String("");
doc["event"] = event.c_str();
if (payloadStr) { // Extract and strip origin if present
doc["payload"] = *payloadStr; String origin;
} else { String cleanedPayload = payloadStr;
doc["payload"] = ""; if (payloadStr.length() > 0) {
JsonDocument pd;
if (!deserializeJson(pd, payloadStr)) {
if (pd["_origin"].is<const char*>()) {
origin = pd["_origin"].as<const char*>();
pd.remove("_origin");
String tmp; serializeJson(pd, tmp); cleanedPayload = tmp;
} }
String out; }
serializeJson(doc, out); }
JsonDocument outDoc;
outDoc["event"] = event.c_str();
outDoc["payload"] = cleanedPayload;
String out; serializeJson(outDoc, out);
if (origin.startsWith("ws:")) {
uint32_t originId = (uint32_t)origin.substring(3).toInt();
for (auto* c : wsClients) {
if (c && c->id() != originId) {
c->text(out);
}
}
} else {
ws.textAll(out); ws.textAll(out);
}
}); });
} }