feat(streaming): introduce WebSocket Streaming API bridging event bus
ApiServer: add AsyncWebSocket at /ws; accept JSON {event, payload} (string or object) and dispatch via ctx.fire; mirror all local events to clients using NodeContext::onAny.\nNodeContext: add onAny subscriber API.\nNeoPatternService: add api/neopattern/color event to set solid color.\nCluster: centralize cluster/broadcast sending in core; services delegate.\nAPI: add generic /api/node/event and /api/cluster/event endpoints in respective services.\nTests: add ws-color-client, ws-cluster-broadcast-color, http-cluster-broadcast-color.\nDocs: add StreamingAPI.md; update README and test/README.\nFixes: robust WS JSON parsing on ESP8266 and payload handling.
This commit is contained in:
@@ -87,6 +87,10 @@ void ApiServer::serveStatic(const String& uri, fs::FS& fs, const String& path, c
|
||||
}
|
||||
|
||||
void ApiServer::begin() {
|
||||
// Setup streaming API (WebSocket)
|
||||
setupWebSocket();
|
||||
server.addHandler(&ws);
|
||||
|
||||
// Register all service endpoints
|
||||
for (auto& service : services) {
|
||||
service.get().registerEndpoints(*this);
|
||||
@@ -95,3 +99,59 @@ void ApiServer::begin() {
|
||||
|
||||
server.begin();
|
||||
}
|
||||
|
||||
void ApiServer::setupWebSocket() {
|
||||
ws.onEvent([this](AsyncWebSocket* server, AsyncWebSocketClient* client, AwsEventType type, void* arg, uint8_t* data, size_t len) {
|
||||
if (type == WS_EVT_DATA) {
|
||||
AwsFrameInfo* info = (AwsFrameInfo*)arg;
|
||||
if (info->final && info->index == 0 && info->len == len && info->opcode == WS_TEXT) {
|
||||
// Parse directly from the raw buffer with explicit length
|
||||
JsonDocument doc;
|
||||
DeserializationError err = deserializeJson(doc, (const char*)data, len);
|
||||
if (!err) {
|
||||
LOG_INFO("API", "Received event: " + String(doc["event"].as<String>()));
|
||||
String raw; raw.reserve(len + 1); for (size_t i = 0; i < len; ++i) raw += (char)data[i];
|
||||
LOG_INFO("API", "Received raw WS data: " + raw);
|
||||
String eventName = doc["event"].as<String>();
|
||||
String payloadStr;
|
||||
if (doc["payload"].is<const char*>()) {
|
||||
payloadStr = doc["payload"].as<const char*>();
|
||||
} else if (!doc["payload"].isNull()) {
|
||||
// If payload is an object/array, serialize it
|
||||
String tmp; serializeJson(doc["payload"], tmp); payloadStr = tmp;
|
||||
}
|
||||
// Allow empty payload; services may treat it as defaults
|
||||
if (eventName.length() > 0) {
|
||||
std::string ev = eventName.c_str();
|
||||
ctx.fire(ev, &payloadStr);
|
||||
// Acknowledge
|
||||
client->text("{\"ok\":true}");
|
||||
} else {
|
||||
client->text("{\"error\":\"Missing 'event'\"}");
|
||||
}
|
||||
} else {
|
||||
client->text("{\"error\":\"Invalid JSON\"}");
|
||||
}
|
||||
}
|
||||
} else if (type == WS_EVT_CONNECT) {
|
||||
client->text("{\"hello\":\"ws connected\"}");
|
||||
} else if (type == WS_EVT_DISCONNECT) {
|
||||
// No-op
|
||||
}
|
||||
});
|
||||
|
||||
// Subscribe to all local events and forward to websocket clients
|
||||
ctx.onAny([this](const std::string& event, void* dataPtr) {
|
||||
String* payloadStr = static_cast<String*>(dataPtr);
|
||||
JsonDocument doc;
|
||||
doc["event"] = event.c_str();
|
||||
if (payloadStr) {
|
||||
doc["payload"] = *payloadStr;
|
||||
} else {
|
||||
doc["payload"] = "";
|
||||
}
|
||||
String out;
|
||||
serializeJson(doc, out);
|
||||
ws.textAll(out);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -29,4 +29,11 @@ void NodeContext::fire(const std::string& event, void* data) {
|
||||
for (auto& cb : eventRegistry[event]) {
|
||||
cb(data);
|
||||
}
|
||||
for (auto& acb : anyEventSubscribers) {
|
||||
acb(event, data);
|
||||
}
|
||||
}
|
||||
|
||||
void NodeContext::onAny(AnyEventCallback cb) {
|
||||
anyEventSubscribers.push_back(cb);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user