From 407b651b82fbe55f6d6d98b3cc9a431b700efd72 Mon Sep 17 00:00:00 2001 From: 0x1d Date: Sun, 19 Oct 2025 13:48:13 +0200 Subject: [PATCH] feat: change event naming schema --- ctl.sh | 2 - docs/Architecture.md | 17 ++---- docs/ConfigurationManagement.md | 4 +- include/spore/types/Config.h | 2 - src/spore/core/ClusterManager.cpp | 88 ++++++++++++++++++++++-------- src/spore/core/NetworkManager.cpp | 2 +- src/spore/services/NodeService.cpp | 2 - src/spore/types/Config.cpp | 6 -- 8 files changed, 74 insertions(+), 49 deletions(-) diff --git a/ctl.sh b/ctl.sh index 99e2f50..a3e2d3f 100755 --- a/ctl.sh +++ b/ctl.sh @@ -375,8 +375,6 @@ function node { echo "Heartbeat Interval: $(echo "$response_body" | jq -r '.cluster.heartbeat_interval_ms // "N/A"') ms" echo "Cluster Listen Interval: $(echo "$response_body" | jq -r '.cluster.cluster_listen_interval_ms // "N/A"') ms" echo "Status Update Interval: $(echo "$response_body" | jq -r '.cluster.status_update_interval_ms // "N/A"') ms" - echo "Node Update Broadcast Interval: $(echo "$response_body" | jq -r '.cluster.node_update_broadcast_interval_ms // "N/A"') ms" - echo "Print Interval: $(echo "$response_body" | jq -r '.cluster.print_interval_ms // "N/A"') ms" echo "" # Node Status Thresholds diff --git a/docs/Architecture.md b/docs/Architecture.md index 095e084..427176e 100644 --- a/docs/Architecture.md +++ b/docs/Architecture.md @@ -57,7 +57,6 @@ The cluster uses a UDP-based discovery protocol for automatic node detection: - **Broadcast Address**: 255.255.255.255 - **Listen Interval**: `Config.cluster_listen_interval_ms` (default 10 ms) - **Heartbeat Interval**: `Config.heartbeat_interval_ms` (default 5000 ms) -- **Node Update Broadcast Interval**: `Config.node_update_broadcast_interval_ms` (default 5000 ms) ### Message Formats @@ -95,7 +94,6 @@ The `cluster_listen` task parses one UDP packet per run and dispatches by prefix - **UDP Port**: `Config.udp_port` (default 4210) - **Listen Interval**: `Config.cluster_listen_interval_ms` (default 10 ms) - **Heartbeat Interval**: `Config.heartbeat_interval_ms` (default 5000 ms) -- **Node Update Broadcast Interval**: `Config.node_update_broadcast_interval_ms` (default 5000 ms) ### Node Status Categories @@ -113,12 +111,9 @@ The system runs several background tasks at different intervals: | Task | Interval (default) | Purpose | |------|--------------------|---------| -| `cluster_discovery` | 1000 ms | Send UDP discovery packets | -| `cluster_listen` | 10 ms | Listen for discovery/heartbeat/node-info | +| `cluster_listen` | 10 ms | Listen for heartbeat/node-info messages | | `status_update` | 1000 ms | Update node status categories, purge dead | | `heartbeat` | 5000 ms | Broadcast heartbeat and update local resources | -| `cluster_update_members_info` | 10000 ms | Reserved; no-op (info via UDP) | -| `print_members` | 5000 ms | Log current member list | ### Task Management Features @@ -135,12 +130,12 @@ The `NodeContext` provides an event-driven architecture for system-wide communic ```cpp // Subscribe to events -ctx.on("node_discovered", [](void* data) { +ctx.on("node/discovered", [](void* data) { NodeInfo* node = static_cast(data); // Handle new node discovery }); -ctx.on("cluster_updated", [](void* data) { +ctx.on("cluster/updated", [](void* data) { // Handle cluster membership changes }); ``` @@ -149,13 +144,13 @@ ctx.on("cluster_updated", [](void* data) { ```cpp // Publish events -ctx.fire("node_discovered", &newNode); -ctx.fire("cluster_updated", &clusterData); +ctx.fire("node/discovered", &newNode); +ctx.fire("cluster/updated", &clusterData); ``` ### Available Events -- **`node_discovered`**: New node added or local node refreshed +- **`node/discovered`**: New node added or local node refreshed ## Resource Monitoring diff --git a/docs/ConfigurationManagement.md b/docs/ConfigurationManagement.md index 7561121..f13972c 100644 --- a/docs/ConfigurationManagement.md +++ b/docs/ConfigurationManagement.md @@ -95,9 +95,7 @@ The configuration is stored as a JSON file with the following structure: "cluster": { "heartbeat_interval_ms": 5000, "cluster_listen_interval_ms": 10, - "status_update_interval_ms": 1000, - "node_update_broadcast_interval_ms": 5000, - "print_interval_ms": 5000 + "status_update_interval_ms": 1000 }, "thresholds": { "node_active_threshold_ms": 10000, diff --git a/include/spore/types/Config.h b/include/spore/types/Config.h index d24cc2e..5592234 100644 --- a/include/spore/types/Config.h +++ b/include/spore/types/Config.h @@ -40,8 +40,6 @@ public: unsigned long heartbeat_interval_ms; unsigned long cluster_listen_interval_ms; unsigned long status_update_interval_ms; - unsigned long node_update_broadcast_interval_ms; - unsigned long print_interval_ms; // Node Status Thresholds unsigned long node_active_threshold_ms; diff --git a/src/spore/core/ClusterManager.cpp b/src/spore/core/ClusterManager.cpp index 8a3c996..0bbf5f2 100644 --- a/src/spore/core/ClusterManager.cpp +++ b/src/spore/core/ClusterManager.cpp @@ -3,8 +3,8 @@ #include "spore/util/Logging.h" ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx), taskManager(taskMgr) { - // Register callback for node_discovered event - ctx.on("node_discovered", [this](void* data) { + // Register callback for node/discovered event + ctx.on("node/discovered", [this](void* data) { NodeInfo* node = static_cast(data); this->addOrUpdateNode(node->hostname, node->ip); }); @@ -26,11 +26,16 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx this->ctx.udp->endPacket(); }); - // Handler for node update broadcasts: services fire 'cluster/node_update' when their node info changes - ctx.on("cluster/node_update", [this](void* data) { + // Handler for node update broadcasts: services fire 'cluster/node/update' when their node info changes + ctx.on("cluster/node/update", [this](void* data) { // Trigger immediate NODE_UPDATE broadcast when node info changes broadcastNodeUpdate(); }); + + // Handler for memberlist changes: print memberlist when it changes + ctx.on("cluster/memberlist/changed", [this](void* data) { + printMemberList(); + }); // Register tasks registerTasks(); initMessageHandlers(); @@ -39,9 +44,7 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx void ClusterManager::registerTasks() { taskManager.registerTask("cluster_listen", ctx.config.cluster_listen_interval_ms, [this]() { listen(); }); taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); }); - taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); }); taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); }); - taskManager.registerTask("node_update_broadcast", ctx.config.node_update_broadcast_interval_ms, [this]() { broadcastNodeUpdate(); }); LOG_INFO("ClusterManager", "Registered all cluster tasks"); } @@ -183,21 +186,35 @@ void ClusterManager::onNodeUpdate(const char* msg) { } // Update the responding node's information + bool hostnameChanged = false; if (doc["hostname"].is()) { - respondingNode->hostname = doc["hostname"].as(); + String newHostname = doc["hostname"].as(); + if (respondingNode->hostname != newHostname) { + respondingNode->hostname = newHostname; + hostnameChanged = true; + } } + if (doc["uptime"].is()) { respondingNode->uptime = doc["uptime"]; } // Update labels if provided + bool labelsChanged = false; if (doc["labels"].is()) { - respondingNode->labels.clear(); + // Check if labels actually changed JsonObject labelsObj = doc["labels"].as(); + std::map newLabels; for (JsonPair kvp : labelsObj) { const char* key = kvp.key().c_str(); const char* value = labelsObj[kvp.key()]; - respondingNode->labels[key] = String(value); + newLabels[key] = String(value); + } + + // Compare with existing labels + if (newLabels != respondingNode->labels) { + labelsChanged = true; + respondingNode->labels = newLabels; } } @@ -209,7 +226,18 @@ void ClusterManager::onNodeUpdate(const char* msg) { respondingNode->latency = latency; } - LOG_DEBUG("Cluster", String("Updated responding node ") + respondingNode->hostname + " @ " + respondingNodeIP.toString() + " | latency: " + String(latency > 0 ? String(latency) + "ms" : "not calculated")); + // Check if any fields changed that require broadcasting + bool nodeInfoChanged = hostnameChanged || labelsChanged; + + if (nodeInfoChanged) { + // Fire cluster/node/update event to trigger broadcast + ctx.fire("cluster/node/update", nullptr); + } + + LOG_DEBUG("Cluster", String("Updated responding node ") + respondingNode->hostname + " @ " + respondingNodeIP.toString() + + " | hostname: " + (hostnameChanged ? "changed" : "unchanged") + + " | labels: " + (labelsChanged ? "changed" : "unchanged") + + " | latency: " + (latency > 0 ? String(latency) + "ms" : "not calculated")); } else { LOG_WARN("Cluster", String("Received NODE_UPDATE from unknown node: ") + respondingNodeIP.toString()); } @@ -318,27 +346,36 @@ void ClusterManager::onRawMessage(const char* msg) { void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) { auto& memberList = *ctx.memberList; + bool memberlistChanged = false; // O(1) lookup instead of O(n) search auto it = memberList.find(nodeHost); if (it != memberList.end()) { // Update existing node - preserve all existing field values - it->second.ip = nodeIP; + if (it->second.ip != nodeIP) { + it->second.ip = nodeIP; + memberlistChanged = true; + } it->second.lastSeen = millis(); // Note: Other fields like latency, uptime, labels, etc. are preserved //fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task - return; + } else { + // Add new node + NodeInfo newNode; + newNode.hostname = nodeHost; + newNode.ip = nodeIP; + newNode.lastSeen = millis(); + updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms); + memberList[nodeHost] = newNode; + memberlistChanged = true; + LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " | Status: " + statusToStr(newNode.status) + " | last update: 0"); + //fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task } - // Add new node - NodeInfo newNode; - newNode.hostname = nodeHost; - newNode.ip = nodeIP; - newNode.lastSeen = millis(); - updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms); - memberList[nodeHost] = newNode; - LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " | Status: " + statusToStr(newNode.status) + " | last update: 0"); - //fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task + // Fire event if memberlist changed + if (memberlistChanged) { + ctx.fire("cluster/memberlist/changed", nullptr); + } } // unused http client to fetch complete node info @@ -522,17 +559,24 @@ void ClusterManager::updateAllNodeStatuses() { void ClusterManager::removeDeadNodes() { auto& memberList = *ctx.memberList; unsigned long now = millis(); - + bool memberlistChanged = false; + // Use iterator to safely remove elements from map for (auto it = memberList.begin(); it != memberList.end(); ) { unsigned long diff = now - it->second.lastSeen; if (it->second.status == NodeInfo::DEAD && diff > ctx.config.node_dead_threshold_ms) { LOG_INFO("Cluster", "Removing node: " + it->second.hostname); it = memberList.erase(it); + memberlistChanged = true; } else { ++it; } } + + // Fire event if memberlist changed + if (memberlistChanged) { + ctx.fire("cluster/memberlist/changed", nullptr); + } } void ClusterManager::printMemberList() { diff --git a/src/spore/core/NetworkManager.cpp b/src/spore/core/NetworkManager.cpp index c70b223..7a0729c 100644 --- a/src/spore/core/NetworkManager.cpp +++ b/src/spore/core/NetworkManager.cpp @@ -128,5 +128,5 @@ void NetworkManager::setupWiFi() { } // Notify listeners that the node is (re)discovered - ctx.fire("node_discovered", &ctx.self); + ctx.fire("node/discovered", &ctx.self); } diff --git a/src/spore/services/NodeService.cpp b/src/spore/services/NodeService.cpp index 451cb72..473fcdb 100644 --- a/src/spore/services/NodeService.cpp +++ b/src/spore/services/NodeService.cpp @@ -258,8 +258,6 @@ void NodeService::handleGetConfigRequest(AsyncWebServerRequest* request) { clusterObj["heartbeat_interval_ms"] = ctx.config.heartbeat_interval_ms; clusterObj["cluster_listen_interval_ms"] = ctx.config.cluster_listen_interval_ms; clusterObj["status_update_interval_ms"] = ctx.config.status_update_interval_ms; - clusterObj["node_update_broadcast_interval_ms"] = ctx.config.node_update_broadcast_interval_ms; - clusterObj["print_interval_ms"] = ctx.config.print_interval_ms; // Node Status Thresholds JsonObject thresholdsObj = doc["thresholds"].to(); diff --git a/src/spore/types/Config.cpp b/src/spore/types/Config.cpp index c115e24..4213e76 100644 --- a/src/spore/types/Config.cpp +++ b/src/spore/types/Config.cpp @@ -35,8 +35,6 @@ void Config::setDefaults() { cluster_listen_interval_ms = DEFAULT_CLUSTER_LISTEN_INTERVAL_MS; heartbeat_interval_ms = DEFAULT_HEARTBEAT_INTERVAL_MS; status_update_interval_ms = DEFAULT_STATUS_UPDATE_INTERVAL_MS; - node_update_broadcast_interval_ms = DEFAULT_NODE_UPDATE_BROADCAST_INTERVAL_MS; - print_interval_ms = DEFAULT_PRINT_INTERVAL_MS; // Node Status Thresholds node_active_threshold_ms = DEFAULT_NODE_ACTIVE_THRESHOLD_MS; @@ -88,8 +86,6 @@ bool Config::saveToFile(const String& filename) { doc["cluster"]["heartbeat_interval_ms"] = heartbeat_interval_ms; doc["cluster"]["cluster_listen_interval_ms"] = cluster_listen_interval_ms; doc["cluster"]["status_update_interval_ms"] = status_update_interval_ms; - doc["cluster"]["node_update_broadcast_interval_ms"] = node_update_broadcast_interval_ms; - doc["cluster"]["print_interval_ms"] = print_interval_ms; // Node Status Thresholds doc["thresholds"]["node_active_threshold_ms"] = node_active_threshold_ms; @@ -167,8 +163,6 @@ bool Config::loadFromFile(const String& filename) { heartbeat_interval_ms = doc["cluster"]["heartbeat_interval_ms"] | DEFAULT_HEARTBEAT_INTERVAL_MS; cluster_listen_interval_ms = doc["cluster"]["cluster_listen_interval_ms"] | DEFAULT_CLUSTER_LISTEN_INTERVAL_MS; status_update_interval_ms = doc["cluster"]["status_update_interval_ms"] | DEFAULT_STATUS_UPDATE_INTERVAL_MS; - node_update_broadcast_interval_ms = doc["cluster"]["node_update_broadcast_interval_ms"] | DEFAULT_NODE_UPDATE_BROADCAST_INTERVAL_MS; - print_interval_ms = doc["cluster"]["print_interval_ms"] | DEFAULT_PRINT_INTERVAL_MS; // Load Node Status Thresholds with defaults node_active_threshold_ms = doc["thresholds"]["node_active_threshold_ms"] | DEFAULT_NODE_ACTIVE_THRESHOLD_MS;