diff --git a/examples/neopattern/NeoPatternService.cpp b/examples/neopattern/NeoPatternService.cpp index 99e52ff..378177a 100644 --- a/examples/neopattern/NeoPatternService.cpp +++ b/examples/neopattern/NeoPatternService.cpp @@ -1,10 +1,13 @@ #include "NeoPatternService.h" #include "spore/core/ApiServer.h" #include "spore/util/Logging.h" +#include "spore/internal/Globals.h" #include +#include -NeoPatternService::NeoPatternService(TaskManager& taskMgr, const NeoPixelConfig& config) +NeoPatternService::NeoPatternService(NodeContext& ctx, TaskManager& taskMgr, const NeoPixelConfig& config) : taskManager(taskMgr), + ctx(ctx), config(config), activePattern(NeoPatternType::RAINBOW_CYCLE), direction(NeoDirection::FORWARD), @@ -32,6 +35,7 @@ NeoPatternService::NeoPatternService(TaskManager& taskMgr, const NeoPixelConfig& registerPatterns(); registerTasks(); + registerEventHandlers(); initialized = true; LOG_INFO("NeoPattern", "Service initialized"); @@ -64,7 +68,8 @@ void NeoPatternService::registerEndpoints(ApiServer& api) { ParamSpec{String("brightness"), false, String("body"), String("numberRange"), {}, String("80")}, ParamSpec{String("total_steps"), false, String("body"), String("numberRange"), {}, String("16")}, ParamSpec{String("direction"), false, String("body"), String("string"), {String("forward"), String("reverse")}}, - ParamSpec{String("interval"), false, String("body"), String("number"), {}, String("100")} + ParamSpec{String("interval"), false, String("body"), String("number"), {}, String("100")}, + ParamSpec{String("broadcast"), false, String("body"), String("boolean"), {}} }); // State endpoint for complex state updates @@ -119,6 +124,7 @@ void NeoPatternService::handlePatternsRequest(AsyncWebServerRequest* request) { void NeoPatternService::handleControlRequest(AsyncWebServerRequest* request) { bool updated = false; + bool broadcast = false; if (request->hasParam("pattern", true)) { String name = request->getParam("pattern", true)->value(); @@ -176,6 +182,48 @@ void NeoPatternService::handleControlRequest(AsyncWebServerRequest* request) { } } + if (request->hasParam("broadcast", true)) { + String b = request->getParam("broadcast", true)->value(); + broadcast = b.equalsIgnoreCase("true") || b == "1"; + } + + if (broadcast) { + // Build JSON payload from provided params + JsonDocument payload; + if (request->hasParam("pattern", true)) payload["pattern"] = request->getParam("pattern", true)->value(); + if (request->hasParam("color", true)) payload["color"] = request->getParam("color", true)->value(); + if (request->hasParam("color2", true)) payload["color2"] = request->getParam("color2", true)->value(); + if (request->hasParam("brightness", true)) payload["brightness"] = request->getParam("brightness", true)->value(); + if (request->hasParam("total_steps", true)) payload["total_steps"] = request->getParam("total_steps", true)->value(); + if (request->hasParam("direction", true)) payload["direction"] = request->getParam("direction", true)->value(); + if (request->hasParam("interval", true)) payload["interval"] = request->getParam("interval", true)->value(); + + String payloadStr; + serializeJson(payload, payloadStr); + + JsonDocument eventDoc; + eventDoc["event"] = "api/neopattern"; + eventDoc["data"] = payloadStr; // data is arbitrary JSON string + + String eventJson; + serializeJson(eventDoc, eventJson); + // Compute subnet-directed broadcast to improve delivery on some networks + IPAddress ip = WiFi.localIP(); + IPAddress mask = WiFi.subnetMask(); + IPAddress bcast(ip[0] | ~mask[0], ip[1] | ~mask[1], ip[2] | ~mask[2], ip[3] | ~mask[3]); + LOG_INFO("NeoPattern", String("Broadcasting CLUSTER_EVENT api/neopattern to ") + bcast.toString() + " payloadLen=" + String(payloadStr.length())); + ctx.udp->beginPacket(bcast, ctx.config.udp_port); + String msg = String(ClusterProtocol::CLUSTER_EVENT_MSG) + ":" + eventJson; + ctx.udp->write(msg.c_str()); + ctx.udp->endPacket(); + + // Fire locally as well so the sender applies the change + std::string ev = "api/neopattern"; + String localData = payloadStr; + LOG_INFO("NeoPattern", "Firing local api/neopattern event after broadcast"); + ctx.fire(ev, &localData); + } + // Return current state JsonDocument resp; resp["ok"] = true; @@ -192,6 +240,101 @@ void NeoPatternService::handleControlRequest(AsyncWebServerRequest* request) { serializeJson(resp, json); request->send(200, "application/json", json); } +void NeoPatternService::registerEventHandlers() { + ctx.on("api/neopattern", [this](void* dataPtr) { + String* jsonStr = static_cast(dataPtr); + if (!jsonStr) { + LOG_WARN("NeoPattern", "Received api/neopattern with null dataPtr"); + return; + } + LOG_INFO("NeoPattern", String("Received api/neopattern event dataLen=") + String(jsonStr->length())); + JsonDocument doc; + DeserializationError err = deserializeJson(doc, *jsonStr); + if (err) { + LOG_WARN("NeoPattern", String("Failed to parse CLUSTER_EVENT data: ") + err.c_str()); + return; + } + JsonObject obj = doc.as(); + bool applied = applyControlParams(obj); + if (applied) { + LOG_INFO("NeoPattern", "Applied control from CLUSTER_EVENT"); + } + }); +} + +bool NeoPatternService::applyControlParams(const JsonObject& obj) { + bool updated = false; + if (obj["pattern"].is() || obj["pattern"].is()) { + String name = obj["pattern"].as(); + if (isValidPattern(name)) { + setPatternByName(name); + updated = true; + } + } + if (obj["color"].is() || obj["color"].is() || obj["color"].is() || obj["color"].is()) { + String colorStr; + if (obj["color"].is() || obj["color"].is()) { + colorStr = String(obj["color"].as()); + } else { + colorStr = obj["color"].as(); + } + uint32_t color = parseColor(colorStr); + setColor(color); + updated = true; + } + if (obj["color2"].is() || obj["color2"].is() || obj["color2"].is() || obj["color2"].is()) { + String colorStr; + if (obj["color2"].is() || obj["color2"].is()) { + colorStr = String(obj["color2"].as()); + } else { + colorStr = obj["color2"].as(); + } + uint32_t color = parseColor(colorStr); + setColor2(color); + updated = true; + } + if (obj["brightness"].is() || obj["brightness"].is() || obj["brightness"].is() || obj["brightness"].is()) { + int b = 0; + if (obj["brightness"].is() || obj["brightness"].is()) { + b = obj["brightness"].as(); + } else { + b = String(obj["brightness"].as()).toInt(); + } + if (b < 0) { + b = 0; + } + if (b > 255) { + b = 255; + } + setBrightness(static_cast(b)); + updated = true; + } + if (obj["total_steps"].is() || obj["total_steps"].is() || obj["total_steps"].is() || obj["total_steps"].is()) { + int steps = 0; + if (obj["total_steps"].is() || obj["total_steps"].is()) { + steps = obj["total_steps"].as(); + } else { + steps = String(obj["total_steps"].as()).toInt(); + } + if (steps > 0) { setTotalSteps(static_cast(steps)); updated = true; } + } + if (obj["direction"].is() || obj["direction"].is()) { + String dirStr = obj["direction"].as(); + NeoDirection dir = (dirStr.equalsIgnoreCase("reverse")) ? NeoDirection::REVERSE : NeoDirection::FORWARD; + setDirection(dir); + updated = true; + } + if (obj["interval"].is() || obj["interval"].is() || obj["interval"].is() || obj["interval"].is()) { + unsigned long interval = 0; + if (obj["interval"].is() || obj["interval"].is()) { + interval = obj["interval"].as(); + } else { + interval = String(obj["interval"].as()).toInt(); + } + if (interval > 0) { setUpdateInterval(interval); updated = true; } + } + return updated; +} void NeoPatternService::handleStateRequest(AsyncWebServerRequest* request) { if (request->contentType() != "application/json") { diff --git a/examples/neopattern/NeoPatternService.h b/examples/neopattern/NeoPatternService.h index 3103911..b4e1a1b 100644 --- a/examples/neopattern/NeoPatternService.h +++ b/examples/neopattern/NeoPatternService.h @@ -1,6 +1,7 @@ #pragma once #include "spore/Service.h" #include "spore/core/TaskManager.h" +#include "spore/core/NodeContext.h" #include "NeoPattern.h" #include "NeoPatternState.h" #include "NeoPixelConfig.h" @@ -25,7 +26,7 @@ public: REVERSE }; - NeoPatternService(TaskManager& taskMgr, const NeoPixelConfig& config); + NeoPatternService(NodeContext& ctx, TaskManager& taskMgr, const NeoPixelConfig& config); ~NeoPatternService(); void registerEndpoints(ApiServer& api) override; @@ -49,6 +50,8 @@ private: void registerTasks(); void registerPatterns(); void update(); + void registerEventHandlers(); + bool applyControlParams(const JsonObject& obj); // Pattern updaters void updateRainbowCycle(); @@ -80,6 +83,7 @@ private: String getPatternDescription(const String& name) const; TaskManager& taskManager; + NodeContext& ctx; NeoPattern* neoPattern; NeoPixelConfig config; NeoPatternState currentState; diff --git a/examples/neopattern/main.cpp b/examples/neopattern/main.cpp index 09745eb..2aed7d4 100644 --- a/examples/neopattern/main.cpp +++ b/examples/neopattern/main.cpp @@ -45,7 +45,7 @@ void setup() { ); // Create and add custom service - neoPatternService = new NeoPatternService(spore.getTaskManager(), config); + neoPatternService = new NeoPatternService(spore.getContext(), spore.getTaskManager(), config); spore.addService(neoPatternService); // Start the API server and complete initialization diff --git a/include/spore/core/ClusterManager.h b/include/spore/core/ClusterManager.h index cd994f0..ca4095c 100644 --- a/include/spore/core/ClusterManager.h +++ b/include/spore/core/ClusterManager.h @@ -39,10 +39,12 @@ private: static bool isHeartbeatMsg(const char* msg); static bool isResponseMsg(const char* msg); static bool isNodeInfoMsg(const char* msg); + static bool isClusterEventMsg(const char* msg); void onDiscovery(const char* msg); void onHeartbeat(const char* msg); void onResponse(const char* msg); void onNodeInfo(const char* msg); + void onClusterEvent(const char* msg); unsigned long lastHeartbeatSentAt = 0; std::vector messageHandlers; }; diff --git a/include/spore/internal/Globals.h b/include/spore/internal/Globals.h index c43d911..690f2e8 100644 --- a/include/spore/internal/Globals.h +++ b/include/spore/internal/Globals.h @@ -9,6 +9,7 @@ namespace ClusterProtocol { constexpr const char* RESPONSE_MSG = "CLUSTER_RESPONSE"; constexpr const char* HEARTBEAT_MSG = "CLUSTER_HEARTBEAT"; constexpr const char* NODE_INFO_MSG = "CLUSTER_NODE_INFO"; + constexpr const char* CLUSTER_EVENT_MSG = "CLUSTER_EVENT"; constexpr uint16_t UDP_PORT = 4210; // Increased buffer to accommodate node info JSON over UDP constexpr size_t UDP_BUF_SIZE = 512; diff --git a/src/spore/core/ClusterManager.cpp b/src/spore/core/ClusterManager.cpp index 4746488..0d84378 100644 --- a/src/spore/core/ClusterManager.cpp +++ b/src/spore/core/ClusterManager.cpp @@ -41,7 +41,11 @@ void ClusterManager::listen() { if (len <= 0) { return; } - incoming[len] = 0; + if (len >= (int)ClusterProtocol::UDP_BUF_SIZE) { + incoming[ClusterProtocol::UDP_BUF_SIZE - 1] = 0; + } else { + incoming[len] = 0; + } handleIncomingMessage(incoming); } @@ -51,6 +55,7 @@ void ClusterManager::initMessageHandlers() { messageHandlers.push_back({ &ClusterManager::isHeartbeatMsg, [this](const char* msg){ this->onHeartbeat(msg); }, "HEARTBEAT" }); messageHandlers.push_back({ &ClusterManager::isResponseMsg, [this](const char* msg){ this->onResponse(msg); }, "RESPONSE" }); messageHandlers.push_back({ &ClusterManager::isNodeInfoMsg, [this](const char* msg){ this->onNodeInfo(msg); }, "NODE_INFO" }); + messageHandlers.push_back({ &ClusterManager::isClusterEventMsg, [this](const char* msg){ this->onClusterEvent(msg); }, "CLUSTER_EVENT" }); } void ClusterManager::handleIncomingMessage(const char* incoming) { @@ -60,6 +65,15 @@ void ClusterManager::handleIncomingMessage(const char* incoming) { return; } } + // Unknown message - log first token + const char* colon = strchr(incoming, ':'); + String head; + if (colon) { + head = String(incoming).substring(0, colon - incoming); + } else { + head = String(incoming); + } + LOG_DEBUG("Cluster", String("Unknown cluster message: ") + head); } bool ClusterManager::isDiscoveryMsg(const char* msg) { @@ -78,6 +92,10 @@ bool ClusterManager::isNodeInfoMsg(const char* msg) { return strncmp(msg, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0; } +bool ClusterManager::isClusterEventMsg(const char* msg) { + return strncmp(msg, ClusterProtocol::CLUSTER_EVENT_MSG, strlen(ClusterProtocol::CLUSTER_EVENT_MSG)) == 0; +} + void ClusterManager::onDiscovery(const char* /*msg*/) { ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port); String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname; @@ -174,6 +192,50 @@ void ClusterManager::onNodeInfo(const char* msg) { } } +void ClusterManager::onClusterEvent(const char* msg) { + // Message format: CLUSTER_EVENT:{"event":"...","data":""} + const char* jsonStart = msg + strlen(ClusterProtocol::CLUSTER_EVENT_MSG) + 1; // skip prefix and ':' + if (*jsonStart == '\0') { + LOG_DEBUG("Cluster", "CLUSTER_EVENT received with empty payload"); + return; + } + LOG_INFO("Cluster", String("CLUSTER_EVENT raw from ") + ctx.udp->remoteIP().toString() + " len=" + String(strlen(jsonStart))); + JsonDocument doc; + DeserializationError err = deserializeJson(doc, jsonStart); + if (err) { + LOG_DEBUG("Cluster", String("Failed to parse CLUSTER_EVENT JSON from ") + ctx.udp->remoteIP().toString()); + return; + } + // Robust extraction of event and data + String eventStr; + if (doc["event"].is()) { + eventStr = doc["event"].as(); + } else if (doc["event"].is()) { + eventStr = doc["event"].as(); + } + + String data; + if (doc["data"].is()) { + data = doc["data"].as(); + } else if (doc["data"].is()) { + // If data is a nested JSON object/array, serialize it back to string + String tmp; + serializeJson(doc["data"], tmp); + data = tmp; + } + + if (eventStr.length() == 0 || data.length() == 0) { + String dbg; + serializeJson(doc, dbg); + LOG_WARN("Cluster", String("CLUSTER_EVENT missing 'event' or 'data' | payload=") + dbg); + return; + } + + std::string eventKey(eventStr.c_str()); + LOG_INFO("Cluster", String("Firing event '") + eventStr + "' with dataLen=" + String(data.length())); + ctx.fire(eventKey, &data); +} + void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) { auto& memberList = *ctx.memberList;