feature/cluster-message #9
@@ -1,10 +1,13 @@
|
|||||||
#include "NeoPatternService.h"
|
#include "NeoPatternService.h"
|
||||||
#include "spore/core/ApiServer.h"
|
#include "spore/core/ApiServer.h"
|
||||||
#include "spore/util/Logging.h"
|
#include "spore/util/Logging.h"
|
||||||
|
#include "spore/internal/Globals.h"
|
||||||
#include <ArduinoJson.h>
|
#include <ArduinoJson.h>
|
||||||
|
#include <ESP8266WiFi.h>
|
||||||
|
|
||||||
NeoPatternService::NeoPatternService(TaskManager& taskMgr, const NeoPixelConfig& config)
|
NeoPatternService::NeoPatternService(NodeContext& ctx, TaskManager& taskMgr, const NeoPixelConfig& config)
|
||||||
: taskManager(taskMgr),
|
: taskManager(taskMgr),
|
||||||
|
ctx(ctx),
|
||||||
config(config),
|
config(config),
|
||||||
activePattern(NeoPatternType::RAINBOW_CYCLE),
|
activePattern(NeoPatternType::RAINBOW_CYCLE),
|
||||||
direction(NeoDirection::FORWARD),
|
direction(NeoDirection::FORWARD),
|
||||||
@@ -32,6 +35,7 @@ NeoPatternService::NeoPatternService(TaskManager& taskMgr, const NeoPixelConfig&
|
|||||||
|
|
||||||
registerPatterns();
|
registerPatterns();
|
||||||
registerTasks();
|
registerTasks();
|
||||||
|
registerEventHandlers();
|
||||||
initialized = true;
|
initialized = true;
|
||||||
|
|
||||||
LOG_INFO("NeoPattern", "Service initialized");
|
LOG_INFO("NeoPattern", "Service initialized");
|
||||||
@@ -64,7 +68,8 @@ void NeoPatternService::registerEndpoints(ApiServer& api) {
|
|||||||
ParamSpec{String("brightness"), false, String("body"), String("numberRange"), {}, String("80")},
|
ParamSpec{String("brightness"), false, String("body"), String("numberRange"), {}, String("80")},
|
||||||
ParamSpec{String("total_steps"), false, String("body"), String("numberRange"), {}, String("16")},
|
ParamSpec{String("total_steps"), false, String("body"), String("numberRange"), {}, String("16")},
|
||||||
ParamSpec{String("direction"), false, String("body"), String("string"), {String("forward"), String("reverse")}},
|
ParamSpec{String("direction"), false, String("body"), String("string"), {String("forward"), String("reverse")}},
|
||||||
ParamSpec{String("interval"), false, String("body"), String("number"), {}, String("100")}
|
ParamSpec{String("interval"), false, String("body"), String("number"), {}, String("100")},
|
||||||
|
ParamSpec{String("broadcast"), false, String("body"), String("boolean"), {}}
|
||||||
});
|
});
|
||||||
|
|
||||||
// State endpoint for complex state updates
|
// State endpoint for complex state updates
|
||||||
@@ -119,61 +124,49 @@ void NeoPatternService::handlePatternsRequest(AsyncWebServerRequest* request) {
|
|||||||
|
|
||||||
void NeoPatternService::handleControlRequest(AsyncWebServerRequest* request) {
|
void NeoPatternService::handleControlRequest(AsyncWebServerRequest* request) {
|
||||||
bool updated = false;
|
bool updated = false;
|
||||||
|
bool broadcast = false;
|
||||||
|
|
||||||
if (request->hasParam("pattern", true)) {
|
if (request->hasParam("broadcast", true)) {
|
||||||
String name = request->getParam("pattern", true)->value();
|
String b = request->getParam("broadcast", true)->value();
|
||||||
if (isValidPattern(name)) {
|
broadcast = b.equalsIgnoreCase("true") || b == "1";
|
||||||
setPatternByName(name);
|
|
||||||
updated = true;
|
|
||||||
} else {
|
|
||||||
// Invalid pattern name - could add error handling here
|
|
||||||
LOG_WARN("NeoPattern", "Invalid pattern name: " + name);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (request->hasParam("color", true)) {
|
// Build JSON payload from provided params (single source of truth)
|
||||||
String colorStr = request->getParam("color", true)->value();
|
JsonDocument payload;
|
||||||
uint32_t color = parseColor(colorStr);
|
bool any = false;
|
||||||
setColor(color);
|
if (request->hasParam("pattern", true)) { payload["pattern"] = request->getParam("pattern", true)->value(); any = true; }
|
||||||
|
if (request->hasParam("color", true)) { payload["color"] = request->getParam("color", true)->value(); any = true; }
|
||||||
|
if (request->hasParam("color2", true)) { payload["color2"] = request->getParam("color2", true)->value(); any = true; }
|
||||||
|
if (request->hasParam("brightness", true)) { payload["brightness"] = request->getParam("brightness", true)->value(); any = true; }
|
||||||
|
if (request->hasParam("total_steps", true)) { payload["total_steps"] = request->getParam("total_steps", true)->value(); any = true; }
|
||||||
|
if (request->hasParam("direction", true)) { payload["direction"] = request->getParam("direction", true)->value(); any = true; }
|
||||||
|
if (request->hasParam("interval", true)) { payload["interval"] = request->getParam("interval", true)->value(); any = true; }
|
||||||
|
|
||||||
|
String payloadStr;
|
||||||
|
serializeJson(payload, payloadStr);
|
||||||
|
|
||||||
|
// Always apply locally via event so we have a single codepath for updates
|
||||||
|
if (any) {
|
||||||
|
std::string ev = "api/neopattern";
|
||||||
|
String localData = payloadStr;
|
||||||
|
LOG_INFO("NeoPattern", String("Applying local api/neopattern via event payloadLen=") + String(payloadStr.length()));
|
||||||
|
ctx.fire(ev, &localData);
|
||||||
updated = true;
|
updated = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (request->hasParam("color2", true)) {
|
// Broadcast to peers if requested (delegate to core broadcast handler)
|
||||||
String colorStr = request->getParam("color2", true)->value();
|
if (broadcast && any) {
|
||||||
uint32_t color = parseColor(colorStr);
|
JsonDocument eventDoc;
|
||||||
setColor2(color);
|
eventDoc["event"] = "api/neopattern";
|
||||||
updated = true;
|
eventDoc["data"] = payloadStr; // data is JSON string
|
||||||
}
|
|
||||||
|
|
||||||
if (request->hasParam("brightness", true)) {
|
String eventJson;
|
||||||
int b = request->getParam("brightness", true)->value().toInt();
|
serializeJson(eventDoc, eventJson);
|
||||||
if (b < 0) b = 0;
|
|
||||||
if (b > 255) b = 255;
|
|
||||||
setBrightness(static_cast<uint8_t>(b));
|
|
||||||
updated = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (request->hasParam("total_steps", true)) {
|
LOG_INFO("NeoPattern", String("Submitting cluster/broadcast for api/neopattern payloadLen=") + String(payloadStr.length()));
|
||||||
int steps = request->getParam("total_steps", true)->value().toInt();
|
std::string ev = "cluster/broadcast";
|
||||||
if (steps > 0) {
|
String eventStr = eventJson;
|
||||||
setTotalSteps(static_cast<uint16_t>(steps));
|
ctx.fire(ev, &eventStr);
|
||||||
updated = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (request->hasParam("direction", true)) {
|
|
||||||
String dirStr = request->getParam("direction", true)->value();
|
|
||||||
NeoDirection dir = (dirStr.equalsIgnoreCase("reverse")) ? NeoDirection::REVERSE : NeoDirection::FORWARD;
|
|
||||||
setDirection(dir);
|
|
||||||
updated = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (request->hasParam("interval", true)) {
|
|
||||||
unsigned long interval = request->getParam("interval", true)->value().toInt();
|
|
||||||
if (interval > 0) {
|
|
||||||
setUpdateInterval(interval);
|
|
||||||
updated = true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return current state
|
// Return current state
|
||||||
@@ -192,6 +185,101 @@ void NeoPatternService::handleControlRequest(AsyncWebServerRequest* request) {
|
|||||||
serializeJson(resp, json);
|
serializeJson(resp, json);
|
||||||
request->send(200, "application/json", json);
|
request->send(200, "application/json", json);
|
||||||
}
|
}
|
||||||
|
void NeoPatternService::registerEventHandlers() {
|
||||||
|
ctx.on("api/neopattern", [this](void* dataPtr) {
|
||||||
|
String* jsonStr = static_cast<String*>(dataPtr);
|
||||||
|
if (!jsonStr) {
|
||||||
|
LOG_WARN("NeoPattern", "Received api/neopattern with null dataPtr");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
LOG_INFO("NeoPattern", String("Received api/neopattern event dataLen=") + String(jsonStr->length()));
|
||||||
|
JsonDocument doc;
|
||||||
|
DeserializationError err = deserializeJson(doc, *jsonStr);
|
||||||
|
if (err) {
|
||||||
|
LOG_WARN("NeoPattern", String("Failed to parse CLUSTER_EVENT data: ") + err.c_str());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
JsonObject obj = doc.as<JsonObject>();
|
||||||
|
bool applied = applyControlParams(obj);
|
||||||
|
if (applied) {
|
||||||
|
LOG_INFO("NeoPattern", "Applied control from CLUSTER_EVENT");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
bool NeoPatternService::applyControlParams(const JsonObject& obj) {
|
||||||
|
bool updated = false;
|
||||||
|
if (obj["pattern"].is<const char*>() || obj["pattern"].is<String>()) {
|
||||||
|
String name = obj["pattern"].as<String>();
|
||||||
|
if (isValidPattern(name)) {
|
||||||
|
setPatternByName(name);
|
||||||
|
updated = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (obj["color"].is<const char*>() || obj["color"].is<String>() || obj["color"].is<long>() || obj["color"].is<int>()) {
|
||||||
|
String colorStr;
|
||||||
|
if (obj["color"].is<long>() || obj["color"].is<int>()) {
|
||||||
|
colorStr = String(obj["color"].as<long>());
|
||||||
|
} else {
|
||||||
|
colorStr = obj["color"].as<String>();
|
||||||
|
}
|
||||||
|
uint32_t color = parseColor(colorStr);
|
||||||
|
setColor(color);
|
||||||
|
updated = true;
|
||||||
|
}
|
||||||
|
if (obj["color2"].is<const char*>() || obj["color2"].is<String>() || obj["color2"].is<long>() || obj["color2"].is<int>()) {
|
||||||
|
String colorStr;
|
||||||
|
if (obj["color2"].is<long>() || obj["color2"].is<int>()) {
|
||||||
|
colorStr = String(obj["color2"].as<long>());
|
||||||
|
} else {
|
||||||
|
colorStr = obj["color2"].as<String>();
|
||||||
|
}
|
||||||
|
uint32_t color = parseColor(colorStr);
|
||||||
|
setColor2(color);
|
||||||
|
updated = true;
|
||||||
|
}
|
||||||
|
if (obj["brightness"].is<int>() || obj["brightness"].is<long>() || obj["brightness"].is<const char*>() || obj["brightness"].is<String>()) {
|
||||||
|
int b = 0;
|
||||||
|
if (obj["brightness"].is<int>() || obj["brightness"].is<long>()) {
|
||||||
|
b = obj["brightness"].as<int>();
|
||||||
|
} else {
|
||||||
|
b = String(obj["brightness"].as<String>()).toInt();
|
||||||
|
}
|
||||||
|
if (b < 0) {
|
||||||
|
b = 0;
|
||||||
|
}
|
||||||
|
if (b > 255) {
|
||||||
|
b = 255;
|
||||||
|
}
|
||||||
|
setBrightness(static_cast<uint8_t>(b));
|
||||||
|
updated = true;
|
||||||
|
}
|
||||||
|
if (obj["total_steps"].is<int>() || obj["total_steps"].is<long>() || obj["total_steps"].is<const char*>() || obj["total_steps"].is<String>()) {
|
||||||
|
int steps = 0;
|
||||||
|
if (obj["total_steps"].is<int>() || obj["total_steps"].is<long>()) {
|
||||||
|
steps = obj["total_steps"].as<int>();
|
||||||
|
} else {
|
||||||
|
steps = String(obj["total_steps"].as<String>()).toInt();
|
||||||
|
}
|
||||||
|
if (steps > 0) { setTotalSteps(static_cast<uint16_t>(steps)); updated = true; }
|
||||||
|
}
|
||||||
|
if (obj["direction"].is<const char*>() || obj["direction"].is<String>()) {
|
||||||
|
String dirStr = obj["direction"].as<String>();
|
||||||
|
NeoDirection dir = (dirStr.equalsIgnoreCase("reverse")) ? NeoDirection::REVERSE : NeoDirection::FORWARD;
|
||||||
|
setDirection(dir);
|
||||||
|
updated = true;
|
||||||
|
}
|
||||||
|
if (obj["interval"].is<int>() || obj["interval"].is<long>() || obj["interval"].is<const char*>() || obj["interval"].is<String>()) {
|
||||||
|
unsigned long interval = 0;
|
||||||
|
if (obj["interval"].is<int>() || obj["interval"].is<long>()) {
|
||||||
|
interval = obj["interval"].as<unsigned long>();
|
||||||
|
} else {
|
||||||
|
interval = String(obj["interval"].as<String>()).toInt();
|
||||||
|
}
|
||||||
|
if (interval > 0) { setUpdateInterval(interval); updated = true; }
|
||||||
|
}
|
||||||
|
return updated;
|
||||||
|
}
|
||||||
|
|
||||||
void NeoPatternService::handleStateRequest(AsyncWebServerRequest* request) {
|
void NeoPatternService::handleStateRequest(AsyncWebServerRequest* request) {
|
||||||
if (request->contentType() != "application/json") {
|
if (request->contentType() != "application/json") {
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
#include "spore/Service.h"
|
#include "spore/Service.h"
|
||||||
#include "spore/core/TaskManager.h"
|
#include "spore/core/TaskManager.h"
|
||||||
|
#include "spore/core/NodeContext.h"
|
||||||
#include "NeoPattern.h"
|
#include "NeoPattern.h"
|
||||||
#include "NeoPatternState.h"
|
#include "NeoPatternState.h"
|
||||||
#include "NeoPixelConfig.h"
|
#include "NeoPixelConfig.h"
|
||||||
@@ -25,7 +26,7 @@ public:
|
|||||||
REVERSE
|
REVERSE
|
||||||
};
|
};
|
||||||
|
|
||||||
NeoPatternService(TaskManager& taskMgr, const NeoPixelConfig& config);
|
NeoPatternService(NodeContext& ctx, TaskManager& taskMgr, const NeoPixelConfig& config);
|
||||||
~NeoPatternService();
|
~NeoPatternService();
|
||||||
|
|
||||||
void registerEndpoints(ApiServer& api) override;
|
void registerEndpoints(ApiServer& api) override;
|
||||||
@@ -49,6 +50,8 @@ private:
|
|||||||
void registerTasks();
|
void registerTasks();
|
||||||
void registerPatterns();
|
void registerPatterns();
|
||||||
void update();
|
void update();
|
||||||
|
void registerEventHandlers();
|
||||||
|
bool applyControlParams(const JsonObject& obj);
|
||||||
|
|
||||||
// Pattern updaters
|
// Pattern updaters
|
||||||
void updateRainbowCycle();
|
void updateRainbowCycle();
|
||||||
@@ -80,6 +83,7 @@ private:
|
|||||||
String getPatternDescription(const String& name) const;
|
String getPatternDescription(const String& name) const;
|
||||||
|
|
||||||
TaskManager& taskManager;
|
TaskManager& taskManager;
|
||||||
|
NodeContext& ctx;
|
||||||
NeoPattern* neoPattern;
|
NeoPattern* neoPattern;
|
||||||
NeoPixelConfig config;
|
NeoPixelConfig config;
|
||||||
NeoPatternState currentState;
|
NeoPatternState currentState;
|
||||||
|
|||||||
@@ -45,7 +45,7 @@ void setup() {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// Create and add custom service
|
// Create and add custom service
|
||||||
neoPatternService = new NeoPatternService(spore.getTaskManager(), config);
|
neoPatternService = new NeoPatternService(spore.getContext(), spore.getTaskManager(), config);
|
||||||
spore.addService(neoPatternService);
|
spore.addService(neoPatternService);
|
||||||
|
|
||||||
// Start the API server and complete initialization
|
// Start the API server and complete initialization
|
||||||
|
|||||||
@@ -39,10 +39,12 @@ private:
|
|||||||
static bool isHeartbeatMsg(const char* msg);
|
static bool isHeartbeatMsg(const char* msg);
|
||||||
static bool isResponseMsg(const char* msg);
|
static bool isResponseMsg(const char* msg);
|
||||||
static bool isNodeInfoMsg(const char* msg);
|
static bool isNodeInfoMsg(const char* msg);
|
||||||
|
static bool isClusterEventMsg(const char* msg);
|
||||||
void onDiscovery(const char* msg);
|
void onDiscovery(const char* msg);
|
||||||
void onHeartbeat(const char* msg);
|
void onHeartbeat(const char* msg);
|
||||||
void onResponse(const char* msg);
|
void onResponse(const char* msg);
|
||||||
void onNodeInfo(const char* msg);
|
void onNodeInfo(const char* msg);
|
||||||
|
void onClusterEvent(const char* msg);
|
||||||
unsigned long lastHeartbeatSentAt = 0;
|
unsigned long lastHeartbeatSentAt = 0;
|
||||||
std::vector<MessageHandler> messageHandlers;
|
std::vector<MessageHandler> messageHandlers;
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ namespace ClusterProtocol {
|
|||||||
constexpr const char* RESPONSE_MSG = "CLUSTER_RESPONSE";
|
constexpr const char* RESPONSE_MSG = "CLUSTER_RESPONSE";
|
||||||
constexpr const char* HEARTBEAT_MSG = "CLUSTER_HEARTBEAT";
|
constexpr const char* HEARTBEAT_MSG = "CLUSTER_HEARTBEAT";
|
||||||
constexpr const char* NODE_INFO_MSG = "CLUSTER_NODE_INFO";
|
constexpr const char* NODE_INFO_MSG = "CLUSTER_NODE_INFO";
|
||||||
|
constexpr const char* CLUSTER_EVENT_MSG = "CLUSTER_EVENT";
|
||||||
constexpr uint16_t UDP_PORT = 4210;
|
constexpr uint16_t UDP_PORT = 4210;
|
||||||
// Increased buffer to accommodate node info JSON over UDP
|
// Increased buffer to accommodate node info JSON over UDP
|
||||||
constexpr size_t UDP_BUF_SIZE = 512;
|
constexpr size_t UDP_BUF_SIZE = 512;
|
||||||
|
|||||||
@@ -8,6 +8,23 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx
|
|||||||
NodeInfo* node = static_cast<NodeInfo*>(data);
|
NodeInfo* node = static_cast<NodeInfo*>(data);
|
||||||
this->addOrUpdateNode(node->hostname, node->ip);
|
this->addOrUpdateNode(node->hostname, node->ip);
|
||||||
});
|
});
|
||||||
|
// Centralized broadcast handler: services fire 'cluster/broadcast' with CLUSTER_EVENT JSON payload
|
||||||
|
ctx.on("cluster/broadcast", [this](void* data) {
|
||||||
|
String* jsonStr = static_cast<String*>(data);
|
||||||
|
if (!jsonStr) {
|
||||||
|
LOG_WARN("Cluster", "cluster/broadcast called with null data");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Subnet-directed broadcast (more reliable than 255.255.255.255 on some networks)
|
||||||
|
IPAddress ip = WiFi.localIP();
|
||||||
|
IPAddress mask = WiFi.subnetMask();
|
||||||
|
IPAddress bcast(ip[0] | ~mask[0], ip[1] | ~mask[1], ip[2] | ~mask[2], ip[3] | ~mask[3]);
|
||||||
|
LOG_INFO("Cluster", String("Broadcasting CLUSTER_EVENT to ") + bcast.toString() + " len=" + String(jsonStr->length()));
|
||||||
|
this->ctx.udp->beginPacket(bcast, this->ctx.config.udp_port);
|
||||||
|
String msg = String(ClusterProtocol::CLUSTER_EVENT_MSG) + ":" + *jsonStr;
|
||||||
|
this->ctx.udp->write(msg.c_str());
|
||||||
|
this->ctx.udp->endPacket();
|
||||||
|
});
|
||||||
// Register tasks
|
// Register tasks
|
||||||
registerTasks();
|
registerTasks();
|
||||||
initMessageHandlers();
|
initMessageHandlers();
|
||||||
@@ -41,7 +58,11 @@ void ClusterManager::listen() {
|
|||||||
if (len <= 0) {
|
if (len <= 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
incoming[len] = 0;
|
if (len >= (int)ClusterProtocol::UDP_BUF_SIZE) {
|
||||||
|
incoming[ClusterProtocol::UDP_BUF_SIZE - 1] = 0;
|
||||||
|
} else {
|
||||||
|
incoming[len] = 0;
|
||||||
|
}
|
||||||
handleIncomingMessage(incoming);
|
handleIncomingMessage(incoming);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -51,6 +72,7 @@ void ClusterManager::initMessageHandlers() {
|
|||||||
messageHandlers.push_back({ &ClusterManager::isHeartbeatMsg, [this](const char* msg){ this->onHeartbeat(msg); }, "HEARTBEAT" });
|
messageHandlers.push_back({ &ClusterManager::isHeartbeatMsg, [this](const char* msg){ this->onHeartbeat(msg); }, "HEARTBEAT" });
|
||||||
messageHandlers.push_back({ &ClusterManager::isResponseMsg, [this](const char* msg){ this->onResponse(msg); }, "RESPONSE" });
|
messageHandlers.push_back({ &ClusterManager::isResponseMsg, [this](const char* msg){ this->onResponse(msg); }, "RESPONSE" });
|
||||||
messageHandlers.push_back({ &ClusterManager::isNodeInfoMsg, [this](const char* msg){ this->onNodeInfo(msg); }, "NODE_INFO" });
|
messageHandlers.push_back({ &ClusterManager::isNodeInfoMsg, [this](const char* msg){ this->onNodeInfo(msg); }, "NODE_INFO" });
|
||||||
|
messageHandlers.push_back({ &ClusterManager::isClusterEventMsg, [this](const char* msg){ this->onClusterEvent(msg); }, "CLUSTER_EVENT" });
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterManager::handleIncomingMessage(const char* incoming) {
|
void ClusterManager::handleIncomingMessage(const char* incoming) {
|
||||||
@@ -60,6 +82,15 @@ void ClusterManager::handleIncomingMessage(const char* incoming) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Unknown message - log first token
|
||||||
|
const char* colon = strchr(incoming, ':');
|
||||||
|
String head;
|
||||||
|
if (colon) {
|
||||||
|
head = String(incoming).substring(0, colon - incoming);
|
||||||
|
} else {
|
||||||
|
head = String(incoming);
|
||||||
|
}
|
||||||
|
LOG_DEBUG("Cluster", String("Unknown cluster message: ") + head);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ClusterManager::isDiscoveryMsg(const char* msg) {
|
bool ClusterManager::isDiscoveryMsg(const char* msg) {
|
||||||
@@ -78,6 +109,10 @@ bool ClusterManager::isNodeInfoMsg(const char* msg) {
|
|||||||
return strncmp(msg, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0;
|
return strncmp(msg, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool ClusterManager::isClusterEventMsg(const char* msg) {
|
||||||
|
return strncmp(msg, ClusterProtocol::CLUSTER_EVENT_MSG, strlen(ClusterProtocol::CLUSTER_EVENT_MSG)) == 0;
|
||||||
|
}
|
||||||
|
|
||||||
void ClusterManager::onDiscovery(const char* /*msg*/) {
|
void ClusterManager::onDiscovery(const char* /*msg*/) {
|
||||||
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
|
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
|
||||||
String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname;
|
String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname;
|
||||||
@@ -174,6 +209,50 @@ void ClusterManager::onNodeInfo(const char* msg) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ClusterManager::onClusterEvent(const char* msg) {
|
||||||
|
// Message format: CLUSTER_EVENT:{"event":"...","data":"<json string>"}
|
||||||
|
const char* jsonStart = msg + strlen(ClusterProtocol::CLUSTER_EVENT_MSG) + 1; // skip prefix and ':'
|
||||||
|
if (*jsonStart == '\0') {
|
||||||
|
LOG_DEBUG("Cluster", "CLUSTER_EVENT received with empty payload");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
LOG_INFO("Cluster", String("CLUSTER_EVENT raw from ") + ctx.udp->remoteIP().toString() + " len=" + String(strlen(jsonStart)));
|
||||||
|
JsonDocument doc;
|
||||||
|
DeserializationError err = deserializeJson(doc, jsonStart);
|
||||||
|
if (err) {
|
||||||
|
LOG_DEBUG("Cluster", String("Failed to parse CLUSTER_EVENT JSON from ") + ctx.udp->remoteIP().toString());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Robust extraction of event and data
|
||||||
|
String eventStr;
|
||||||
|
if (doc["event"].is<const char*>()) {
|
||||||
|
eventStr = doc["event"].as<const char*>();
|
||||||
|
} else if (doc["event"].is<String>()) {
|
||||||
|
eventStr = doc["event"].as<String>();
|
||||||
|
}
|
||||||
|
|
||||||
|
String data;
|
||||||
|
if (doc["data"].is<const char*>()) {
|
||||||
|
data = doc["data"].as<const char*>();
|
||||||
|
} else if (doc["data"].is<JsonVariantConst>()) {
|
||||||
|
// If data is a nested JSON object/array, serialize it back to string
|
||||||
|
String tmp;
|
||||||
|
serializeJson(doc["data"], tmp);
|
||||||
|
data = tmp;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (eventStr.length() == 0 || data.length() == 0) {
|
||||||
|
String dbg;
|
||||||
|
serializeJson(doc, dbg);
|
||||||
|
LOG_WARN("Cluster", String("CLUSTER_EVENT missing 'event' or 'data' | payload=") + dbg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string eventKey(eventStr.c_str());
|
||||||
|
LOG_INFO("Cluster", String("Firing event '") + eventStr + "' with dataLen=" + String(data.length()));
|
||||||
|
ctx.fire(eventKey, &data);
|
||||||
|
}
|
||||||
|
|
||||||
void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
|
void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
|
||||||
auto& memberList = *ctx.memberList;
|
auto& memberList = *ctx.memberList;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user