feat(cluster, neopattern): add CLUSTER_EVENT and broadcast handling
Add CLUSTER_EVENT message type and end-to-end handling across cluster and NeoPattern example.\n\nCluster protocol / core:\n- Add ClusterProtocol::CLUSTER_EVENT_MSG\n- ClusterManager: register predicate/handler for CLUSTER_EVENT\n- Robust parsing: accept data as string or nested JSON; serialize nested data to string before firing\n- Add defensive null-termination for full UDP reads; log unknown message head if no handler matches\n- Logging: source IP, payload length, missing fields, and pre-fire event details\n\nNeoPatternService:\n- Constructor now accepts NodeContext and registers ctx.on(api/neopattern) handler\n- /api/neopattern: add optional boolean 'broadcast' flag\n- If broadcast=true: build event payload and send CLUSTER_EVENT over UDP (subnet-directed broadcast); log target and payload size; also fire locally so sender applies immediately\n- Implement applyControlParams with robust ArduinoJson is<T>() checks (replaces deprecated containsKey()) and flexible string/number parsing for color, brightness, steps, interval\n- Minor fixes: include Globals for ClusterProtocol, include ESP8266WiFi for broadcast IP calc, safer brightness clamping\n\nExample:\n- Update neopattern main to pass NodeContext into NeoPatternService\n\nResult:\n- Nodes receive and process CLUSTER_EVENT (api/neopattern) via ctx.fire/ctx.on\n- Broadcast reliably reaches peers; parsing handles both stringified and nested JSON data\n- Additional logs aid diagnosis of delivery/format issues and remove deprecation warnings
This commit is contained in:
@@ -1,10 +1,13 @@
|
||||
#include "NeoPatternService.h"
|
||||
#include "spore/core/ApiServer.h"
|
||||
#include "spore/util/Logging.h"
|
||||
#include "spore/internal/Globals.h"
|
||||
#include <ArduinoJson.h>
|
||||
#include <ESP8266WiFi.h>
|
||||
|
||||
NeoPatternService::NeoPatternService(TaskManager& taskMgr, const NeoPixelConfig& config)
|
||||
NeoPatternService::NeoPatternService(NodeContext& ctx, TaskManager& taskMgr, const NeoPixelConfig& config)
|
||||
: taskManager(taskMgr),
|
||||
ctx(ctx),
|
||||
config(config),
|
||||
activePattern(NeoPatternType::RAINBOW_CYCLE),
|
||||
direction(NeoDirection::FORWARD),
|
||||
@@ -32,6 +35,7 @@ NeoPatternService::NeoPatternService(TaskManager& taskMgr, const NeoPixelConfig&
|
||||
|
||||
registerPatterns();
|
||||
registerTasks();
|
||||
registerEventHandlers();
|
||||
initialized = true;
|
||||
|
||||
LOG_INFO("NeoPattern", "Service initialized");
|
||||
@@ -64,7 +68,8 @@ void NeoPatternService::registerEndpoints(ApiServer& api) {
|
||||
ParamSpec{String("brightness"), false, String("body"), String("numberRange"), {}, String("80")},
|
||||
ParamSpec{String("total_steps"), false, String("body"), String("numberRange"), {}, String("16")},
|
||||
ParamSpec{String("direction"), false, String("body"), String("string"), {String("forward"), String("reverse")}},
|
||||
ParamSpec{String("interval"), false, String("body"), String("number"), {}, String("100")}
|
||||
ParamSpec{String("interval"), false, String("body"), String("number"), {}, String("100")},
|
||||
ParamSpec{String("broadcast"), false, String("body"), String("boolean"), {}}
|
||||
});
|
||||
|
||||
// State endpoint for complex state updates
|
||||
@@ -119,6 +124,7 @@ void NeoPatternService::handlePatternsRequest(AsyncWebServerRequest* request) {
|
||||
|
||||
void NeoPatternService::handleControlRequest(AsyncWebServerRequest* request) {
|
||||
bool updated = false;
|
||||
bool broadcast = false;
|
||||
|
||||
if (request->hasParam("pattern", true)) {
|
||||
String name = request->getParam("pattern", true)->value();
|
||||
@@ -176,6 +182,48 @@ void NeoPatternService::handleControlRequest(AsyncWebServerRequest* request) {
|
||||
}
|
||||
}
|
||||
|
||||
if (request->hasParam("broadcast", true)) {
|
||||
String b = request->getParam("broadcast", true)->value();
|
||||
broadcast = b.equalsIgnoreCase("true") || b == "1";
|
||||
}
|
||||
|
||||
if (broadcast) {
|
||||
// Build JSON payload from provided params
|
||||
JsonDocument payload;
|
||||
if (request->hasParam("pattern", true)) payload["pattern"] = request->getParam("pattern", true)->value();
|
||||
if (request->hasParam("color", true)) payload["color"] = request->getParam("color", true)->value();
|
||||
if (request->hasParam("color2", true)) payload["color2"] = request->getParam("color2", true)->value();
|
||||
if (request->hasParam("brightness", true)) payload["brightness"] = request->getParam("brightness", true)->value();
|
||||
if (request->hasParam("total_steps", true)) payload["total_steps"] = request->getParam("total_steps", true)->value();
|
||||
if (request->hasParam("direction", true)) payload["direction"] = request->getParam("direction", true)->value();
|
||||
if (request->hasParam("interval", true)) payload["interval"] = request->getParam("interval", true)->value();
|
||||
|
||||
String payloadStr;
|
||||
serializeJson(payload, payloadStr);
|
||||
|
||||
JsonDocument eventDoc;
|
||||
eventDoc["event"] = "api/neopattern";
|
||||
eventDoc["data"] = payloadStr; // data is arbitrary JSON string
|
||||
|
||||
String eventJson;
|
||||
serializeJson(eventDoc, eventJson);
|
||||
// Compute subnet-directed broadcast to improve delivery on some networks
|
||||
IPAddress ip = WiFi.localIP();
|
||||
IPAddress mask = WiFi.subnetMask();
|
||||
IPAddress bcast(ip[0] | ~mask[0], ip[1] | ~mask[1], ip[2] | ~mask[2], ip[3] | ~mask[3]);
|
||||
LOG_INFO("NeoPattern", String("Broadcasting CLUSTER_EVENT api/neopattern to ") + bcast.toString() + " payloadLen=" + String(payloadStr.length()));
|
||||
ctx.udp->beginPacket(bcast, ctx.config.udp_port);
|
||||
String msg = String(ClusterProtocol::CLUSTER_EVENT_MSG) + ":" + eventJson;
|
||||
ctx.udp->write(msg.c_str());
|
||||
ctx.udp->endPacket();
|
||||
|
||||
// Fire locally as well so the sender applies the change
|
||||
std::string ev = "api/neopattern";
|
||||
String localData = payloadStr;
|
||||
LOG_INFO("NeoPattern", "Firing local api/neopattern event after broadcast");
|
||||
ctx.fire(ev, &localData);
|
||||
}
|
||||
|
||||
// Return current state
|
||||
JsonDocument resp;
|
||||
resp["ok"] = true;
|
||||
@@ -192,6 +240,101 @@ void NeoPatternService::handleControlRequest(AsyncWebServerRequest* request) {
|
||||
serializeJson(resp, json);
|
||||
request->send(200, "application/json", json);
|
||||
}
|
||||
void NeoPatternService::registerEventHandlers() {
|
||||
ctx.on("api/neopattern", [this](void* dataPtr) {
|
||||
String* jsonStr = static_cast<String*>(dataPtr);
|
||||
if (!jsonStr) {
|
||||
LOG_WARN("NeoPattern", "Received api/neopattern with null dataPtr");
|
||||
return;
|
||||
}
|
||||
LOG_INFO("NeoPattern", String("Received api/neopattern event dataLen=") + String(jsonStr->length()));
|
||||
JsonDocument doc;
|
||||
DeserializationError err = deserializeJson(doc, *jsonStr);
|
||||
if (err) {
|
||||
LOG_WARN("NeoPattern", String("Failed to parse CLUSTER_EVENT data: ") + err.c_str());
|
||||
return;
|
||||
}
|
||||
JsonObject obj = doc.as<JsonObject>();
|
||||
bool applied = applyControlParams(obj);
|
||||
if (applied) {
|
||||
LOG_INFO("NeoPattern", "Applied control from CLUSTER_EVENT");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
bool NeoPatternService::applyControlParams(const JsonObject& obj) {
|
||||
bool updated = false;
|
||||
if (obj["pattern"].is<const char*>() || obj["pattern"].is<String>()) {
|
||||
String name = obj["pattern"].as<String>();
|
||||
if (isValidPattern(name)) {
|
||||
setPatternByName(name);
|
||||
updated = true;
|
||||
}
|
||||
}
|
||||
if (obj["color"].is<const char*>() || obj["color"].is<String>() || obj["color"].is<long>() || obj["color"].is<int>()) {
|
||||
String colorStr;
|
||||
if (obj["color"].is<long>() || obj["color"].is<int>()) {
|
||||
colorStr = String(obj["color"].as<long>());
|
||||
} else {
|
||||
colorStr = obj["color"].as<String>();
|
||||
}
|
||||
uint32_t color = parseColor(colorStr);
|
||||
setColor(color);
|
||||
updated = true;
|
||||
}
|
||||
if (obj["color2"].is<const char*>() || obj["color2"].is<String>() || obj["color2"].is<long>() || obj["color2"].is<int>()) {
|
||||
String colorStr;
|
||||
if (obj["color2"].is<long>() || obj["color2"].is<int>()) {
|
||||
colorStr = String(obj["color2"].as<long>());
|
||||
} else {
|
||||
colorStr = obj["color2"].as<String>();
|
||||
}
|
||||
uint32_t color = parseColor(colorStr);
|
||||
setColor2(color);
|
||||
updated = true;
|
||||
}
|
||||
if (obj["brightness"].is<int>() || obj["brightness"].is<long>() || obj["brightness"].is<const char*>() || obj["brightness"].is<String>()) {
|
||||
int b = 0;
|
||||
if (obj["brightness"].is<int>() || obj["brightness"].is<long>()) {
|
||||
b = obj["brightness"].as<int>();
|
||||
} else {
|
||||
b = String(obj["brightness"].as<String>()).toInt();
|
||||
}
|
||||
if (b < 0) {
|
||||
b = 0;
|
||||
}
|
||||
if (b > 255) {
|
||||
b = 255;
|
||||
}
|
||||
setBrightness(static_cast<uint8_t>(b));
|
||||
updated = true;
|
||||
}
|
||||
if (obj["total_steps"].is<int>() || obj["total_steps"].is<long>() || obj["total_steps"].is<const char*>() || obj["total_steps"].is<String>()) {
|
||||
int steps = 0;
|
||||
if (obj["total_steps"].is<int>() || obj["total_steps"].is<long>()) {
|
||||
steps = obj["total_steps"].as<int>();
|
||||
} else {
|
||||
steps = String(obj["total_steps"].as<String>()).toInt();
|
||||
}
|
||||
if (steps > 0) { setTotalSteps(static_cast<uint16_t>(steps)); updated = true; }
|
||||
}
|
||||
if (obj["direction"].is<const char*>() || obj["direction"].is<String>()) {
|
||||
String dirStr = obj["direction"].as<String>();
|
||||
NeoDirection dir = (dirStr.equalsIgnoreCase("reverse")) ? NeoDirection::REVERSE : NeoDirection::FORWARD;
|
||||
setDirection(dir);
|
||||
updated = true;
|
||||
}
|
||||
if (obj["interval"].is<int>() || obj["interval"].is<long>() || obj["interval"].is<const char*>() || obj["interval"].is<String>()) {
|
||||
unsigned long interval = 0;
|
||||
if (obj["interval"].is<int>() || obj["interval"].is<long>()) {
|
||||
interval = obj["interval"].as<unsigned long>();
|
||||
} else {
|
||||
interval = String(obj["interval"].as<String>()).toInt();
|
||||
}
|
||||
if (interval > 0) { setUpdateInterval(interval); updated = true; }
|
||||
}
|
||||
return updated;
|
||||
}
|
||||
|
||||
void NeoPatternService::handleStateRequest(AsyncWebServerRequest* request) {
|
||||
if (request->contentType() != "application/json") {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
#include "spore/Service.h"
|
||||
#include "spore/core/TaskManager.h"
|
||||
#include "spore/core/NodeContext.h"
|
||||
#include "NeoPattern.h"
|
||||
#include "NeoPatternState.h"
|
||||
#include "NeoPixelConfig.h"
|
||||
@@ -25,7 +26,7 @@ public:
|
||||
REVERSE
|
||||
};
|
||||
|
||||
NeoPatternService(TaskManager& taskMgr, const NeoPixelConfig& config);
|
||||
NeoPatternService(NodeContext& ctx, TaskManager& taskMgr, const NeoPixelConfig& config);
|
||||
~NeoPatternService();
|
||||
|
||||
void registerEndpoints(ApiServer& api) override;
|
||||
@@ -49,6 +50,8 @@ private:
|
||||
void registerTasks();
|
||||
void registerPatterns();
|
||||
void update();
|
||||
void registerEventHandlers();
|
||||
bool applyControlParams(const JsonObject& obj);
|
||||
|
||||
// Pattern updaters
|
||||
void updateRainbowCycle();
|
||||
@@ -80,6 +83,7 @@ private:
|
||||
String getPatternDescription(const String& name) const;
|
||||
|
||||
TaskManager& taskManager;
|
||||
NodeContext& ctx;
|
||||
NeoPattern* neoPattern;
|
||||
NeoPixelConfig config;
|
||||
NeoPatternState currentState;
|
||||
|
||||
@@ -45,7 +45,7 @@ void setup() {
|
||||
);
|
||||
|
||||
// Create and add custom service
|
||||
neoPatternService = new NeoPatternService(spore.getTaskManager(), config);
|
||||
neoPatternService = new NeoPatternService(spore.getContext(), spore.getTaskManager(), config);
|
||||
spore.addService(neoPatternService);
|
||||
|
||||
// Start the API server and complete initialization
|
||||
|
||||
@@ -39,10 +39,12 @@ private:
|
||||
static bool isHeartbeatMsg(const char* msg);
|
||||
static bool isResponseMsg(const char* msg);
|
||||
static bool isNodeInfoMsg(const char* msg);
|
||||
static bool isClusterEventMsg(const char* msg);
|
||||
void onDiscovery(const char* msg);
|
||||
void onHeartbeat(const char* msg);
|
||||
void onResponse(const char* msg);
|
||||
void onNodeInfo(const char* msg);
|
||||
void onClusterEvent(const char* msg);
|
||||
unsigned long lastHeartbeatSentAt = 0;
|
||||
std::vector<MessageHandler> messageHandlers;
|
||||
};
|
||||
|
||||
@@ -9,6 +9,7 @@ namespace ClusterProtocol {
|
||||
constexpr const char* RESPONSE_MSG = "CLUSTER_RESPONSE";
|
||||
constexpr const char* HEARTBEAT_MSG = "CLUSTER_HEARTBEAT";
|
||||
constexpr const char* NODE_INFO_MSG = "CLUSTER_NODE_INFO";
|
||||
constexpr const char* CLUSTER_EVENT_MSG = "CLUSTER_EVENT";
|
||||
constexpr uint16_t UDP_PORT = 4210;
|
||||
// Increased buffer to accommodate node info JSON over UDP
|
||||
constexpr size_t UDP_BUF_SIZE = 512;
|
||||
|
||||
@@ -41,7 +41,11 @@ void ClusterManager::listen() {
|
||||
if (len <= 0) {
|
||||
return;
|
||||
}
|
||||
incoming[len] = 0;
|
||||
if (len >= (int)ClusterProtocol::UDP_BUF_SIZE) {
|
||||
incoming[ClusterProtocol::UDP_BUF_SIZE - 1] = 0;
|
||||
} else {
|
||||
incoming[len] = 0;
|
||||
}
|
||||
handleIncomingMessage(incoming);
|
||||
}
|
||||
|
||||
@@ -51,6 +55,7 @@ void ClusterManager::initMessageHandlers() {
|
||||
messageHandlers.push_back({ &ClusterManager::isHeartbeatMsg, [this](const char* msg){ this->onHeartbeat(msg); }, "HEARTBEAT" });
|
||||
messageHandlers.push_back({ &ClusterManager::isResponseMsg, [this](const char* msg){ this->onResponse(msg); }, "RESPONSE" });
|
||||
messageHandlers.push_back({ &ClusterManager::isNodeInfoMsg, [this](const char* msg){ this->onNodeInfo(msg); }, "NODE_INFO" });
|
||||
messageHandlers.push_back({ &ClusterManager::isClusterEventMsg, [this](const char* msg){ this->onClusterEvent(msg); }, "CLUSTER_EVENT" });
|
||||
}
|
||||
|
||||
void ClusterManager::handleIncomingMessage(const char* incoming) {
|
||||
@@ -60,6 +65,15 @@ void ClusterManager::handleIncomingMessage(const char* incoming) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
// Unknown message - log first token
|
||||
const char* colon = strchr(incoming, ':');
|
||||
String head;
|
||||
if (colon) {
|
||||
head = String(incoming).substring(0, colon - incoming);
|
||||
} else {
|
||||
head = String(incoming);
|
||||
}
|
||||
LOG_DEBUG("Cluster", String("Unknown cluster message: ") + head);
|
||||
}
|
||||
|
||||
bool ClusterManager::isDiscoveryMsg(const char* msg) {
|
||||
@@ -78,6 +92,10 @@ bool ClusterManager::isNodeInfoMsg(const char* msg) {
|
||||
return strncmp(msg, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0;
|
||||
}
|
||||
|
||||
bool ClusterManager::isClusterEventMsg(const char* msg) {
|
||||
return strncmp(msg, ClusterProtocol::CLUSTER_EVENT_MSG, strlen(ClusterProtocol::CLUSTER_EVENT_MSG)) == 0;
|
||||
}
|
||||
|
||||
void ClusterManager::onDiscovery(const char* /*msg*/) {
|
||||
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
|
||||
String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname;
|
||||
@@ -174,6 +192,50 @@ void ClusterManager::onNodeInfo(const char* msg) {
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterManager::onClusterEvent(const char* msg) {
|
||||
// Message format: CLUSTER_EVENT:{"event":"...","data":"<json string>"}
|
||||
const char* jsonStart = msg + strlen(ClusterProtocol::CLUSTER_EVENT_MSG) + 1; // skip prefix and ':'
|
||||
if (*jsonStart == '\0') {
|
||||
LOG_DEBUG("Cluster", "CLUSTER_EVENT received with empty payload");
|
||||
return;
|
||||
}
|
||||
LOG_INFO("Cluster", String("CLUSTER_EVENT raw from ") + ctx.udp->remoteIP().toString() + " len=" + String(strlen(jsonStart)));
|
||||
JsonDocument doc;
|
||||
DeserializationError err = deserializeJson(doc, jsonStart);
|
||||
if (err) {
|
||||
LOG_DEBUG("Cluster", String("Failed to parse CLUSTER_EVENT JSON from ") + ctx.udp->remoteIP().toString());
|
||||
return;
|
||||
}
|
||||
// Robust extraction of event and data
|
||||
String eventStr;
|
||||
if (doc["event"].is<const char*>()) {
|
||||
eventStr = doc["event"].as<const char*>();
|
||||
} else if (doc["event"].is<String>()) {
|
||||
eventStr = doc["event"].as<String>();
|
||||
}
|
||||
|
||||
String data;
|
||||
if (doc["data"].is<const char*>()) {
|
||||
data = doc["data"].as<const char*>();
|
||||
} else if (doc["data"].is<JsonVariantConst>()) {
|
||||
// If data is a nested JSON object/array, serialize it back to string
|
||||
String tmp;
|
||||
serializeJson(doc["data"], tmp);
|
||||
data = tmp;
|
||||
}
|
||||
|
||||
if (eventStr.length() == 0 || data.length() == 0) {
|
||||
String dbg;
|
||||
serializeJson(doc, dbg);
|
||||
LOG_WARN("Cluster", String("CLUSTER_EVENT missing 'event' or 'data' | payload=") + dbg);
|
||||
return;
|
||||
}
|
||||
|
||||
std::string eventKey(eventStr.c_str());
|
||||
LOG_INFO("Cluster", String("Firing event '") + eventStr + "' with dataLen=" + String(data.length()));
|
||||
ctx.fire(eventKey, &data);
|
||||
}
|
||||
|
||||
void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
|
||||
auto& memberList = *ctx.memberList;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user