From 3ed44cd00f9c17c7091a1d3d896c9c47de4b7796 Mon Sep 17 00:00:00 2001
From: 0x1d
Date: Sun, 19 Oct 2025 12:50:43 +0200
Subject: [PATCH 1/5] feat: improve cluster forming; just use heartbeat to form the cluster

---
 ctl.sh                              |   3 +-
 docs/Architecture.md                |  47 +++---
 docs/ConfigurationManagement.md     |   3 +-
 include/spore/core/ClusterManager.h |  11 +-
 include/spore/internal/Globals.h    |   8 +-
 include/spore/types/Config.h        |   6 +-
 include/spore/types/NodeInfo.h      |   3 +-
 src/spore/core/ClusterManager.cpp   | 236 ++++++++++++++++++----------
 src/spore/services/NodeService.cpp  |   3 +-
 src/spore/types/Config.cpp          |   9 +-
 10 files changed, 185 insertions(+), 144 deletions(-)

diff --git a/ctl.sh b/ctl.sh
index 235c90c..99e2f50 100755
--- a/ctl.sh
+++ b/ctl.sh
@@ -372,11 +372,10 @@ function node {

     # Cluster Configuration
     echo "=== Cluster Configuration ==="
-    echo "Discovery Interval: $(echo "$response_body" | jq -r '.cluster.discovery_interval_ms // "N/A"') ms"
     echo "Heartbeat Interval: $(echo "$response_body" | jq -r '.cluster.heartbeat_interval_ms // "N/A"') ms"
     echo "Cluster Listen Interval: $(echo "$response_body" | jq -r '.cluster.cluster_listen_interval_ms // "N/A"') ms"
     echo "Status Update Interval: $(echo "$response_body" | jq -r '.cluster.status_update_interval_ms // "N/A"') ms"
-    echo "Member Info Update Interval: $(echo "$response_body" | jq -r '.cluster.member_info_update_interval_ms // "N/A"') ms"
+    echo "Node Update Broadcast Interval: $(echo "$response_body" | jq -r '.cluster.node_update_broadcast_interval_ms // "N/A"') ms"
     echo "Print Interval: $(echo "$response_body" | jq -r '.cluster.print_interval_ms // "N/A"') ms"
     echo ""

diff --git a/docs/Architecture.md b/docs/Architecture.md
index 655f4fa..095e084 100644
--- a/docs/Architecture.md
+++ b/docs/Architecture.md
@@ -52,57 +52,50 @@ The cluster uses a UDP-based discovery protocol for automatic node detection:
 - **UDP Port**: 4210 (configurable via `Config.udp_port`)
 - **Discovery Message**: `CLUSTER_DISCOVERY`
 - **Response Message**: `CLUSTER_RESPONSE`
-- **Heartbeat Message**: `CLUSTER_HEARTBEAT`
-- **Node Info Message**: `CLUSTER_NODE_INFO::`
+- **Heartbeat Message**: `CLUSTER_HEARTBEAT:hostname`
+- **Node Update Message**: `NODE_UPDATE:hostname:{json}`
 - **Broadcast Address**: 255.255.255.255
-- **Discovery Interval**: `Config.discovery_interval_ms` (default 1000 ms)
 - **Listen Interval**: `Config.cluster_listen_interval_ms` (default 10 ms)
 - **Heartbeat Interval**: `Config.heartbeat_interval_ms` (default 5000 ms)
+- **Node Update Broadcast Interval**: `Config.node_update_broadcast_interval_ms` (default 5000 ms)

 ### Message Formats

-- **Discovery**: `CLUSTER_DISCOVERY`
-  - Sender: any node, broadcast to 255.255.255.255:`udp_port`
-  - Purpose: announce presence and solicit peer identification
-- **Response**: `CLUSTER_RESPONSE:`
-  - Sender: node receiving a discovery; unicast to requester IP
-  - Purpose: provide hostname so requester can register/update member
-- **Heartbeat**: `CLUSTER_HEARTBEAT:`
+- **Heartbeat**: `CLUSTER_HEARTBEAT:hostname`
   - Sender: each node, broadcast to 255.255.255.255:`udp_port` on interval
-  - Purpose: prompt peers to reply with their node info and keep liveness
-- **Node Info**: `CLUSTER_NODE_INFO::`
+  - Purpose: announce presence, prompt peers for node info, and keep liveness
+- **Node Update**: `NODE_UPDATE:hostname:{json}`
   - Sender: node receiving a heartbeat; unicast to heartbeat sender IP
-  - JSON fields: freeHeap, chipId, sdkVersion, cpuFreqMHz, flashChipSize, optional labels
-
-### Discovery Flow
-
-1. **Sender broadcasts** `CLUSTER_DISCOVERY`
-2. **Each receiver responds** with `CLUSTER_RESPONSE:` to the sender IP
-3. **Sender registers/updates** the node using hostname and source IP
+  - JSON fields: hostname, ip, uptime, optional labels
+  - Purpose: provide minimal node information in response to heartbeat

 ### Heartbeat Flow

-1. **A node broadcasts** `CLUSTER_HEARTBEAT:`
-2. **Each receiver replies** with `CLUSTER_NODE_INFO::` to the heartbeat sender IP
+1. **A node broadcasts** `CLUSTER_HEARTBEAT:hostname`
+2. **Each receiver responds** with `NODE_UPDATE:hostname:{json}` to the heartbeat sender IP
 3. **The sender**:
    - Ensures the node exists or creates it with `hostname` and sender IP
-   - Parses JSON and updates resources, labels, `status = ACTIVE`, `lastSeen = now`
+   - Parses JSON and updates node info, `status = ACTIVE`, `lastSeen = now`
    - Sets `latency = now - lastHeartbeatSentAt` (per-node, measured at heartbeat origin)

+### Node Update Broadcasting
+
+1. **Periodic broadcast**: Each node broadcasts `NODE_UPDATE:hostname:{json}` every 5 seconds
+2. **All receivers**: Update their memberlist entry for the broadcasting node
+3. **Purpose**: Ensures all nodes have current information about each other
+
 ### Listener Behavior

 The `cluster_listen` task parses one UDP packet per run and dispatches by prefix to:
-- **Discovery** → send `CLUSTER_RESPONSE`
-- **Heartbeat** → send `CLUSTER_NODE_INFO` JSON
-- **Response** → add/update node using provided hostname and source IP
-- **Node Info** → update resources/status/labels and record latency
+- **Heartbeat** → add/update node and send `NODE_UPDATE` JSON response
+- **Node Update** → update node information and status

 ### Timing and Intervals

 - **UDP Port**: `Config.udp_port` (default 4210)
-- **Discovery Interval**: `Config.discovery_interval_ms` (default 1000 ms)
 - **Listen Interval**: `Config.cluster_listen_interval_ms` (default 10 ms)
 - **Heartbeat Interval**: `Config.heartbeat_interval_ms` (default 5000 ms)
+- **Node Update Broadcast Interval**: `Config.node_update_broadcast_interval_ms` (default 5000 ms)

 ### Node Status Categories

diff --git a/docs/ConfigurationManagement.md b/docs/ConfigurationManagement.md
index 50bf912..7561121 100644
--- a/docs/ConfigurationManagement.md
+++ b/docs/ConfigurationManagement.md
@@ -93,11 +93,10 @@ The configuration is stored as a JSON file with the following structure:
     "api_server_port": 80
   },
   "cluster": {
-    "discovery_interval_ms": 1000,
     "heartbeat_interval_ms": 5000,
     "cluster_listen_interval_ms": 10,
     "status_update_interval_ms": 1000,
-    "member_info_update_interval_ms": 10000,
+    "node_update_broadcast_interval_ms": 5000,
     "print_interval_ms": 5000
   },
   "thresholds": {
diff --git a/include/spore/core/ClusterManager.h b/include/spore/core/ClusterManager.h
index c742a92..5c7ff86 100644
--- a/include/spore/core/ClusterManager.h
+++ b/include/spore/core/ClusterManager.h
@@ -14,7 +14,6 @@ class ClusterManager {
 public:
     ClusterManager(NodeContext& ctx, TaskManager& taskMgr);
     void registerTasks();
-    void sendDiscovery();
     void listen();
     void addOrUpdateNode(const String& nodeHost, IPAddress nodeIP);
     void updateAllNodeStatuses();
@@ -25,6 +24,7 @@ public:
     void updateLocalNodeResources();
     void heartbeatTaskCallback();
     void updateAllMembersInfoTaskCallback();
+    void broadcastNodeUpdate();
 private:
     NodeContext& ctx;
     TaskManager& taskManager;
@@ -35,18 +35,15 @@ private:
     };
     void initMessageHandlers();
     void handleIncomingMessage(const char* incoming);
-    static bool isDiscoveryMsg(const char* msg);
     static bool isHeartbeatMsg(const char* msg);
-    static bool isResponseMsg(const char* msg);
-    static bool isNodeInfoMsg(const char* msg);
+    static bool isNodeUpdateMsg(const char* msg);
     static bool isClusterEventMsg(const char* msg);
     static bool isRawMsg(const char* msg);
-    void onDiscovery(const char* msg);
     void onHeartbeat(const char* msg);
-    void onResponse(const char* msg);
-    void onNodeInfo(const char* msg);
+    void onNodeUpdate(const char* msg);
     void onClusterEvent(const char* msg);
     void onRawMessage(const char* msg);
+    void sendNodeInfo(const String& hostname, const IPAddress& targetIP);
     unsigned long lastHeartbeatSentAt = 0;
     std::vector messageHandlers;
 };
diff --git a/include/spore/internal/Globals.h b/include/spore/internal/Globals.h
index e2468b4..f07ccf5 100644
--- a/include/spore/internal/Globals.h
+++ b/include/spore/internal/Globals.h
@@ -5,10 +5,9 @@

 // Cluster protocol and API constants
 namespace ClusterProtocol {
-    constexpr const char* DISCOVERY_MSG = "CLUSTER_DISCOVERY";
-    constexpr const char* RESPONSE_MSG = "CLUSTER_RESPONSE";
+    // Simplified heartbeat-only protocol
     constexpr const char* HEARTBEAT_MSG = "CLUSTER_HEARTBEAT";
-    constexpr const char* NODE_INFO_MSG = "CLUSTER_NODE_INFO";
+    constexpr const char* NODE_UPDATE_MSG = "NODE_UPDATE";
     constexpr const char* CLUSTER_EVENT_MSG = "CLUSTER_EVENT";
     constexpr const char* RAW_MSG = "RAW";
     constexpr uint16_t UDP_PORT = 4210;
@@ -18,12 +17,9 @@ namespace ClusterProtocol {
 }

 namespace TaskIntervals {
-    constexpr unsigned long SEND_DISCOVERY = 1000;
-    constexpr unsigned long LISTEN_FOR_DISCOVERY = 100;
     constexpr unsigned long UPDATE_STATUS = 1000;
     constexpr unsigned long PRINT_MEMBER_LIST = 5000;
     constexpr unsigned long HEARTBEAT = 2000;
-    constexpr unsigned long UPDATE_ALL_MEMBERS_INFO = 10000;
 }

 constexpr unsigned long NODE_ACTIVE_THRESHOLD = 10000;
diff --git a/include/spore/types/Config.h b/include/spore/types/Config.h
index 97d5d28..d24cc2e 100644
--- a/include/spore/types/Config.h
+++ b/include/spore/types/Config.h
@@ -12,11 +12,10 @@ public:
     static constexpr const char* DEFAULT_WIFI_PASSWORD = "th3r31sn0sp00n";
     static constexpr uint16_t DEFAULT_UDP_PORT = 4210;
     static constexpr uint16_t DEFAULT_API_SERVER_PORT = 80;
-    static constexpr unsigned long DEFAULT_DISCOVERY_INTERVAL_MS = 1000;
     static constexpr unsigned long DEFAULT_CLUSTER_LISTEN_INTERVAL_MS = 10;
     static constexpr unsigned long DEFAULT_HEARTBEAT_INTERVAL_MS = 5000;
     static constexpr unsigned long DEFAULT_STATUS_UPDATE_INTERVAL_MS = 1000;
-    static constexpr unsigned long DEFAULT_MEMBER_INFO_UPDATE_INTERVAL_MS = 10000;
+    static constexpr unsigned long DEFAULT_NODE_UPDATE_BROADCAST_INTERVAL_MS = 5000;
     static constexpr unsigned long DEFAULT_PRINT_INTERVAL_MS = 5000;
     static constexpr unsigned long DEFAULT_NODE_ACTIVE_THRESHOLD_MS = 10000;
     static constexpr unsigned long DEFAULT_NODE_INACTIVE_THRESHOLD_MS = 60000;
@@ -38,11 +37,10 @@ public:
     uint16_t api_server_port;

     // Cluster Configuration
-    unsigned long discovery_interval_ms;
     unsigned long heartbeat_interval_ms;
     unsigned long cluster_listen_interval_ms;
     unsigned long status_update_interval_ms;
-    unsigned long member_info_update_interval_ms;
+    unsigned long node_update_broadcast_interval_ms;
     unsigned long print_interval_ms;

     // Node Status Thresholds
diff --git a/include/spore/types/NodeInfo.h b/include/spore/types/NodeInfo.h
index e6c73cb..04cda91 100644
--- a/include/spore/types/NodeInfo.h
+++ b/include/spore/types/NodeInfo.h
@@ -9,6 +9,7 @@ struct NodeInfo {
     String hostname;
     IPAddress ip;
     unsigned long lastSeen;
+    unsigned long uptime = 0; // milliseconds since node started
     enum Status { ACTIVE, INACTIVE, DEAD } status;
     struct Resources {
         uint32_t freeHeap = 0;
@@ -17,7 +18,7 @@ struct NodeInfo {
         uint32_t cpuFreqMHz = 0;
         uint32_t flashChipSize = 0;
     } resources;
-    unsigned long latency = 0; // ms from heartbeat broadcast to NODE_INFO receipt
+    unsigned long latency = 0; // ms from heartbeat broadcast to NODE_UPDATE receipt
     std::vector endpoints; // List of registered endpoints
     std::map labels; // Arbitrary node labels (key -> value)
 };
diff --git a/src/spore/core/ClusterManager.cpp b/src/spore/core/ClusterManager.cpp
index 2e2bd59..a0a53d7 100644
--- a/src/spore/core/ClusterManager.cpp
+++ b/src/spore/core/ClusterManager.cpp
@@ -25,27 +25,27 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx
         this->ctx.udp->write(msg.c_str());
         this->ctx.udp->endPacket();
     });
+
+    // Handler for node update broadcasts: services fire 'cluster/node_update' when their node info changes
+    ctx.on("cluster/node_update", [this](void* data) {
+        // Trigger immediate NODE_UPDATE broadcast when node info changes
+        broadcastNodeUpdate();
+    });
     // Register tasks
     registerTasks();
     initMessageHandlers();
 }

 void ClusterManager::registerTasks() {
-    taskManager.registerTask("cluster_discovery", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); });
     taskManager.registerTask("cluster_listen", ctx.config.cluster_listen_interval_ms, [this]() { listen(); });
     taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); });
     taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); });
     taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); });
-    taskManager.registerTask("cluster_update_members_info", ctx.config.member_info_update_interval_ms, [this]() { updateAllMembersInfoTaskCallback(); });
+    taskManager.registerTask("node_update_broadcast", ctx.config.node_update_broadcast_interval_ms, [this]() { broadcastNodeUpdate(); });
     LOG_INFO("ClusterManager", "Registered all cluster tasks");
 }

-void ClusterManager::sendDiscovery() {
-    //LOG_DEBUG(ctx, "Cluster", "Sending discovery packet...");
-    ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
-    ctx.udp->write(ClusterProtocol::DISCOVERY_MSG);
-    ctx.udp->endPacket();
-}
+// Discovery functionality removed - using heartbeat-only approach

 void ClusterManager::listen() {
     int packetSize = ctx.udp->parsePacket();
@@ -69,10 +69,8 @@ void ClusterManager::listen() {
 void ClusterManager::initMessageHandlers() {
     messageHandlers.clear();
     messageHandlers.push_back({ &ClusterManager::isRawMsg, [this](const char* msg){ this->onRawMessage(msg); }, "RAW" });
-    messageHandlers.push_back({ &ClusterManager::isDiscoveryMsg, [this](const char* msg){ this->onDiscovery(msg); }, "DISCOVERY" });
     messageHandlers.push_back({ &ClusterManager::isHeartbeatMsg, [this](const char* msg){ this->onHeartbeat(msg); }, "HEARTBEAT" });
-    messageHandlers.push_back({ &ClusterManager::isResponseMsg, [this](const char* msg){ this->onResponse(msg); }, "RESPONSE" });
-    messageHandlers.push_back({ &ClusterManager::isNodeInfoMsg, [this](const char* msg){ this->onNodeInfo(msg); }, "NODE_INFO" });
+    messageHandlers.push_back({ &ClusterManager::isNodeUpdateMsg, [this](const char* msg){ this->onNodeUpdate(msg); }, "NODE_UPDATE" });
     messageHandlers.push_back({ &ClusterManager::isClusterEventMsg, [this](const char* msg){ this->onClusterEvent(msg); }, "CLUSTER_EVENT" });
 }

@@ -94,20 +92,12 @@ void ClusterManager::handleIncomingMessage(const char* incoming) {
     LOG_DEBUG("Cluster", String("Unknown cluster message: ") + head);
 }

-bool ClusterManager::isDiscoveryMsg(const char* msg) {
-    return strcmp(msg, ClusterProtocol::DISCOVERY_MSG) == 0;
-}
-
 bool ClusterManager::isHeartbeatMsg(const char* msg) {
     return strncmp(msg, ClusterProtocol::HEARTBEAT_MSG, strlen(ClusterProtocol::HEARTBEAT_MSG)) == 0;
 }

-bool ClusterManager::isResponseMsg(const char* msg) {
-    return strncmp(msg, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0;
-}
-
-bool ClusterManager::isNodeInfoMsg(const char* msg) {
-    return strncmp(msg, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0;
+bool ClusterManager::isNodeUpdateMsg(const char* msg) {
+    return strncmp(msg, ClusterProtocol::NODE_UPDATE_MSG, strlen(ClusterProtocol::NODE_UPDATE_MSG)) == 0;
 }

 bool ClusterManager::isClusterEventMsg(const char* msg) {
@@ -123,87 +113,122 @@ bool ClusterManager::isRawMsg(const char* msg) {
     return msg[prefixLen] == ':';
 }

-void ClusterManager::onDiscovery(const char* /*msg*/) {
-    ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
-    String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname;
-    ctx.udp->write(response.c_str());
-    ctx.udp->endPacket();
+// Discovery functionality removed - using heartbeat-only approach
+
+void ClusterManager::onHeartbeat(const char* msg) {
+    // Extract hostname from heartbeat message: "CLUSTER_HEARTBEAT:hostname"
+    const char* colon = strchr(msg, ':');
+    if (!colon) {
+        LOG_WARN("Cluster", "Invalid heartbeat message format");
+        return;
+    }
+
+    String hostname = String(colon + 1);
+    IPAddress senderIP = ctx.udp->remoteIP();
+
+    // Update memberlist with the heartbeat
+    addOrUpdateNode(hostname, senderIP);
+
+    // Respond with minimal node info (hostname, ip, uptime, labels)
+    sendNodeInfo(hostname, senderIP);
 }

-void ClusterManager::onHeartbeat(const char* /*msg*/) {
+void ClusterManager::onNodeUpdate(const char* msg) {
+    // Message format: "NODE_UPDATE:hostname:{json}"
+    const char* firstColon = strchr(msg, ':');
+    if (!firstColon) {
+        LOG_WARN("Cluster", "Invalid NODE_UPDATE message format");
+        return;
+    }
+
+    const char* secondColon = strchr(firstColon + 1, ':');
+    if (!secondColon) {
+        LOG_WARN("Cluster", "Invalid NODE_UPDATE message format");
+        return;
+    }
+
+    String hostnamePart = String(firstColon + 1);
+    String hostname = hostnamePart.substring(0, secondColon - firstColon - 1);
+    const char* jsonCStr = secondColon + 1;
+
+    JsonDocument doc;
+    DeserializationError err = deserializeJson(doc, jsonCStr);
+    if (err) {
+        LOG_WARN("Cluster", String("Failed to parse NODE_UPDATE JSON from ") + ctx.udp->remoteIP().toString());
+        return;
+    }
+
+    // Update the specific node in memberlist
+    auto& memberList = *ctx.memberList;
+    auto it = memberList.find(hostname);
+    if (it != memberList.end()) {
+        NodeInfo& node = it->second;
+
+        // Update basic info if provided
+        if (doc["hostname"].is()) {
+            node.hostname = doc["hostname"].as();
+        }
+        if (doc["uptime"].is()) {
+            node.uptime = doc["uptime"];
+        }
+
+        // Update labels if provided
+        if (doc["labels"].is()) {
+            node.labels.clear();
+            JsonObject labelsObj = doc["labels"].as<JsonObject>();
+            for (JsonPair kvp : labelsObj) {
+                const char* key = kvp.key().c_str();
+                const char* value = labelsObj[kvp.key()];
+                node.labels[key] = String(value);
+            }
+        }
+
+        node.lastSeen = millis();
+        node.status = NodeInfo::ACTIVE;
+
+        LOG_DEBUG("Cluster", String("Updated node ") + hostname + " from NODE_UPDATE");
+    } else {
+        LOG_WARN("Cluster", String("Received NODE_UPDATE for unknown node: ") + hostname);
+    }
+}
+
+void ClusterManager::sendNodeInfo(const String& hostname, const IPAddress& targetIP) {
     JsonDocument doc;

-    if (ctx.memberList) {
-        auto it = ctx.memberList->find(ctx.hostname);
-        if (it != ctx.memberList->end()) {
+    // Get our node info for the response
+    auto& memberList = *ctx.memberList;
+    auto it = memberList.find(ctx.hostname);
+    if (it != memberList.end()) {
+        const NodeInfo& node = it->second;
+
+        // Minimal response: hostname, ip, uptime, labels
+        doc["hostname"] = node.hostname;
+        doc["ip"] = node.ip.toString();
+        doc["uptime"] = millis() - node.lastSeen; // Approximate uptime
+
+        // Add labels if present
+        if (!node.labels.empty()) {
             JsonObject labelsObj = doc["labels"].to<JsonObject>();
-            for (const auto& kv : it->second.labels) {
-                labelsObj[kv.first.c_str()] = kv.second;
-            }
-        } else if (!ctx.self.labels.empty()) {
-            JsonObject labelsObj = doc["labels"].to<JsonObject>();
-            for (const auto& kv : ctx.self.labels) {
+            for (const auto& kv : node.labels) {
                 labelsObj[kv.first.c_str()] = kv.second;
             }
         }
+    } else {
+        // Fallback to basic info
+        doc["hostname"] = ctx.hostname;
+        doc["ip"] = ctx.localIP.toString();
+        doc["uptime"] = millis();
     }

     String json;
     serializeJson(doc, json);

-    ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
-    String msg = String(ClusterProtocol::NODE_INFO_MSG) + ":" + ctx.hostname + ":" + json;
+    ctx.udp->beginPacket(targetIP, ctx.config.udp_port);
+    String msg = String(ClusterProtocol::NODE_UPDATE_MSG) + ":" + hostname + ":" + json;
     ctx.udp->write(msg.c_str());
     ctx.udp->endPacket();
-}

-void ClusterManager::onResponse(const char* msg) {
-    char* hostPtr = const_cast<char*>(msg) + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
-    String nodeHost = String(hostPtr);
-    addOrUpdateNode(nodeHost, ctx.udp->remoteIP());
-}
-
-void ClusterManager::onNodeInfo(const char* msg) {
-    char* p = const_cast<char*>(msg) + strlen(ClusterProtocol::NODE_INFO_MSG) + 1;
-    char* hostEnd = strchr(p, ':');
-    if (hostEnd) {
-        *hostEnd = '\0';
-        const char* hostCStr = p;
-        const char* jsonCStr = hostEnd + 1;
-
-        String nodeHost = String(hostCStr);
-        IPAddress senderIP = ctx.udp->remoteIP();
-
-        addOrUpdateNode(nodeHost, senderIP);
-
-        JsonDocument doc;
-        DeserializationError err = deserializeJson(doc, jsonCStr);
-        if (!err) {
-            auto& memberList = *ctx.memberList;
-            auto it = memberList.find(nodeHost);
-            if (it != memberList.end()) {
-                NodeInfo& node = it->second;
-                node.status = NodeInfo::ACTIVE;
-                unsigned long now = millis();
-                node.lastSeen = now;
-                if (lastHeartbeatSentAt != 0) {
-                    node.latency = now - lastHeartbeatSentAt;
-                }
-
-                node.labels.clear();
-                if (doc["labels"].is()) {
-                    JsonObject labelsObj = doc["labels"].as<JsonObject>();
-                    for (JsonPair kvp : labelsObj) {
-                        const char* key = kvp.key().c_str();
-                        const char* value = labelsObj[kvp.key()];
-                        node.labels[key] = value;
-                    }
-                }
-            }
-        } else {
-            LOG_WARN("Cluster", String("Failed to parse NODE_INFO JSON from ") + senderIP.toString());
-        }
-    }
+    LOG_DEBUG("Cluster", String("Sent NODE_UPDATE response to ") + hostname + " @ " + targetIP.toString());
 }

 void ClusterManager::onClusterEvent(const char* msg) {
@@ -406,16 +431,19 @@ void ClusterManager::heartbeatTaskCallback() {
         NodeInfo& node = it->second;
         node.lastSeen = millis();
         node.status = NodeInfo::ACTIVE;
+        node.uptime = millis(); // Update uptime
         updateLocalNodeResources();
         addOrUpdateNode(ctx.hostname, ctx.localIP);
     }

-    // Broadcast heartbeat so peers can respond with their node info
+    // Broadcast heartbeat - peers will respond with NODE_UPDATE
     lastHeartbeatSentAt = millis();
     ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
     String hb = String(ClusterProtocol::HEARTBEAT_MSG) + ":" + ctx.hostname;
     ctx.udp->write(hb.c_str());
     ctx.udp->endPacket();
+
+    LOG_DEBUG("Cluster", String("Sent heartbeat: ") + ctx.hostname);
 }

 void ClusterManager::updateAllMembersInfoTaskCallback() {
@@ -423,6 +451,40 @@ void ClusterManager::updateAllMembersInfoTaskCallback() {
     // No-op to reduce network and memory usage
 }

+void ClusterManager::broadcastNodeUpdate() {
+    // Broadcast our current node info as NODE_UPDATE to all cluster members
+    auto& memberList = *ctx.memberList;
+    auto it = memberList.find(ctx.hostname);
+    if (it == memberList.end()) {
+        return;
+    }
+
+    const NodeInfo& node = it->second;
+
+    JsonDocument doc;
+    doc["hostname"] = node.hostname;
+    doc["uptime"] = node.uptime;
+
+    // Add labels if present
+    if (!node.labels.empty()) {
+        JsonObject labelsObj = doc["labels"].to<JsonObject>();
+        for (const auto& kv : node.labels) {
+            labelsObj[kv.first.c_str()] = kv.second;
+        }
+    }
+
+    String json;
+    serializeJson(doc, json);
+
+    // Broadcast to all cluster members
+    ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
+    String msg = String(ClusterProtocol::NODE_UPDATE_MSG) + ":" + ctx.hostname + ":" + json;
+    ctx.udp->write(msg.c_str());
+    ctx.udp->endPacket();
+
+    LOG_DEBUG("Cluster", String("Broadcasted NODE_UPDATE for ") + ctx.hostname);
+}
+
 void ClusterManager::updateAllNodeStatuses() {
     auto& memberList = *ctx.memberList;
     unsigned long now = millis();
diff --git a/src/spore/services/NodeService.cpp b/src/spore/services/NodeService.cpp
index 003ec07..451cb72 100644
--- a/src/spore/services/NodeService.cpp
+++ b/src/spore/services/NodeService.cpp
@@ -255,11 +255,10 @@ void NodeService::handleGetConfigRequest(AsyncWebServerRequest* request) {

     // Cluster Configuration
     JsonObject clusterObj = doc["cluster"].to<JsonObject>();
-    clusterObj["discovery_interval_ms"] = ctx.config.discovery_interval_ms;
     clusterObj["heartbeat_interval_ms"] = ctx.config.heartbeat_interval_ms;
     clusterObj["cluster_listen_interval_ms"] = ctx.config.cluster_listen_interval_ms;
     clusterObj["status_update_interval_ms"] = ctx.config.status_update_interval_ms;
-    clusterObj["member_info_update_interval_ms"] = ctx.config.member_info_update_interval_ms;
+    clusterObj["node_update_broadcast_interval_ms"] = ctx.config.node_update_broadcast_interval_ms;
     clusterObj["print_interval_ms"] = ctx.config.print_interval_ms;

     // Node Status Thresholds
diff --git a/src/spore/types/Config.cpp b/src/spore/types/Config.cpp
index 681cc73..c115e24 100644
--- a/src/spore/types/Config.cpp
+++ b/src/spore/types/Config.cpp
@@ -32,11 +32,10 @@ void Config::setDefaults() {
     api_server_port = DEFAULT_API_SERVER_PORT;

     // Cluster Configuration
-    discovery_interval_ms = DEFAULT_DISCOVERY_INTERVAL_MS; // TODO retire this in favor of heartbeat_interval_ms
     cluster_listen_interval_ms = DEFAULT_CLUSTER_LISTEN_INTERVAL_MS;
     heartbeat_interval_ms = DEFAULT_HEARTBEAT_INTERVAL_MS;
     status_update_interval_ms = DEFAULT_STATUS_UPDATE_INTERVAL_MS;
-    member_info_update_interval_ms = DEFAULT_MEMBER_INFO_UPDATE_INTERVAL_MS; // TODO retire this in favor of heartbeat_interval_ms
+    node_update_broadcast_interval_ms = DEFAULT_NODE_UPDATE_BROADCAST_INTERVAL_MS;
     print_interval_ms = DEFAULT_PRINT_INTERVAL_MS;

     // Node Status Thresholds
@@ -86,11 +85,10 @@ bool Config::saveToFile(const String& filename) {
     doc["network"]["api_server_port"] = api_server_port;

     // Cluster Configuration
-    doc["cluster"]["discovery_interval_ms"] = discovery_interval_ms;
     doc["cluster"]["heartbeat_interval_ms"] = heartbeat_interval_ms;
     doc["cluster"]["cluster_listen_interval_ms"] = cluster_listen_interval_ms;
     doc["cluster"]["status_update_interval_ms"] = status_update_interval_ms;
-    doc["cluster"]["member_info_update_interval_ms"] = member_info_update_interval_ms;
+    doc["cluster"]["node_update_broadcast_interval_ms"] = node_update_broadcast_interval_ms;
     doc["cluster"]["print_interval_ms"] = print_interval_ms;

     // Node Status Thresholds
@@ -166,11 +164,10 @@ bool Config::loadFromFile(const String& filename) {
     api_server_port = doc["network"]["api_server_port"] | DEFAULT_API_SERVER_PORT;

     // Load Cluster Configuration with defaults
-    discovery_interval_ms = doc["cluster"]["discovery_interval_ms"] | DEFAULT_DISCOVERY_INTERVAL_MS;
     heartbeat_interval_ms = doc["cluster"]["heartbeat_interval_ms"] | DEFAULT_HEARTBEAT_INTERVAL_MS;
     cluster_listen_interval_ms = doc["cluster"]["cluster_listen_interval_ms"] | DEFAULT_CLUSTER_LISTEN_INTERVAL_MS;
     status_update_interval_ms = doc["cluster"]["status_update_interval_ms"] | DEFAULT_STATUS_UPDATE_INTERVAL_MS;
-    member_info_update_interval_ms = doc["cluster"]["member_info_update_interval_ms"] | DEFAULT_MEMBER_INFO_UPDATE_INTERVAL_MS;
+    node_update_broadcast_interval_ms = doc["cluster"]["node_update_broadcast_interval_ms"] | DEFAULT_NODE_UPDATE_BROADCAST_INTERVAL_MS;
     print_interval_ms = doc["cluster"]["print_interval_ms"] | DEFAULT_PRINT_INTERVAL_MS;

     // Load Node Status Thresholds with defaults
--
2.49.1


From b6ad4793528972f6b4ec0e5ebea1ff8aa6c09916 Mon Sep 17 00:00:00 2001
From: 0x1d
Date: Sun, 19 Oct 2025 12:59:26 +0200
Subject: [PATCH 2/5] feat: calculate latency during heartbeat

---
 src/spore/core/ClusterManager.cpp | 58 +++++++++++++++++++++----------
 1 file changed, 39 insertions(+), 19 deletions(-)

diff --git a/src/spore/core/ClusterManager.cpp b/src/spore/core/ClusterManager.cpp
index a0a53d7..5b084ed 100644
--- a/src/spore/core/ClusterManager.cpp
+++ b/src/spore/core/ClusterManager.cpp
@@ -158,53 +158,72 @@ void ClusterManager::onNodeUpdate(const char* msg) {
         return;
     }

-    // Update the specific node in memberlist
+    // The NODE_UPDATE contains info about the target node (hostname from message)
+    // but is sent FROM the responding node (ctx.udp->remoteIP())
+    // We need to find the responding node in the memberlist, not the target node
+    IPAddress respondingNodeIP = ctx.udp->remoteIP();
     auto& memberList = *ctx.memberList;
-    auto it = memberList.find(hostname);
-    if (it != memberList.end()) {
-        NodeInfo& node = it->second;

-        // Update basic info if provided
+    // Find the responding node by IP address
+    NodeInfo* respondingNode = nullptr;
+    for (auto& pair : memberList) {
+        if (pair.second.ip == respondingNodeIP) {
+            respondingNode = &pair.second;
+            break;
+        }
+    }
+
+    if (respondingNode) {
+        // Calculate latency if we recently sent a heartbeat
+        unsigned long latency = 0;
+        if (lastHeartbeatSentAt != 0) {
+            unsigned long now = millis();
+            latency = now - lastHeartbeatSentAt;
+            lastHeartbeatSentAt = 0; // Reset for next calculation
+        }
+
+        // Update the responding node's information
         if (doc["hostname"].is()) {
-            node.hostname = doc["hostname"].as();
+            respondingNode->hostname = doc["hostname"].as();
         }
         if (doc["uptime"].is()) {
-            node.uptime = doc["uptime"];
+            respondingNode->uptime = doc["uptime"];
         }

         // Update labels if provided
         if (doc["labels"].is()) {
-            node.labels.clear();
+            respondingNode->labels.clear();
             JsonObject labelsObj = doc["labels"].as<JsonObject>();
             for (JsonPair kvp : labelsObj) {
                 const char* key = kvp.key().c_str();
                 const char* value = labelsObj[kvp.key()];
-                node.labels[key] = String(value);
+                respondingNode->labels[key] = String(value);
             }
         }

-        node.lastSeen = millis();
-        node.status = NodeInfo::ACTIVE;
+        respondingNode->lastSeen = millis();
+        respondingNode->status = NodeInfo::ACTIVE;
+        respondingNode->latency = latency;

-        LOG_DEBUG("Cluster", String("Updated node ") + hostname + " from NODE_UPDATE");
+        LOG_DEBUG("Cluster", String("Updated responding node ") + respondingNode->hostname + " @ " + respondingNodeIP.toString() + " | latency: " + String(latency) + "ms");
     } else {
-        LOG_WARN("Cluster", String("Received NODE_UPDATE for unknown node: ") + hostname);
+        LOG_WARN("Cluster", String("Received NODE_UPDATE from unknown node: ") + respondingNodeIP.toString());
     }
 }

-void ClusterManager::sendNodeInfo(const String& hostname, const IPAddress& targetIP) {
+void ClusterManager::sendNodeInfo(const String& targetHostname, const IPAddress& targetIP) {
     JsonDocument doc;

-    // Get our node info for the response
+    // Get our node info for the response (we're the responding node)
     auto& memberList = *ctx.memberList;
     auto it = memberList.find(ctx.hostname);
     if (it != memberList.end()) {
         const NodeInfo& node = it->second;

-        // Minimal response: hostname, ip, uptime, labels
+        // Response contains info about ourselves (the responding node)
         doc["hostname"] = node.hostname;
         doc["ip"] = node.ip.toString();
-        doc["uptime"] = millis() - node.lastSeen; // Approximate uptime
+        doc["uptime"] = node.uptime;

         // Add labels if present
         if (!node.labels.empty()) {
@@ -223,12 +242,13 @@ void ClusterManager::sendNodeInfo(const String& hostname, const IPAddress& targe
     String json;
     serializeJson(doc, json);

+    // Send NODE_UPDATE:targetHostname:{json about responding node}
     ctx.udp->beginPacket(targetIP, ctx.config.udp_port);
-    String msg = String(ClusterProtocol::NODE_UPDATE_MSG) + ":" + hostname + ":" + json;
+    String msg = String(ClusterProtocol::NODE_UPDATE_MSG) + ":" + targetHostname + ":" + json;
     ctx.udp->write(msg.c_str());
     ctx.udp->endPacket();

-    LOG_DEBUG("Cluster", String("Sent NODE_UPDATE response to ") + hostname + " @ " + targetIP.toString());
+    LOG_DEBUG("Cluster", String("Sent NODE_UPDATE response to ") + targetHostname + " @ " + targetIP.toString());
 }

 void ClusterManager::onClusterEvent(const char* msg) {
--
2.49.1


From 23289d9f09704ea861099a270dc77cef04953953 Mon Sep 17 00:00:00 2001
From: 0x1d
Date: Sun, 19 Oct 2025 13:11:36 +0200
Subject: [PATCH 3/5] fix: latency calculation

---
 src/spore/core/ClusterManager.cpp | 23 ++++++++++++++---------
 1 file changed, 14 insertions(+), 9 deletions(-)

diff --git a/src/spore/core/ClusterManager.cpp b/src/spore/core/ClusterManager.cpp
index 5b084ed..8a3c996 100644
--- a/src/spore/core/ClusterManager.cpp
+++ b/src/spore/core/ClusterManager.cpp
@@ -174,10 +174,10 @@ void ClusterManager::onNodeUpdate(const char* msg) {
     }

     if (respondingNode) {
-        // Calculate latency if we recently sent a heartbeat
+        // Calculate latency only if we recently sent a heartbeat (within last 1 second)
         unsigned long latency = 0;
-        if (lastHeartbeatSentAt != 0) {
-            unsigned long now = millis();
+        unsigned long now = millis();
+        if (lastHeartbeatSentAt != 0 && (now - lastHeartbeatSentAt) < 1000) { // 1 second window
             latency = now - lastHeartbeatSentAt;
             lastHeartbeatSentAt = 0; // Reset for next calculation
         }
@@ -201,11 +201,15 @@ void ClusterManager::onNodeUpdate(const char* msg) {
             }
         }

-        respondingNode->lastSeen = millis();
+        respondingNode->lastSeen = now;
         respondingNode->status = NodeInfo::ACTIVE;
-        respondingNode->latency = latency;

-        LOG_DEBUG("Cluster", String("Updated responding node ") + respondingNode->hostname + " @ " + respondingNodeIP.toString() + " | latency: " + String(latency) + "ms");
+        // Update latency if we calculated it (preserve existing value if not)
+        if (latency > 0) {
+            respondingNode->latency = latency;
+        }
+
+        LOG_DEBUG("Cluster", String("Updated responding node ") + respondingNode->hostname + " @ " + respondingNodeIP.toString() + " | latency: " + String(latency > 0 ? String(latency) + "ms" : "not calculated"));
     } else {
         LOG_WARN("Cluster", String("Received NODE_UPDATE from unknown node: ") + respondingNodeIP.toString());
     }
@@ -314,17 +318,18 @@ void ClusterManager::onRawMessage(const char* msg) {

 void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
     auto& memberList = *ctx.memberList;
-    
+
     // O(1) lookup instead of O(n) search
     auto it = memberList.find(nodeHost);
     if (it != memberList.end()) {
-        // Update existing node
+        // Update existing node - preserve all existing field values
         it->second.ip = nodeIP;
         it->second.lastSeen = millis();
+        // Note: Other fields like latency, uptime, labels, etc. are preserved
         //fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
         return;
     }
-    
+
     // Add new node
     NodeInfo newNode;
     newNode.hostname = nodeHost;
--
2.49.1


From 407b651b82fbe55f6d6d98b3cc9a431b700efd72 Mon Sep 17 00:00:00 2001
From: 0x1d
Date: Sun, 19 Oct 2025 13:48:13 +0200
Subject: [PATCH 4/5] feat: change event naming schema

---
 ctl.sh                             |  2 -
 docs/Architecture.md               | 17 ++----
 docs/ConfigurationManagement.md    |  4 +-
 include/spore/types/Config.h       |  2 -
 src/spore/core/ClusterManager.cpp  | 88 ++++++++++++++++++++++--------
 src/spore/core/NetworkManager.cpp  |  2 +-
 src/spore/services/NodeService.cpp |  2 -
 src/spore/types/Config.cpp         |  6 --
 8 files changed, 74 insertions(+), 49 deletions(-)

diff --git a/ctl.sh b/ctl.sh
index 99e2f50..a3e2d3f 100755
--- a/ctl.sh
+++ b/ctl.sh
@@ -375,8 +375,6 @@ function node {
     echo "Heartbeat Interval: $(echo "$response_body" | jq -r '.cluster.heartbeat_interval_ms // "N/A"') ms"
     echo "Cluster Listen Interval: $(echo "$response_body" | jq -r '.cluster.cluster_listen_interval_ms // "N/A"') ms"
     echo "Status Update Interval: $(echo "$response_body" | jq -r '.cluster.status_update_interval_ms // "N/A"') ms"
-    echo "Node Update Broadcast Interval: $(echo "$response_body" | jq -r '.cluster.node_update_broadcast_interval_ms // "N/A"') ms"
-    echo "Print Interval: $(echo "$response_body" | jq -r '.cluster.print_interval_ms // "N/A"') ms"
     echo ""

     # Node Status Thresholds
diff --git a/docs/Architecture.md b/docs/Architecture.md
index 095e084..427176e 100644
--- a/docs/Architecture.md
+++ b/docs/Architecture.md
@@ -57,7 +57,6 @@ The cluster uses a UDP-based discovery protocol for automatic node detection:
 - **Broadcast Address**: 255.255.255.255
 - **Listen Interval**: `Config.cluster_listen_interval_ms` (default 10 ms)
 - **Heartbeat Interval**: `Config.heartbeat_interval_ms` (default 5000 ms)
-- **Node Update Broadcast Interval**: `Config.node_update_broadcast_interval_ms` (default 5000 ms)

 ### Message Formats

@@ -95,7 +94,6 @@ The `cluster_listen` task parses one UDP packet per run and dispatches by prefix
 - **UDP Port**: `Config.udp_port` (default 4210)
 - **Listen Interval**: `Config.cluster_listen_interval_ms` (default 10 ms)
 - **Heartbeat Interval**: `Config.heartbeat_interval_ms` (default 5000 ms)
-- **Node Update Broadcast Interval**: `Config.node_update_broadcast_interval_ms` (default 5000 ms)

 ### Node Status Categories

@@ -113,12 +111,9 @@ The system runs several background tasks at different intervals:

 | Task | Interval (default) | Purpose |
 |------|--------------------|---------|
-| `cluster_discovery` | 1000 ms | Send UDP discovery packets |
-| `cluster_listen` | 10 ms | Listen for discovery/heartbeat/node-info |
+| `cluster_listen` | 10 ms | Listen for heartbeat/node-info messages |
 | `status_update` | 1000 ms | Update node status categories, purge dead |
 | `heartbeat` | 5000 ms | Broadcast heartbeat and update local resources |
-| `cluster_update_members_info` | 10000 ms | Reserved; no-op (info via UDP) |
-| `print_members` | 5000 ms | Log current member list |

 ### Task Management Features

@@ -135,12 +130,12 @@ The `NodeContext` provides an event-driven architecture for system-wide communic

 ```cpp
 // Subscribe to events
-ctx.on("node_discovered", [](void* data) {
+ctx.on("node/discovered", [](void* data) {
     NodeInfo* node = static_cast<NodeInfo*>(data);
     // Handle new node discovery
 });

-ctx.on("cluster_updated", [](void* data) {
+ctx.on("cluster/updated", [](void* data) {
     // Handle cluster membership changes
 });
 ```
@@ -149,13 +144,13 @@ ctx.on("cluster_updated", [](void* data) {

 ```cpp
 // Publish events
-ctx.fire("node_discovered", &newNode);
-ctx.fire("cluster_updated", &clusterData);
+ctx.fire("node/discovered", &newNode);
+ctx.fire("cluster/updated", &clusterData);
 ```

 ### Available Events

-- **`node_discovered`**: New node added or local node refreshed
+- **`node/discovered`**: New node added or local node refreshed

 ## Resource Monitoring

diff --git a/docs/ConfigurationManagement.md
b/docs/ConfigurationManagement.md index 7561121..f13972c 100644 --- a/docs/ConfigurationManagement.md +++ b/docs/ConfigurationManagement.md @@ -95,9 +95,7 @@ The configuration is stored as a JSON file with the following structure: "cluster": { "heartbeat_interval_ms": 5000, "cluster_listen_interval_ms": 10, - "status_update_interval_ms": 1000, - "node_update_broadcast_interval_ms": 5000, - "print_interval_ms": 5000 + "status_update_interval_ms": 1000 }, "thresholds": { "node_active_threshold_ms": 10000, diff --git a/include/spore/types/Config.h b/include/spore/types/Config.h index d24cc2e..5592234 100644 --- a/include/spore/types/Config.h +++ b/include/spore/types/Config.h @@ -40,8 +40,6 @@ public: unsigned long heartbeat_interval_ms; unsigned long cluster_listen_interval_ms; unsigned long status_update_interval_ms; - unsigned long node_update_broadcast_interval_ms; - unsigned long print_interval_ms; // Node Status Thresholds unsigned long node_active_threshold_ms; diff --git a/src/spore/core/ClusterManager.cpp b/src/spore/core/ClusterManager.cpp index 8a3c996..0bbf5f2 100644 --- a/src/spore/core/ClusterManager.cpp +++ b/src/spore/core/ClusterManager.cpp @@ -3,8 +3,8 @@ #include "spore/util/Logging.h" ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx), taskManager(taskMgr) { - // Register callback for node_discovered event - ctx.on("node_discovered", [this](void* data) { + // Register callback for node/discovered event + ctx.on("node/discovered", [this](void* data) { NodeInfo* node = static_cast(data); this->addOrUpdateNode(node->hostname, node->ip); }); @@ -26,11 +26,16 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx this->ctx.udp->endPacket(); }); - // Handler for node update broadcasts: services fire 'cluster/node_update' when their node info changes - ctx.on("cluster/node_update", [this](void* data) { + // Handler for node update broadcasts: services fire 'cluster/node/update' when their node 
info changes + ctx.on("cluster/node/update", [this](void* data) { // Trigger immediate NODE_UPDATE broadcast when node info changes broadcastNodeUpdate(); }); + + // Handler for memberlist changes: print memberlist when it changes + ctx.on("cluster/memberlist/changed", [this](void* data) { + printMemberList(); + }); // Register tasks registerTasks(); initMessageHandlers(); @@ -39,9 +44,7 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx void ClusterManager::registerTasks() { taskManager.registerTask("cluster_listen", ctx.config.cluster_listen_interval_ms, [this]() { listen(); }); taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); }); - taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); }); taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); }); - taskManager.registerTask("node_update_broadcast", ctx.config.node_update_broadcast_interval_ms, [this]() { broadcastNodeUpdate(); }); LOG_INFO("ClusterManager", "Registered all cluster tasks"); } @@ -183,21 +186,35 @@ void ClusterManager::onNodeUpdate(const char* msg) { } // Update the responding node's information + bool hostnameChanged = false; if (doc["hostname"].is()) { - respondingNode->hostname = doc["hostname"].as(); + String newHostname = doc["hostname"].as(); + if (respondingNode->hostname != newHostname) { + respondingNode->hostname = newHostname; + hostnameChanged = true; + } } + if (doc["uptime"].is()) { respondingNode->uptime = doc["uptime"]; } // Update labels if provided + bool labelsChanged = false; if (doc["labels"].is()) { - respondingNode->labels.clear(); + // Check if labels actually changed JsonObject labelsObj = doc["labels"].as(); + std::map newLabels; for (JsonPair kvp : labelsObj) { const char* key = kvp.key().c_str(); const char* value = labelsObj[kvp.key()]; - 
respondingNode->labels[key] = String(value); + newLabels[key] = String(value); + } + + // Compare with existing labels + if (newLabels != respondingNode->labels) { + labelsChanged = true; + respondingNode->labels = newLabels; } } @@ -209,7 +226,18 @@ void ClusterManager::onNodeUpdate(const char* msg) { respondingNode->latency = latency; } - LOG_DEBUG("Cluster", String("Updated responding node ") + respondingNode->hostname + " @ " + respondingNodeIP.toString() + " | latency: " + String(latency > 0 ? String(latency) + "ms" : "not calculated")); + // Check if any fields changed that require broadcasting + bool nodeInfoChanged = hostnameChanged || labelsChanged; + + if (nodeInfoChanged) { + // Fire cluster/node/update event to trigger broadcast + ctx.fire("cluster/node/update", nullptr); + } + + LOG_DEBUG("Cluster", String("Updated responding node ") + respondingNode->hostname + " @ " + respondingNodeIP.toString() + + " | hostname: " + (hostnameChanged ? "changed" : "unchanged") + + " | labels: " + (labelsChanged ? "changed" : "unchanged") + + " | latency: " + (latency > 0 ? String(latency) + "ms" : "not calculated")); } else { LOG_WARN("Cluster", String("Received NODE_UPDATE from unknown node: ") + respondingNodeIP.toString()); } @@ -318,27 +346,36 @@ void ClusterManager::onRawMessage(const char* msg) { void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) { auto& memberList = *ctx.memberList; + bool memberlistChanged = false; // O(1) lookup instead of O(n) search auto it = memberList.find(nodeHost); if (it != memberList.end()) { // Update existing node - preserve all existing field values - it->second.ip = nodeIP; + if (it->second.ip != nodeIP) { + it->second.ip = nodeIP; + memberlistChanged = true; + } it->second.lastSeen = millis(); // Note: Other fields like latency, uptime, labels, etc. 
are preserved //fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task - return; + } else { + // Add new node + NodeInfo newNode; + newNode.hostname = nodeHost; + newNode.ip = nodeIP; + newNode.lastSeen = millis(); + updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms); + memberList[nodeHost] = newNode; + memberlistChanged = true; + LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " | Status: " + statusToStr(newNode.status) + " | last update: 0"); + //fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task } - // Add new node - NodeInfo newNode; - newNode.hostname = nodeHost; - newNode.ip = nodeIP; - newNode.lastSeen = millis(); - updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms); - memberList[nodeHost] = newNode; - LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " | Status: " + statusToStr(newNode.status) + " | last update: 0"); - //fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task + // Fire event if memberlist changed + if (memberlistChanged) { + ctx.fire("cluster/memberlist/changed", nullptr); + } } // unused http client to fetch complete node info @@ -522,17 +559,24 @@ void ClusterManager::updateAllNodeStatuses() { void ClusterManager::removeDeadNodes() { auto& memberList = *ctx.memberList; unsigned long now = millis(); - + bool memberlistChanged = false; + // Use iterator to safely remove elements from map for (auto it = memberList.begin(); it != memberList.end(); ) { unsigned long diff = now - it->second.lastSeen; if (it->second.status == NodeInfo::DEAD && diff > ctx.config.node_dead_threshold_ms) { LOG_INFO("Cluster", "Removing node: " + it->second.hostname); it = memberList.erase(it); + memberlistChanged = true; } else { ++it; } } + + // Fire event if memberlist changed + if (memberlistChanged) { + 
ctx.fire("cluster/memberlist/changed", nullptr); + } } void ClusterManager::printMemberList() { diff --git a/src/spore/core/NetworkManager.cpp b/src/spore/core/NetworkManager.cpp index c70b223..7a0729c 100644 --- a/src/spore/core/NetworkManager.cpp +++ b/src/spore/core/NetworkManager.cpp @@ -128,5 +128,5 @@ void NetworkManager::setupWiFi() { } // Notify listeners that the node is (re)discovered - ctx.fire("node_discovered", &ctx.self); + ctx.fire("node/discovered", &ctx.self); } diff --git a/src/spore/services/NodeService.cpp b/src/spore/services/NodeService.cpp index 451cb72..473fcdb 100644 --- a/src/spore/services/NodeService.cpp +++ b/src/spore/services/NodeService.cpp @@ -258,8 +258,6 @@ void NodeService::handleGetConfigRequest(AsyncWebServerRequest* request) { clusterObj["heartbeat_interval_ms"] = ctx.config.heartbeat_interval_ms; clusterObj["cluster_listen_interval_ms"] = ctx.config.cluster_listen_interval_ms; clusterObj["status_update_interval_ms"] = ctx.config.status_update_interval_ms; - clusterObj["node_update_broadcast_interval_ms"] = ctx.config.node_update_broadcast_interval_ms; - clusterObj["print_interval_ms"] = ctx.config.print_interval_ms; // Node Status Thresholds JsonObject thresholdsObj = doc["thresholds"].to(); diff --git a/src/spore/types/Config.cpp b/src/spore/types/Config.cpp index c115e24..4213e76 100644 --- a/src/spore/types/Config.cpp +++ b/src/spore/types/Config.cpp @@ -35,8 +35,6 @@ void Config::setDefaults() { cluster_listen_interval_ms = DEFAULT_CLUSTER_LISTEN_INTERVAL_MS; heartbeat_interval_ms = DEFAULT_HEARTBEAT_INTERVAL_MS; status_update_interval_ms = DEFAULT_STATUS_UPDATE_INTERVAL_MS; - node_update_broadcast_interval_ms = DEFAULT_NODE_UPDATE_BROADCAST_INTERVAL_MS; - print_interval_ms = DEFAULT_PRINT_INTERVAL_MS; // Node Status Thresholds node_active_threshold_ms = DEFAULT_NODE_ACTIVE_THRESHOLD_MS; @@ -88,8 +86,6 @@ bool Config::saveToFile(const String& filename) { doc["cluster"]["heartbeat_interval_ms"] = heartbeat_interval_ms; 
     doc["cluster"]["cluster_listen_interval_ms"] = cluster_listen_interval_ms;
     doc["cluster"]["status_update_interval_ms"] = status_update_interval_ms;
-    doc["cluster"]["node_update_broadcast_interval_ms"] = node_update_broadcast_interval_ms;
-    doc["cluster"]["print_interval_ms"] = print_interval_ms;

     // Node Status Thresholds
     doc["thresholds"]["node_active_threshold_ms"] = node_active_threshold_ms;
@@ -167,8 +163,6 @@ bool Config::loadFromFile(const String& filename) {
     heartbeat_interval_ms = doc["cluster"]["heartbeat_interval_ms"] | DEFAULT_HEARTBEAT_INTERVAL_MS;
     cluster_listen_interval_ms = doc["cluster"]["cluster_listen_interval_ms"] | DEFAULT_CLUSTER_LISTEN_INTERVAL_MS;
     status_update_interval_ms = doc["cluster"]["status_update_interval_ms"] | DEFAULT_STATUS_UPDATE_INTERVAL_MS;
-    node_update_broadcast_interval_ms = doc["cluster"]["node_update_broadcast_interval_ms"] | DEFAULT_NODE_UPDATE_BROADCAST_INTERVAL_MS;
-    print_interval_ms = doc["cluster"]["print_interval_ms"] | DEFAULT_PRINT_INTERVAL_MS;

     // Load Node Status Thresholds with defaults
     node_active_threshold_ms = doc["thresholds"]["node_active_threshold_ms"] | DEFAULT_NODE_ACTIVE_THRESHOLD_MS;
-- 
2.49.1

From 0d09c5900c10bbc39fe99a646311e415ecce6eb9 Mon Sep 17 00:00:00 2001
From: 0x1d
Date: Sun, 19 Oct 2025 14:04:37 +0200
Subject: [PATCH 5/5] docs: update

---
 docs/Architecture.md | 59 ++++++++++++++++++++++++++++++--------------
 1 file changed, 41 insertions(+), 18 deletions(-)

diff --git a/docs/Architecture.md b/docs/Architecture.md
index 427176e..57f61d8 100644
--- a/docs/Architecture.md
+++ b/docs/Architecture.md
@@ -42,16 +42,14 @@ The cluster uses a UDP-based discovery protocol for automatic node detection:

 ### Discovery Process

-1. **Discovery Broadcast**: Nodes periodically send UDP packets on port `udp_port` (default 4210)
-2. **Response Handling**: Nodes respond with `CLUSTER_RESPONSE:<hostname>`
-3. **Member Management**: Discovered nodes are added/updated in the cluster
-4. **Node Info via UDP**: Heartbeat triggers peers to send `CLUSTER_NODE_INFO:<hostname>:<json>`
+1. **Discovery Broadcast**: Nodes periodically send heartbeat messages on port `udp_port` (default 4210)
+2. **Response Handling**: Nodes respond with node update information containing their current state
+3. **Member Management**: Discovered nodes are added/updated in the cluster with current information
+4. **Node Synchronization**: Periodic broadcasts ensure all nodes maintain current cluster state

 ### Protocol Details

 - **UDP Port**: 4210 (configurable via `Config.udp_port`)
-- **Discovery Message**: `CLUSTER_DISCOVERY`
-- **Response Message**: `CLUSTER_RESPONSE`
 - **Heartbeat Message**: `CLUSTER_HEARTBEAT:hostname`
 - **Node Update Message**: `NODE_UPDATE:hostname:{json}`
 - **Broadcast Address**: 255.255.255.255
@@ -64,30 +62,55 @@ The cluster uses a UDP-based discovery protocol for automatic node detection:
   - Sender: each node, broadcast to 255.255.255.255:`udp_port` on interval
   - Purpose: announce presence, prompt peers for node info, and keep liveness
 - **Node Update**: `NODE_UPDATE:hostname:{json}`
-  - Sender: node receiving a heartbeat; unicast to heartbeat sender IP
+  - Sender: node responding to heartbeat or broadcasting current state
   - JSON fields: hostname, ip, uptime, optional labels
-  - Purpose: provide minimal node information in response to heartbeat
+  - Purpose: provide current node information for cluster synchronization

-### Heartbeat Flow
+### Discovery Flow

-1. **A node broadcasts** `CLUSTER_HEARTBEAT:hostname`
-2. **Each receiver responds** with `NODE_UPDATE:hostname:{json}` to the heartbeat sender IP
+1. **A node broadcasts** `CLUSTER_HEARTBEAT:hostname` to announce its presence
+2. **Each receiver responds** with `NODE_UPDATE:hostname:{json}` containing current node state
 3. **The sender**:
-   - Ensures the node exists or creates it with `hostname` and sender IP
+   - Ensures the responding node exists or creates it with current IP and information
    - Parses JSON and updates node info, `status = ACTIVE`, `lastSeen = now`
-   - Sets `latency = now - lastHeartbeatSentAt` (per-node, measured at heartbeat origin)
+   - Calculates `latency = now - lastHeartbeatSentAt` for network performance monitoring

-### Node Update Broadcasting
+### Node Synchronization

-1. **Periodic broadcast**: Each node broadcasts `NODE_UPDATE:hostname:{json}` every 5 seconds
+1. **Event-driven broadcasts**: Nodes broadcast `NODE_UPDATE:hostname:{json}` when node information changes
 2. **All receivers**: Update their memberlist entry for the broadcasting node
-3. **Purpose**: Ensures all nodes have current information about each other
+3. **Purpose**: Ensures all nodes maintain current cluster state and configuration
+
+### Sequence Diagram
+
+```mermaid
+sequenceDiagram
+    participant N1 as Node A (esp-node1)
+    participant N2 as Node B (esp-node2)
+
+    Note over N1,N2: Discovery via heartbeat broadcast
+    N1->>+N2: CLUSTER_HEARTBEAT:esp-node1
+
+    Note over N2: Node B responds with its current state
+    N2->>+N1: NODE_UPDATE:esp-node1:{"hostname":"esp-node2","uptime":12345,"labels":{"role":"sensor"}}
+
+    Note over N1: Process NODE_UPDATE response
+    N1-->>N1: Update memberlist for Node B
+    N1-->>N1: Set Node B status = ACTIVE
+    N1-->>N1: Calculate latency for Node B
+
+    Note over N1,N2: Event-driven node synchronization
+    N1->>+N2: NODE_UPDATE:esp-node1:{"hostname":"esp-node1","uptime":12346,"labels":{"role":"controller"}}
+
+    Note over N2: Update memberlist with latest information
+    N2-->>N2: Update Node A info, maintain ACTIVE status
+```

 ### Listener Behavior

 The `cluster_listen` task parses one UDP packet per run and dispatches by prefix to:

-- **Heartbeat** → add/update node and send `NODE_UPDATE` JSON response
-- **Node Update** → update node information and status
+- **Heartbeat** → add/update responding node and send `NODE_UPDATE` response
+- **Node Update** → update node information and trigger memberlist logging

 ### Timing and Intervals
-- 
2.49.1
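
Illustration only, not part of the series: a minimal sketch of two pieces of logic the patches describe, namely splitting a `NODE_UPDATE:hostname:{json}` datagram and the 1-second latency window from patch 3/5. It uses plain standard C++ rather than the Arduino `String` type, and the names `NodeUpdateParts`, `splitNodeUpdate`, and `latencyWithinWindow` are hypothetical helpers that do not exist in the codebase.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>

// Hypothetical container for the two fields carried after the NODE_UPDATE prefix.
struct NodeUpdateParts {
    std::string hostname;  // hostname field between the first and second ':'
    std::string json;      // everything after the second ':' (may itself contain ':')
};

// Splits "NODE_UPDATE:<hostname>:<json>". Assumes hostnames contain no ':'.
// Returns std::nullopt when the prefix or the separator is missing.
std::optional<NodeUpdateParts> splitNodeUpdate(const std::string& msg) {
    static const std::string prefix = "NODE_UPDATE:";
    if (msg.compare(0, prefix.size(), prefix) != 0) {
        return std::nullopt;
    }
    const std::size_t sep = msg.find(':', prefix.size());
    if (sep == std::string::npos) {
        return std::nullopt;
    }
    return NodeUpdateParts{msg.substr(prefix.size(), sep - prefix.size()),
                           msg.substr(sep + 1)};
}

// Mirrors the window check in onNodeUpdate: latency is reported only when a
// heartbeat is pending (timestamp != 0) and the reply arrived within windowMs.
// Returns 0 for "not calculated", matching the patch's latency > 0 guard.
unsigned long latencyWithinWindow(unsigned long now,
                                  unsigned long lastHeartbeatSentAt,
                                  unsigned long windowMs = 1000) {
    if (lastHeartbeatSentAt == 0) {
        return 0;
    }
    const unsigned long elapsed = now - lastHeartbeatSentAt;  // unsigned math tolerates millis() wraparound
    return elapsed < windowMs ? elapsed : 0;
}
```

Because a result of 0 means "not calculated", a caller following the patch would leave the stored latency untouched in that case instead of overwriting it.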