diff --git a/examples/neopattern/NeoPatternService.cpp b/examples/neopattern/NeoPatternService.cpp index 99e52ff..c5db8df 100644 --- a/examples/neopattern/NeoPatternService.cpp +++ b/examples/neopattern/NeoPatternService.cpp @@ -1,10 +1,13 @@ #include "NeoPatternService.h" #include "spore/core/ApiServer.h" #include "spore/util/Logging.h" +#include "spore/internal/Globals.h" #include +#include -NeoPatternService::NeoPatternService(TaskManager& taskMgr, const NeoPixelConfig& config) +NeoPatternService::NeoPatternService(NodeContext& ctx, TaskManager& taskMgr, const NeoPixelConfig& config) : taskManager(taskMgr), + ctx(ctx), config(config), activePattern(NeoPatternType::RAINBOW_CYCLE), direction(NeoDirection::FORWARD), @@ -32,6 +35,7 @@ NeoPatternService::NeoPatternService(TaskManager& taskMgr, const NeoPixelConfig& registerPatterns(); registerTasks(); + registerEventHandlers(); initialized = true; LOG_INFO("NeoPattern", "Service initialized"); @@ -64,7 +68,8 @@ void NeoPatternService::registerEndpoints(ApiServer& api) { ParamSpec{String("brightness"), false, String("body"), String("numberRange"), {}, String("80")}, ParamSpec{String("total_steps"), false, String("body"), String("numberRange"), {}, String("16")}, ParamSpec{String("direction"), false, String("body"), String("string"), {String("forward"), String("reverse")}}, - ParamSpec{String("interval"), false, String("body"), String("number"), {}, String("100")} + ParamSpec{String("interval"), false, String("body"), String("number"), {}, String("100")}, + ParamSpec{String("broadcast"), false, String("body"), String("boolean"), {}} }); // State endpoint for complex state updates @@ -119,61 +124,49 @@ void NeoPatternService::handlePatternsRequest(AsyncWebServerRequest* request) { void NeoPatternService::handleControlRequest(AsyncWebServerRequest* request) { bool updated = false; + bool broadcast = false; - if (request->hasParam("pattern", true)) { - String name = request->getParam("pattern", true)->value(); - if (isValidPattern(name)) { - setPatternByName(name); - updated = true; - } else { - // Invalid pattern name - could add error handling here - LOG_WARN("NeoPattern", "Invalid pattern name: " + name); - } + if (request->hasParam("broadcast", true)) { + String b = request->getParam("broadcast", true)->value(); + broadcast = b.equalsIgnoreCase("true") || b == "1"; } - if (request->hasParam("color", true)) { - String colorStr = request->getParam("color", true)->value(); - uint32_t color = parseColor(colorStr); - setColor(color); + // Build JSON payload from provided params (single source of truth) + JsonDocument payload; + bool any = false; + if (request->hasParam("pattern", true)) { payload["pattern"] = request->getParam("pattern", true)->value(); any = true; } + if (request->hasParam("color", true)) { payload["color"] = request->getParam("color", true)->value(); any = true; } + if (request->hasParam("color2", true)) { payload["color2"] = request->getParam("color2", true)->value(); any = true; } + if (request->hasParam("brightness", true)) { payload["brightness"] = request->getParam("brightness", true)->value(); any = true; } + if (request->hasParam("total_steps", true)) { payload["total_steps"] = request->getParam("total_steps", true)->value(); any = true; } + if (request->hasParam("direction", true)) { payload["direction"] = request->getParam("direction", true)->value(); any = true; } + if (request->hasParam("interval", true)) { payload["interval"] = request->getParam("interval", true)->value(); any = true; } + + String payloadStr; + serializeJson(payload, payloadStr); + + // Always apply locally via event so we have a single codepath for updates + if (any) { + std::string ev = "api/neopattern"; + String localData = payloadStr; + LOG_INFO("NeoPattern", String("Applying local api/neopattern via event payloadLen=") + String(payloadStr.length())); + ctx.fire(ev, &localData); updated = true; } - if (request->hasParam("color2", true)) { - String colorStr = request->getParam("color2", true)->value(); - uint32_t color = parseColor(colorStr); - setColor2(color); - updated = true; - } + // Broadcast to peers if requested (delegate to core broadcast handler) + if (broadcast && any) { + JsonDocument eventDoc; + eventDoc["event"] = "api/neopattern"; + eventDoc["data"] = payloadStr; // data is JSON string - if (request->hasParam("brightness", true)) { - int b = request->getParam("brightness", true)->value().toInt(); - if (b < 0) b = 0; - if (b > 255) b = 255; - setBrightness(static_cast(b)); - updated = true; - } + String eventJson; + serializeJson(eventDoc, eventJson); - if (request->hasParam("total_steps", true)) { - int steps = request->getParam("total_steps", true)->value().toInt(); - if (steps > 0) { - setTotalSteps(static_cast(steps)); - updated = true; - } - } - - if (request->hasParam("direction", true)) { - String dirStr = request->getParam("direction", true)->value(); - NeoDirection dir = (dirStr.equalsIgnoreCase("reverse")) ? NeoDirection::REVERSE : NeoDirection::FORWARD; - setDirection(dir); - updated = true; - } - - if (request->hasParam("interval", true)) { - unsigned long interval = request->getParam("interval", true)->value().toInt(); - if (interval > 0) { - setUpdateInterval(interval); - updated = true; - } + LOG_INFO("NeoPattern", String("Submitting cluster/broadcast for api/neopattern payloadLen=") + String(payloadStr.length())); + std::string ev = "cluster/broadcast"; + String eventStr = eventJson; + ctx.fire(ev, &eventStr); } // Return current state @@ -192,6 +185,101 @@ void NeoPatternService::handleControlRequest(AsyncWebServerRequest* request) { serializeJson(resp, json); request->send(200, "application/json", json); } +void NeoPatternService::registerEventHandlers() { + ctx.on("api/neopattern", [this](void* dataPtr) { + String* jsonStr = static_cast(dataPtr); + if (!jsonStr) { + LOG_WARN("NeoPattern", "Received api/neopattern with null dataPtr"); + return; + } + LOG_INFO("NeoPattern", String("Received api/neopattern event dataLen=") + String(jsonStr->length())); + JsonDocument doc; + DeserializationError err = deserializeJson(doc, *jsonStr); + if (err) { + LOG_WARN("NeoPattern", String("Failed to parse CLUSTER_EVENT data: ") + err.c_str()); + return; + } + JsonObject obj = doc.as(); + bool applied = applyControlParams(obj); + if (applied) { + LOG_INFO("NeoPattern", "Applied control from CLUSTER_EVENT"); + } + }); +} + +bool NeoPatternService::applyControlParams(const JsonObject& obj) { + bool updated = false; + if (obj["pattern"].is() || obj["pattern"].is()) { + String name = obj["pattern"].as(); + if (isValidPattern(name)) { + setPatternByName(name); + updated = true; + } + } + if (obj["color"].is() || obj["color"].is() || obj["color"].is() || obj["color"].is()) { + String colorStr; + if (obj["color"].is() || obj["color"].is()) { + colorStr = String(obj["color"].as()); + } else { + colorStr = obj["color"].as(); + } + uint32_t color = parseColor(colorStr); + setColor(color); + updated = true; + } + if (obj["color2"].is() || obj["color2"].is() || obj["color2"].is() || obj["color2"].is()) { + String colorStr; + if (obj["color2"].is() || obj["color2"].is()) { + colorStr = String(obj["color2"].as()); + } else { + colorStr = obj["color2"].as(); + } + uint32_t color = parseColor(colorStr); + setColor2(color); + updated = true; + } + if (obj["brightness"].is() || obj["brightness"].is() || obj["brightness"].is() || obj["brightness"].is()) { + int b = 0; + if (obj["brightness"].is() || obj["brightness"].is()) { + b = obj["brightness"].as(); + } else { + b = String(obj["brightness"].as()).toInt(); + } + if (b < 0) { + b = 0; + } + if (b > 255) { + b = 255; + } + setBrightness(static_cast(b)); + updated = true; + } + if (obj["total_steps"].is() || obj["total_steps"].is() || obj["total_steps"].is() || obj["total_steps"].is()) { + int steps = 0; + if (obj["total_steps"].is() || obj["total_steps"].is()) { + steps = obj["total_steps"].as(); + } else { + steps = String(obj["total_steps"].as()).toInt(); + } + if (steps > 0) { setTotalSteps(static_cast(steps)); updated = true; } + } + if (obj["direction"].is() || obj["direction"].is()) { + String dirStr = obj["direction"].as(); + NeoDirection dir = (dirStr.equalsIgnoreCase("reverse")) ? NeoDirection::REVERSE : NeoDirection::FORWARD; + setDirection(dir); + updated = true; + } + if (obj["interval"].is() || obj["interval"].is() || obj["interval"].is() || obj["interval"].is()) { + unsigned long interval = 0; + if (obj["interval"].is() || obj["interval"].is()) { + interval = obj["interval"].as(); + } else { + interval = String(obj["interval"].as()).toInt(); + } + if (interval > 0) { setUpdateInterval(interval); updated = true; } + } + return updated; +} void NeoPatternService::handleStateRequest(AsyncWebServerRequest* request) { if (request->contentType() != "application/json") { diff --git a/examples/neopattern/NeoPatternService.h b/examples/neopattern/NeoPatternService.h index 3103911..b4e1a1b 100644 --- a/examples/neopattern/NeoPatternService.h +++ b/examples/neopattern/NeoPatternService.h @@ -1,6 +1,7 @@ #pragma once #include "spore/Service.h" #include "spore/core/TaskManager.h" +#include "spore/core/NodeContext.h" #include "NeoPattern.h" #include "NeoPatternState.h" #include "NeoPixelConfig.h" @@ -25,7 +26,7 @@ public: REVERSE }; - NeoPatternService(TaskManager& taskMgr, const NeoPixelConfig& config); + NeoPatternService(NodeContext& ctx, TaskManager& taskMgr, const NeoPixelConfig& config); ~NeoPatternService(); void registerEndpoints(ApiServer& api) override; @@ -49,6 +50,8 @@ private: void registerTasks(); void registerPatterns(); void update(); + void registerEventHandlers(); + bool applyControlParams(const JsonObject& obj); // Pattern updaters void updateRainbowCycle(); @@ -80,6 +83,7 @@ private: String getPatternDescription(const String& name) const; TaskManager& taskManager; + NodeContext& ctx; NeoPattern* neoPattern; NeoPixelConfig config; NeoPatternState currentState; diff --git a/examples/neopattern/main.cpp b/examples/neopattern/main.cpp index 09745eb..2aed7d4 100644 --- a/examples/neopattern/main.cpp +++ b/examples/neopattern/main.cpp @@ -45,7 +45,7 @@ void setup() { ); // Create and add custom service - neoPatternService = new NeoPatternService(spore.getTaskManager(), config); + neoPatternService = new NeoPatternService(spore.getContext(), spore.getTaskManager(), config); spore.addService(neoPatternService); // Start the API server and complete initialization diff --git a/include/spore/core/ClusterManager.h b/include/spore/core/ClusterManager.h index cd994f0..ca4095c 100644 --- a/include/spore/core/ClusterManager.h +++ b/include/spore/core/ClusterManager.h @@ -39,10 +39,12 @@ private: static bool isHeartbeatMsg(const char* msg); static bool isResponseMsg(const char* msg); static bool isNodeInfoMsg(const char* msg); + static bool isClusterEventMsg(const char* msg); void onDiscovery(const char* msg); void onHeartbeat(const char* msg); void onResponse(const char* msg); void onNodeInfo(const char* msg); + void onClusterEvent(const char* msg); unsigned long lastHeartbeatSentAt = 0; std::vector messageHandlers; }; diff --git a/include/spore/internal/Globals.h b/include/spore/internal/Globals.h index c43d911..690f2e8 100644 --- a/include/spore/internal/Globals.h +++ b/include/spore/internal/Globals.h @@ -9,6 +9,7 @@ namespace ClusterProtocol { constexpr const char* RESPONSE_MSG = "CLUSTER_RESPONSE"; constexpr const char* HEARTBEAT_MSG = "CLUSTER_HEARTBEAT"; constexpr const char* NODE_INFO_MSG = "CLUSTER_NODE_INFO"; + constexpr const char* CLUSTER_EVENT_MSG = "CLUSTER_EVENT"; constexpr uint16_t UDP_PORT = 4210; // Increased buffer to accommodate node info JSON over UDP constexpr size_t UDP_BUF_SIZE = 512; diff --git a/src/spore/core/ClusterManager.cpp b/src/spore/core/ClusterManager.cpp index 4746488..1d6e2e9 100644 --- a/src/spore/core/ClusterManager.cpp +++ b/src/spore/core/ClusterManager.cpp @@ -8,6 +8,23 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx NodeInfo* node = static_cast(data); this->addOrUpdateNode(node->hostname, node->ip); }); + // Centralized broadcast handler: services fire 'cluster/broadcast' with CLUSTER_EVENT JSON payload + ctx.on("cluster/broadcast", [this](void* data) { + String* jsonStr = static_cast(data); + if (!jsonStr) { + LOG_WARN("Cluster", "cluster/broadcast called with null data"); + return; + } + // Subnet-directed broadcast (more reliable than 255.255.255.255 on some networks) + IPAddress ip = WiFi.localIP(); + IPAddress mask = WiFi.subnetMask(); + IPAddress bcast(ip[0] | ~mask[0], ip[1] | ~mask[1], ip[2] | ~mask[2], ip[3] | ~mask[3]); + LOG_INFO("Cluster", String("Broadcasting CLUSTER_EVENT to ") + bcast.toString() + " len=" + String(jsonStr->length())); + this->ctx.udp->beginPacket(bcast, this->ctx.config.udp_port); + String msg = String(ClusterProtocol::CLUSTER_EVENT_MSG) + ":" + *jsonStr; + this->ctx.udp->write(msg.c_str()); + this->ctx.udp->endPacket(); + }); // Register tasks registerTasks(); initMessageHandlers(); @@ -41,7 +58,11 @@ void ClusterManager::listen() { if (len <= 0) { return; } - incoming[len] = 0; + if (len >= (int)ClusterProtocol::UDP_BUF_SIZE) { + incoming[ClusterProtocol::UDP_BUF_SIZE - 1] = 0; + } else { + incoming[len] = 0; + } handleIncomingMessage(incoming); } @@ -51,6 +72,7 @@ void ClusterManager::initMessageHandlers() { messageHandlers.push_back({ &ClusterManager::isHeartbeatMsg, [this](const char* msg){ this->onHeartbeat(msg); }, "HEARTBEAT" }); messageHandlers.push_back({ &ClusterManager::isResponseMsg, [this](const char* msg){ this->onResponse(msg); }, "RESPONSE" }); messageHandlers.push_back({ &ClusterManager::isNodeInfoMsg, [this](const char* msg){ this->onNodeInfo(msg); }, "NODE_INFO" }); + messageHandlers.push_back({ &ClusterManager::isClusterEventMsg, [this](const char* msg){ this->onClusterEvent(msg); }, "CLUSTER_EVENT" }); } void ClusterManager::handleIncomingMessage(const char* incoming) { @@ -60,6 +82,15 @@ void ClusterManager::handleIncomingMessage(const char* incoming) { return; } } + // Unknown message - log first token + const char* colon = strchr(incoming, ':'); + String head; + if (colon) { + head = String(incoming).substring(0, colon - incoming); + } else { + head = String(incoming); + } + LOG_DEBUG("Cluster", String("Unknown cluster message: ") + head); } bool ClusterManager::isDiscoveryMsg(const char* msg) { @@ -78,6 +109,10 @@ bool ClusterManager::isNodeInfoMsg(const char* msg) { return strncmp(msg, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0; } +bool ClusterManager::isClusterEventMsg(const char* msg) { + return strncmp(msg, ClusterProtocol::CLUSTER_EVENT_MSG, strlen(ClusterProtocol::CLUSTER_EVENT_MSG)) == 0; +} + void ClusterManager::onDiscovery(const char* /*msg*/) { ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port); String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname; @@ -174,6 +209,50 @@ void ClusterManager::onNodeInfo(const char* msg) { } } +void ClusterManager::onClusterEvent(const char* msg) { + // Message format: CLUSTER_EVENT:{"event":"...","data":""} + const char* jsonStart = msg + strlen(ClusterProtocol::CLUSTER_EVENT_MSG) + 1; // skip prefix and ':' + if (*jsonStart == '\0') { + LOG_DEBUG("Cluster", "CLUSTER_EVENT received with empty payload"); + return; + } + LOG_INFO("Cluster", String("CLUSTER_EVENT raw from ") + ctx.udp->remoteIP().toString() + " len=" + String(strlen(jsonStart))); + JsonDocument doc; + DeserializationError err = deserializeJson(doc, jsonStart); + if (err) { + LOG_DEBUG("Cluster", String("Failed to parse CLUSTER_EVENT JSON from ") + ctx.udp->remoteIP().toString()); + return; + } + // Robust extraction of event and data + String eventStr; + if (doc["event"].is()) { + eventStr = doc["event"].as(); + } else if (doc["event"].is()) { + eventStr = doc["event"].as(); + } + + String data; + if (doc["data"].is()) { + data = doc["data"].as(); + } else if (doc["data"].is()) { + // If data is a nested JSON object/array, serialize it back to string + String tmp; + serializeJson(doc["data"], tmp); + data = tmp; + } + + if (eventStr.length() == 0 || data.length() == 0) { + String dbg; + serializeJson(doc, dbg); + LOG_WARN("Cluster", String("CLUSTER_EVENT missing 'event' or 'data' | payload=") + dbg); + return; + } + + std::string eventKey(eventStr.c_str()); + LOG_INFO("Cluster", String("Firing event '") + eventStr + "' with dataLen=" + String(data.length())); + ctx.fire(eventKey, &data); +} + void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) { auto& memberList = *ctx.memberList;