From 37a68e26d8b44b7372629a128799a8d58bde91ae Mon Sep 17 00:00:00 2001 From: 0x1d Date: Mon, 20 Oct 2025 21:21:02 +0200 Subject: [PATCH 1/4] refactor: remove unused and obsolete stuff --- include/spore/core/ClusterManager.h | 1 - include/spore/services/MonitoringService.h | 6 -- src/spore/core/ClusterManager.cpp | 111 --------------------- src/spore/services/MonitoringService.cpp | 19 ---- src/spore/services/NodeService.cpp | 2 +- 5 files changed, 1 insertion(+), 138 deletions(-) diff --git a/include/spore/core/ClusterManager.h b/include/spore/core/ClusterManager.h index 5c7ff86..c93fbb2 100644 --- a/include/spore/core/ClusterManager.h +++ b/include/spore/core/ClusterManager.h @@ -20,7 +20,6 @@ public: void removeDeadNodes(); void printMemberList(); const std::map& getMemberList() const { return *ctx.memberList; } - void fetchNodeInfo(const IPAddress& ip); void updateLocalNodeResources(); void heartbeatTaskCallback(); void updateAllMembersInfoTaskCallback(); diff --git a/include/spore/services/MonitoringService.h b/include/spore/services/MonitoringService.h index 8c2901a..91f62f8 100644 --- a/include/spore/services/MonitoringService.h +++ b/include/spore/services/MonitoringService.h @@ -15,17 +15,12 @@ public: // CPU information float currentCpuUsage; float averageCpuUsage; - float maxCpuUsage; - float minCpuUsage; unsigned long measurementCount; bool isMeasuring; // Memory information size_t freeHeap; size_t totalHeap; - size_t minFreeHeap; - size_t maxAllocHeap; - size_t heapFragmentation; // Filesystem information size_t totalBytes; @@ -45,7 +40,6 @@ private: void handleResourcesRequest(AsyncWebServerRequest* request); // Helper methods - size_t calculateHeapFragmentation() const; void getFilesystemInfo(size_t& totalBytes, size_t& usedBytes) const; CpuUsage& cpuUsage; diff --git a/src/spore/core/ClusterManager.cpp b/src/spore/core/ClusterManager.cpp index 0bbf5f2..1d2360c 100644 --- a/src/spore/core/ClusterManager.cpp +++ b/src/spore/core/ClusterManager.cpp @@ 
-357,8 +357,6 @@ void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) { memberlistChanged = true; } it->second.lastSeen = millis(); - // Note: Other fields like latency, uptime, labels, etc. are preserved - //fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task } else { // Add new node NodeInfo newNode; @@ -369,7 +367,6 @@ void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) { memberList[nodeHost] = newNode; memberlistChanged = true; LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " | Status: " + statusToStr(newNode.status) + " | last update: 0"); - //fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task } // Fire event if memberlist changed @@ -378,114 +375,6 @@ void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) { } } -// unused http client to fetch complete node info -void ClusterManager::fetchNodeInfo(const IPAddress& ip) { - if(ip == ctx.localIP) { - LOG_DEBUG("Cluster", "Skipping fetch for local node"); - return; - } - - unsigned long requestStart = millis(); - HTTPClient http; - WiFiClient client; - String url = "http://" + ip.toString() + ClusterProtocol::API_NODE_STATUS; - - // Use RAII pattern to ensure http.end() is always called - bool httpInitialized = false; - bool success = false; - - httpInitialized = http.begin(client, url); - if (!httpInitialized) { - LOG_ERROR("Cluster", "Failed to initialize HTTP client for " + ip.toString()); - return; - } - - // Set timeout to prevent hanging - http.setTimeout(5000); // 5 second timeout - - int httpCode = http.GET(); - unsigned long requestEnd = millis(); - unsigned long requestDuration = requestEnd - requestStart; - - if (httpCode == 200) { - String payload = http.getString(); - - // Use stack-allocated JsonDocument with proper cleanup - JsonDocument doc; - DeserializationError err = deserializeJson(doc, payload); - - if (!err) { - auto& memberList = *ctx.memberList; 
- // Still need to iterate since we're searching by IP, not hostname - for (auto& pair : memberList) { - NodeInfo& node = pair.second; - if (node.ip == ip) { - // Update resources efficiently - node.resources.freeHeap = doc["freeHeap"]; - node.resources.chipId = doc["chipId"]; - node.resources.sdkVersion = (const char*)doc["sdkVersion"]; - node.resources.cpuFreqMHz = doc["cpuFreqMHz"]; - node.resources.flashChipSize = doc["flashChipSize"]; - node.status = NodeInfo::ACTIVE; - node.latency = requestDuration; - node.lastSeen = millis(); - - // Clear and rebuild endpoints efficiently - node.endpoints.clear(); - node.endpoints.reserve(10); // Pre-allocate to avoid reallocations - - if (doc["api"].is()) { - JsonArray apiArr = doc["api"].as(); - for (JsonObject apiObj : apiArr) { - // Use const char* to avoid String copies - const char* uri = apiObj["uri"]; - int method = apiObj["method"]; - - // Create basic EndpointInfo without params for cluster nodes - EndpointInfo endpoint; - endpoint.uri = uri; // String assignment is more efficient than construction - endpoint.method = method; - endpoint.isLocal = false; - endpoint.serviceName = "remote"; - node.endpoints.push_back(std::move(endpoint)); - } - } - - // Parse labels efficiently - node.labels.clear(); - if (doc["labels"].is()) { - JsonObject labelsObj = doc["labels"].as(); - for (JsonPair kvp : labelsObj) { - // Use const char* to avoid String copies - const char* key = kvp.key().c_str(); - const char* value = labelsObj[kvp.key()]; - node.labels[key] = value; - } - } - - LOG_DEBUG("Cluster", "Fetched info for node: " + node.hostname + " @ " + ip.toString()); - success = true; - break; - } - } - } else { - LOG_ERROR("Cluster", "JSON parse error for node @ " + ip.toString() + ": " + String(err.c_str())); - } - } else { - LOG_ERROR("Cluster", "Failed to fetch info for node @ " + ip.toString() + ", HTTP code: " + String(httpCode)); - } - - // Always ensure HTTP client is properly closed - if (httpInitialized) { - 
http.end(); - } - - // Log success/failure for debugging - if (!success) { - LOG_DEBUG("Cluster", "Failed to update node info for " + ip.toString()); - } -} - void ClusterManager::heartbeatTaskCallback() { auto& memberList = *ctx.memberList; auto it = memberList.find(ctx.hostname); diff --git a/src/spore/services/MonitoringService.cpp b/src/spore/services/MonitoringService.cpp index 53a62c2..c6a466b 100644 --- a/src/spore/services/MonitoringService.cpp +++ b/src/spore/services/MonitoringService.cpp @@ -25,17 +25,12 @@ MonitoringService::SystemResources MonitoringService::getSystemResources() const // CPU information resources.currentCpuUsage = cpuUsage.getCpuUsage(); resources.averageCpuUsage = cpuUsage.getAverageCpuUsage(); - resources.maxCpuUsage = cpuUsage.getMaxCpuUsage(); - resources.minCpuUsage = cpuUsage.getMinCpuUsage(); resources.measurementCount = cpuUsage.getMeasurementCount(); resources.isMeasuring = cpuUsage.isMeasuring(); // Memory information - ESP8266 compatible resources.freeHeap = ESP.getFreeHeap(); resources.totalHeap = 81920; // ESP8266 has ~80KB RAM - resources.minFreeHeap = 0; // Not available on ESP8266 - resources.maxAllocHeap = 0; // Not available on ESP8266 - resources.heapFragmentation = calculateHeapFragmentation(); // Filesystem information getFilesystemInfo(resources.totalBytes, resources.usedBytes); @@ -59,8 +54,6 @@ void MonitoringService::handleResourcesRequest(AsyncWebServerRequest* request) { JsonObject cpu = doc["cpu"].to(); cpu["current_usage"] = resources.currentCpuUsage; cpu["average_usage"] = resources.averageCpuUsage; - cpu["max_usage"] = resources.maxCpuUsage; - cpu["min_usage"] = resources.minCpuUsage; cpu["measurement_count"] = resources.measurementCount; cpu["is_measuring"] = resources.isMeasuring; @@ -68,9 +61,6 @@ void MonitoringService::handleResourcesRequest(AsyncWebServerRequest* request) { JsonObject memory = doc["memory"].to(); memory["free_heap"] = resources.freeHeap; memory["total_heap"] = resources.totalHeap; - 
memory["min_free_heap"] = resources.minFreeHeap; - memory["max_alloc_heap"] = resources.maxAllocHeap; - memory["heap_fragmentation"] = resources.heapFragmentation; memory["heap_usage_percent"] = resources.totalHeap > 0 ? (float)(resources.totalHeap - resources.freeHeap) / (float)resources.totalHeap * 100.0f : 0.0f; @@ -94,15 +84,6 @@ void MonitoringService::handleResourcesRequest(AsyncWebServerRequest* request) { request->send(200, "application/json", json); } -size_t MonitoringService::calculateHeapFragmentation() const { - size_t freeHeap = ESP.getFreeHeap(); - size_t maxAllocHeap = 0; // Not available on ESP8266 - - if (maxAllocHeap == 0) return 0; - - // Calculate fragmentation as percentage of free heap that can't be allocated in one block - return (freeHeap - maxAllocHeap) * 100 / freeHeap; -} void MonitoringService::getFilesystemInfo(size_t& totalBytes, size_t& usedBytes) const { totalBytes = 0; diff --git a/src/spore/services/NodeService.cpp b/src/spore/services/NodeService.cpp index 473fcdb..5e2afa8 100644 --- a/src/spore/services/NodeService.cpp +++ b/src/spore/services/NodeService.cpp @@ -116,7 +116,7 @@ void NodeService::handleUpdateUpload(AsyncWebServerRequest* request, const Strin LOG_ERROR("OTA", "Update failed: not enough space"); Update.printError(Serial); AsyncWebServerResponse* response = request->beginResponse(500, "application/json", - "{\"status\": \"FAIL\"}"); + "{\"status\": \"FAIL\", \"message\": \"Update failed: not enough space\"}"); response->addHeader("Connection", "close"); request->send(response); return; From daae29dd3fc6bd1dc5bae24fac07ea68954e7b6c Mon Sep 17 00:00:00 2001 From: 0x1d Date: Mon, 20 Oct 2025 21:35:08 +0200 Subject: [PATCH 2/4] refactor: update local node --- include/spore/core/ClusterManager.h | 2 +- src/spore/core/ClusterManager.cpp | 41 +++++++++++++---------------- 2 files changed, 20 insertions(+), 23 deletions(-) diff --git a/include/spore/core/ClusterManager.h b/include/spore/core/ClusterManager.h index 
c93fbb2..3b0fbb2 100644 --- a/include/spore/core/ClusterManager.h +++ b/include/spore/core/ClusterManager.h @@ -20,7 +20,7 @@ public: void removeDeadNodes(); void printMemberList(); const std::map& getMemberList() const { return *ctx.memberList; } - void updateLocalNodeResources(); + void updateLocalNodeResources(NodeInfo& node); void heartbeatTaskCallback(); void updateAllMembersInfoTaskCallback(); void broadcastNodeUpdate(); diff --git a/src/spore/core/ClusterManager.cpp b/src/spore/core/ClusterManager.cpp index 1d2360c..bbdf02c 100644 --- a/src/spore/core/ClusterManager.cpp +++ b/src/spore/core/ClusterManager.cpp @@ -380,10 +380,7 @@ void ClusterManager::heartbeatTaskCallback() { auto it = memberList.find(ctx.hostname); if (it != memberList.end()) { NodeInfo& node = it->second; - node.lastSeen = millis(); - node.status = NodeInfo::ACTIVE; - node.uptime = millis(); // Update uptime - updateLocalNodeResources(); + updateLocalNodeResources(node); addOrUpdateNode(ctx.hostname, ctx.localIP); } @@ -481,23 +478,23 @@ void ClusterManager::printMemberList() { } } -void ClusterManager::updateLocalNodeResources() { - auto& memberList = *ctx.memberList; - auto it = memberList.find(ctx.hostname); - if (it != memberList.end()) { - NodeInfo& node = it->second; - uint32_t freeHeap = ESP.getFreeHeap(); - node.resources.freeHeap = freeHeap; - node.resources.chipId = ESP.getChipId(); - node.resources.sdkVersion = String(ESP.getSdkVersion()); - node.resources.cpuFreqMHz = ESP.getCpuFreqMHz(); - node.resources.flashChipSize = ESP.getFlashChipSize(); - - // Log memory warnings if heap is getting low - if (freeHeap < ctx.config.low_memory_threshold_bytes) { - LOG_WARN("Cluster", "Low memory warning: " + String(freeHeap) + " bytes free"); - } else if (freeHeap < ctx.config.critical_memory_threshold_bytes) { - LOG_ERROR("Cluster", "Critical memory warning: " + String(freeHeap) + " bytes free"); - } +void ClusterManager::updateLocalNodeResources(NodeInfo& node) { + // Update node status 
and timing + node.lastSeen = millis(); + node.status = NodeInfo::ACTIVE; + node.uptime = millis(); + + uint32_t freeHeap = ESP.getFreeHeap(); + node.resources.freeHeap = freeHeap; + node.resources.chipId = ESP.getChipId(); + node.resources.sdkVersion = String(ESP.getSdkVersion()); + node.resources.cpuFreqMHz = ESP.getCpuFreqMHz(); + node.resources.flashChipSize = ESP.getFlashChipSize(); + + // Log memory warnings if heap is getting low + if (freeHeap < ctx.config.low_memory_threshold_bytes) { + LOG_WARN("Cluster", "Low memory warning: " + String(freeHeap) + " bytes free"); + } else if (freeHeap < ctx.config.critical_memory_threshold_bytes) { + LOG_ERROR("Cluster", "Critical memory warning: " + String(freeHeap) + " bytes free"); } } From e796375a9f2e8107f64d2ae649062d2aa7996ea1 Mon Sep 17 00:00:00 2001 From: 0x1d Date: Tue, 21 Oct 2025 09:55:45 +0200 Subject: [PATCH 3/4] feat: memberlist optimization --- include/spore/core/ClusterManager.h | 2 +- include/spore/core/Memberlist.h | 133 +++++++++++++++ include/spore/core/NodeContext.h | 6 +- src/spore/core/ApiServer.cpp | 11 +- src/spore/core/ClusterManager.cpp | 229 ++++++++++++-------------- src/spore/core/Memberlist.cpp | 114 +++++++++++++ src/spore/core/NetworkManager.cpp | 9 +- src/spore/core/NodeContext.cpp | 4 +- src/spore/services/ClusterService.cpp | 5 +- src/spore/services/NodeService.cpp | 36 ++-- 10 files changed, 388 insertions(+), 161 deletions(-) create mode 100644 include/spore/core/Memberlist.h create mode 100644 src/spore/core/Memberlist.cpp diff --git a/include/spore/core/ClusterManager.h b/include/spore/core/ClusterManager.h index 3b0fbb2..c57dc74 100644 --- a/include/spore/core/ClusterManager.h +++ b/include/spore/core/ClusterManager.h @@ -19,7 +19,7 @@ public: void updateAllNodeStatuses(); void removeDeadNodes(); void printMemberList(); - const std::map& getMemberList() const { return *ctx.memberList; } + size_t getMemberCount() const { return ctx.memberList->getMemberCount(); } void 
updateLocalNodeResources(NodeInfo& node); void heartbeatTaskCallback(); void updateAllMembersInfoTaskCallback(); diff --git a/include/spore/core/Memberlist.h b/include/spore/core/Memberlist.h new file mode 100644 index 0000000..aa21b77 --- /dev/null +++ b/include/spore/core/Memberlist.h @@ -0,0 +1,133 @@ +#pragma once + +#include +#include +#include +#include +#include +#include "spore/types/NodeInfo.h" + +/** + * @brief Manages the list of cluster members. + * + * The Memberlist class maintains a collection of cluster members, where each member + * is identified by its IP address and associated with a NodeInfo object. It provides + * methods to add, update, and remove members, as well as handle node status changes + * (stale and dead nodes). + */ +class Memberlist { +public: + /** + * @brief Default constructor. + */ + Memberlist(); + + /** + * @brief Destructor. + */ + ~Memberlist(); + + /** + * @brief Adds or updates a member in the list. + * + * If the member already exists, updates its information. Otherwise, adds a new member. + * @param ip The IP address of the member (as string). + * @param node The NodeInfo object containing member details. + * @return True if the member was added or updated, false otherwise. + */ + bool addOrUpdateMember(const std::string& ip, const NodeInfo& node); + + /** + * @brief Adds a new member to the list. + * + * @param ip The IP address of the member (as string). + * @param node The NodeInfo object containing member details. + * @return True if the member was added, false if it already exists. + */ + bool addMember(const std::string& ip, const NodeInfo& node); + + /** + * @brief Updates an existing member in the list. + * + * @param ip The IP address of the member (as string). + * @param node The updated NodeInfo object. + * @return True if the member was updated, false if it doesn't exist. + */ + bool updateMember(const std::string& ip, const NodeInfo& node); + + /** + * @brief Removes a member from the list. 
+ * + * @param ip The IP address of the member to remove (as string). + * @return True if the member was removed, false if it doesn't exist. + */ + bool removeMember(const std::string& ip); + + /** + * @brief Retrieves a member by IP address. + * + * @param ip The IP address of the member (as string). + * @return Optional containing the NodeInfo if found, or std::nullopt if not found. + */ + std::optional getMember(const std::string& ip) const; + + /** + * @brief Iterates over all members and calls the provided callback for each. + * + * @param callback Function to call for each member. Receives (ip, node) as parameters. + */ + void forEachMember(std::function callback) const; + + /** + * @brief Iterates over all members and calls the provided callback for each. + * + * @param callback Function to call for each member. Receives (ip, node) as parameters. + * If callback returns false, iteration stops. + * @return True if all members were processed, false if iteration was stopped early. + */ + bool forEachMemberUntil(std::function callback) const; + + /** + * @brief Gets the number of members in the list. + * + * @return The number of members. + */ + size_t getMemberCount() const; + + /** + * @brief Updates the status of all members based on current time and thresholds. + * + * Marks nodes as stale or dead based on their last seen time. + * @param currentTime The current time in milliseconds. + * @param staleThresholdMs Threshold for marking a node as stale (milliseconds). + * @param deadThresholdMs Threshold for marking a node as dead (milliseconds). + * @param onStatusChange Optional callback fired when a node's status changes. + */ + void updateAllNodeStatuses(unsigned long currentTime, + unsigned long staleThresholdMs, + unsigned long deadThresholdMs, + std::function onStatusChange = nullptr); + + /** + * @brief Removes all dead members from the list. + * + * @return The number of members removed. 
+ */ + size_t removeDeadMembers(); + + /** + * @brief Checks if a member exists in the list. + * + * @param ip The IP address of the member (as string). + * @return True if the member exists, false otherwise. + */ + bool hasMember(const std::string& ip) const; + + /** + * @brief Clears all members from the list. + */ + void clear(); + +private: + std::map m_members; ///< Internal map holding the members. +}; diff --git a/include/spore/core/NodeContext.h b/include/spore/core/NodeContext.h index 2e1e2b4..395d7d2 100644 --- a/include/spore/core/NodeContext.h +++ b/include/spore/core/NodeContext.h @@ -2,12 +2,14 @@ #include #include -#include "spore/types/NodeInfo.h" #include #include #include +#include +#include "spore/types/NodeInfo.h" #include "spore/types/Config.h" #include "spore/types/ApiTypes.h" +#include "spore/core/Memberlist.h" class NodeContext { public: @@ -18,7 +20,7 @@ public: String hostname; IPAddress localIP; NodeInfo self; - std::map* memberList; + std::unique_ptr memberList; ::Config config; std::map constructorLabels; // Labels passed to constructor (not persisted) diff --git a/src/spore/core/ApiServer.cpp b/src/spore/core/ApiServer.cpp index 58148de..b586fe6 100644 --- a/src/spore/core/ApiServer.cpp +++ b/src/spore/core/ApiServer.cpp @@ -23,11 +23,12 @@ void ApiServer::registerEndpoint(const String& uri, int method, endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true}); // Update cluster if needed - if (ctx.memberList && !ctx.memberList->empty()) { - auto it = ctx.memberList->find(ctx.hostname); - if (it != ctx.memberList->end()) { - it->second.endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true}); - } + String localIPStr = ctx.localIP.toString(); + auto member = ctx.memberList->getMember(localIPStr.c_str()); + if (member) { + NodeInfo updatedNode = *member; + updatedNode.endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true}); + ctx.memberList->updateMember(localIPStr.c_str(), updatedNode); 
} } diff --git a/src/spore/core/ClusterManager.cpp b/src/spore/core/ClusterManager.cpp index bbdf02c..d4bb5ad 100644 --- a/src/spore/core/ClusterManager.cpp +++ b/src/spore/core/ClusterManager.cpp @@ -165,92 +165,93 @@ void ClusterManager::onNodeUpdate(const char* msg) { // but is sent FROM the responding node (ctx.udp->remoteIP()) // We need to find the responding node in the memberlist, not the target node IPAddress respondingNodeIP = ctx.udp->remoteIP(); - auto& memberList = *ctx.memberList; + String respondingIPStr = respondingNodeIP.toString(); // Find the responding node by IP address - NodeInfo* respondingNode = nullptr; - for (auto& pair : memberList) { - if (pair.second.ip == respondingNodeIP) { - respondingNode = &pair.second; - break; - } - } - - if (respondingNode) { - // Calculate latency only if we recently sent a heartbeat (within last 1 second) - unsigned long latency = 0; - unsigned long now = millis(); - if (lastHeartbeatSentAt != 0 && (now - lastHeartbeatSentAt) < 1000) { // 1 second window - latency = now - lastHeartbeatSentAt; - lastHeartbeatSentAt = 0; // Reset for next calculation - } - - // Update the responding node's information - bool hostnameChanged = false; - if (doc["hostname"].is()) { - String newHostname = doc["hostname"].as(); - if (respondingNode->hostname != newHostname) { - respondingNode->hostname = newHostname; - hostnameChanged = true; - } - } - - if (doc["uptime"].is()) { - respondingNode->uptime = doc["uptime"]; - } - - // Update labels if provided - bool labelsChanged = false; - if (doc["labels"].is()) { - // Check if labels actually changed - JsonObject labelsObj = doc["labels"].as(); - std::map newLabels; - for (JsonPair kvp : labelsObj) { - const char* key = kvp.key().c_str(); - const char* value = labelsObj[kvp.key()]; - newLabels[key] = String(value); - } - - // Compare with existing labels - if (newLabels != respondingNode->labels) { - labelsChanged = true; - respondingNode->labels = newLabels; - } - } - - 
respondingNode->lastSeen = now; - respondingNode->status = NodeInfo::ACTIVE; - - // Update latency if we calculated it (preserve existing value if not) - if (latency > 0) { - respondingNode->latency = latency; - } - - // Check if any fields changed that require broadcasting - bool nodeInfoChanged = hostnameChanged || labelsChanged; - - if (nodeInfoChanged) { - // Fire cluster/node/update event to trigger broadcast - ctx.fire("cluster/node/update", nullptr); - } - - LOG_DEBUG("Cluster", String("Updated responding node ") + respondingNode->hostname + " @ " + respondingNodeIP.toString() + - " | hostname: " + (hostnameChanged ? "changed" : "unchanged") + - " | labels: " + (labelsChanged ? "changed" : "unchanged") + - " | latency: " + (latency > 0 ? String(latency) + "ms" : "not calculated")); - } else { + auto respondingMember = ctx.memberList->getMember(respondingIPStr.c_str()); + if (!respondingMember) { LOG_WARN("Cluster", String("Received NODE_UPDATE from unknown node: ") + respondingNodeIP.toString()); + return; } + + // Calculate latency only if we recently sent a heartbeat (within last 1 second) + unsigned long latency = 0; + unsigned long now = millis(); + if (lastHeartbeatSentAt != 0 && (now - lastHeartbeatSentAt) < 1000) { // 1 second window + latency = now - lastHeartbeatSentAt; + lastHeartbeatSentAt = 0; // Reset for next calculation + } + + // Create updated node info + NodeInfo updatedNode = *respondingMember; + bool hostnameChanged = false; + bool labelsChanged = false; + + // Update hostname if provided + if (doc["hostname"].is()) { + String newHostname = doc["hostname"].as(); + if (updatedNode.hostname != newHostname) { + updatedNode.hostname = newHostname; + hostnameChanged = true; + } + } + + // Update uptime if provided + if (doc["uptime"].is()) { + updatedNode.uptime = doc["uptime"]; + } + + // Update labels if provided + if (doc["labels"].is()) { + JsonObject labelsObj = doc["labels"].as(); + std::map newLabels; + for (JsonPair kvp : labelsObj) { 
+ const char* key = kvp.key().c_str(); + const char* value = labelsObj[kvp.key()]; + newLabels[key] = String(value); + } + + // Compare with existing labels + if (newLabels != updatedNode.labels) { + labelsChanged = true; + updatedNode.labels = newLabels; + } + } + + // Update timing and status + updatedNode.lastSeen = now; + updatedNode.status = NodeInfo::ACTIVE; + + // Update latency if we calculated it (preserve existing value if not) + if (latency > 0) { + updatedNode.latency = latency; + } + + // Persist the updated node info to the memberlist + ctx.memberList->updateMember(respondingIPStr.c_str(), updatedNode); + + // Check if any fields changed that require broadcasting + bool nodeInfoChanged = hostnameChanged || labelsChanged; + + if (nodeInfoChanged) { + // Fire cluster/node/update event to trigger broadcast + ctx.fire("cluster/node/update", nullptr); + } + + LOG_DEBUG("Cluster", String("Updated responding node ") + updatedNode.hostname + " @ " + respondingNodeIP.toString() + + " | hostname: " + (hostnameChanged ? "changed" : "unchanged") + + " | labels: " + (labelsChanged ? "changed" : "unchanged") + + " | latency: " + (latency > 0 ? 
String(latency) + "ms" : "not calculated")); } void ClusterManager::sendNodeInfo(const String& targetHostname, const IPAddress& targetIP) { JsonDocument doc; // Get our node info for the response (we're the responding node) - auto& memberList = *ctx.memberList; - auto it = memberList.find(ctx.hostname); - if (it != memberList.end()) { - const NodeInfo& node = it->second; + String localIPStr = ctx.localIP.toString(); + auto member = ctx.memberList->getMember(localIPStr.c_str()); + if (member) { + const NodeInfo& node = *member; // Response contains info about ourselves (the responding node) doc["hostname"] = node.hostname; @@ -265,7 +266,7 @@ void ClusterManager::sendNodeInfo(const String& targetHostname, const IPAddress& } } } else { - // Fallback to basic info + // Fallback to basic info if not in memberlist doc["hostname"] = ctx.hostname; doc["ip"] = ctx.localIP.toString(); doc["uptime"] = millis(); @@ -345,18 +346,20 @@ void ClusterManager::onRawMessage(const char* msg) { } void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) { - auto& memberList = *ctx.memberList; bool memberlistChanged = false; + String ipStr = nodeIP.toString(); - // O(1) lookup instead of O(n) search - auto it = memberList.find(nodeHost); - if (it != memberList.end()) { + // Check if member exists + auto existingMember = ctx.memberList->getMember(ipStr.c_str()); + if (existingMember) { // Update existing node - preserve all existing field values - if (it->second.ip != nodeIP) { - it->second.ip = nodeIP; + NodeInfo updatedNode = *existingMember; + if (updatedNode.ip != nodeIP) { + updatedNode.ip = nodeIP; memberlistChanged = true; } - it->second.lastSeen = millis(); + updatedNode.lastSeen = millis(); + ctx.memberList->updateMember(ipStr.c_str(), updatedNode); } else { // Add new node NodeInfo newNode; @@ -364,7 +367,7 @@ void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) { newNode.ip = nodeIP; newNode.lastSeen = millis(); 
updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms); - memberList[nodeHost] = newNode; + ctx.memberList->addMember(ipStr.c_str(), newNode); memberlistChanged = true; LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " | Status: " + statusToStr(newNode.status) + " | last update: 0"); } @@ -376,12 +379,14 @@ void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) { } void ClusterManager::heartbeatTaskCallback() { - auto& memberList = *ctx.memberList; - auto it = memberList.find(ctx.hostname); - if (it != memberList.end()) { - NodeInfo& node = it->second; + // Update local node resources and lastSeen since we're actively sending heartbeats + String localIPStr = ctx.localIP.toString(); + auto member = ctx.memberList->getMember(localIPStr.c_str()); + if (member) { + NodeInfo node = *member; updateLocalNodeResources(node); - addOrUpdateNode(ctx.hostname, ctx.localIP); + node.lastSeen = millis(); // Update lastSeen since we're actively participating + ctx.memberList->updateMember(localIPStr.c_str(), node); } // Broadcast heartbeat - peers will respond with NODE_UPDATE @@ -401,13 +406,13 @@ void ClusterManager::updateAllMembersInfoTaskCallback() { void ClusterManager::broadcastNodeUpdate() { // Broadcast our current node info as NODE_UPDATE to all cluster members - auto& memberList = *ctx.memberList; - auto it = memberList.find(ctx.hostname); - if (it == memberList.end()) { + String localIPStr = ctx.localIP.toString(); + auto member = ctx.memberList->getMember(localIPStr.c_str()); + if (!member) { return; } - const NodeInfo& node = it->second; + const NodeInfo& node = *member; JsonDocument doc; doc["hostname"] = node.hostname; @@ -434,48 +439,28 @@ void ClusterManager::broadcastNodeUpdate() { } void ClusterManager::updateAllNodeStatuses() { - auto& memberList = *ctx.memberList; unsigned long now = millis(); - for (auto& pair : memberList) { - NodeInfo& 
node = pair.second; - updateNodeStatus(node, now, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms); - } + ctx.memberList->updateAllNodeStatuses(now, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms); } void ClusterManager::removeDeadNodes() { - auto& memberList = *ctx.memberList; - unsigned long now = millis(); - bool memberlistChanged = false; - - // Use iterator to safely remove elements from map - for (auto it = memberList.begin(); it != memberList.end(); ) { - unsigned long diff = now - it->second.lastSeen; - if (it->second.status == NodeInfo::DEAD && diff > ctx.config.node_dead_threshold_ms) { - LOG_INFO("Cluster", "Removing node: " + it->second.hostname); - it = memberList.erase(it); - memberlistChanged = true; - } else { - ++it; - } - } - - // Fire event if memberlist changed - if (memberlistChanged) { + size_t removedCount = ctx.memberList->removeDeadMembers(); + if (removedCount > 0) { + LOG_INFO("Cluster", String("Removed ") + removedCount + " dead nodes"); ctx.fire("cluster/memberlist/changed", nullptr); } } void ClusterManager::printMemberList() { - auto& memberList = *ctx.memberList; - if (memberList.empty()) { + size_t count = ctx.memberList->getMemberCount(); + if (count == 0) { LOG_INFO("Cluster", "Member List: empty"); return; } LOG_INFO("Cluster", "Member List:"); - for (const auto& pair : memberList) { - const NodeInfo& node = pair.second; + ctx.memberList->forEachMember([](const std::string& ip, const NodeInfo& node) { LOG_INFO("Cluster", " " + node.hostname + " @ " + node.ip.toString() + " | Status: " + statusToStr(node.status) + " | last seen: " + String(millis() - node.lastSeen)); - } + }); } void ClusterManager::updateLocalNodeResources(NodeInfo& node) { diff --git a/src/spore/core/Memberlist.cpp b/src/spore/core/Memberlist.cpp new file mode 100644 index 0000000..1392421 --- /dev/null +++ b/src/spore/core/Memberlist.cpp @@ -0,0 +1,114 @@ +#include "spore/core/Memberlist.h" +#include + 
+Memberlist::Memberlist() = default; + +Memberlist::~Memberlist() = default; + +bool Memberlist::addOrUpdateMember(const std::string& ip, const NodeInfo& node) { + auto it = m_members.find(ip); + if (it != m_members.end()) { + // Update existing member + it->second = node; + it->second.lastSeen = millis(); // Update last seen time + return true; + } else { + // Add new member + NodeInfo newNode = node; + newNode.lastSeen = millis(); + m_members[ip] = newNode; + return true; + } +} + +bool Memberlist::addMember(const std::string& ip, const NodeInfo& node) { + if (m_members.find(ip) != m_members.end()) { + return false; // Member already exists + } + NodeInfo newNode = node; + newNode.lastSeen = millis(); + m_members[ip] = newNode; + return true; +} + +bool Memberlist::updateMember(const std::string& ip, const NodeInfo& node) { + auto it = m_members.find(ip); + if (it == m_members.end()) { + return false; // Member doesn't exist + } + it->second = node; + it->second.lastSeen = millis(); // Update last seen time + return true; +} + +bool Memberlist::removeMember(const std::string& ip) { + auto it = m_members.find(ip); + if (it == m_members.end()) { + return false; // Member doesn't exist + } + m_members.erase(it); + return true; +} + +std::optional Memberlist::getMember(const std::string& ip) const { + auto it = m_members.find(ip); + if (it != m_members.end()) { + return it->second; + } + return std::nullopt; +} + +void Memberlist::forEachMember(std::function callback) const { + for (const auto& pair : m_members) { + callback(pair.first, pair.second); + } +} + +bool Memberlist::forEachMemberUntil(std::function callback) const { + for (const auto& pair : m_members) { + if (!callback(pair.first, pair.second)) { + return false; + } + } + return true; +} + +size_t Memberlist::getMemberCount() const { + return m_members.size(); +} + +void Memberlist::updateAllNodeStatuses(unsigned long currentTime, + unsigned long staleThresholdMs, + unsigned long deadThresholdMs, + 
std::function onStatusChange) { + for (auto& [ip, node] : m_members) { + NodeInfo::Status oldStatus = node.status; + updateNodeStatus(node, currentTime, staleThresholdMs, deadThresholdMs); + + if (oldStatus != node.status && onStatusChange) { + onStatusChange(ip, oldStatus, node.status); + } + } +} + +size_t Memberlist::removeDeadMembers() { + size_t removedCount = 0; + auto it = m_members.begin(); + while (it != m_members.end()) { + if (it->second.status == NodeInfo::Status::DEAD) { + it = m_members.erase(it); + ++removedCount; + } else { + ++it; + } + } + return removedCount; +} + +bool Memberlist::hasMember(const std::string& ip) const { + return m_members.find(ip) != m_members.end(); +} + +void Memberlist::clear() { + m_members.clear(); +} diff --git a/src/spore/core/NetworkManager.cpp b/src/spore/core/NetworkManager.cpp index 7a0729c..ba7e0a8 100644 --- a/src/spore/core/NetworkManager.cpp +++ b/src/spore/core/NetworkManager.cpp @@ -119,13 +119,8 @@ void NetworkManager::setupWiFi() { ctx.self.status = NodeInfo::ACTIVE; // Ensure member list has an entry for this node - auto &memberList = *ctx.memberList; - auto existing = memberList.find(ctx.hostname); - if (existing == memberList.end()) { - memberList[ctx.hostname] = ctx.self; - } else { - existing->second = ctx.self; - } + String localIPStr = ctx.localIP.toString(); + ctx.memberList->addOrUpdateMember(localIPStr.c_str(), ctx.self); // Notify listeners that the node is (re)discovered ctx.fire("node/discovered", &ctx.self); diff --git a/src/spore/core/NodeContext.cpp b/src/spore/core/NodeContext.cpp index 5520994..c1136a9 100644 --- a/src/spore/core/NodeContext.cpp +++ b/src/spore/core/NodeContext.cpp @@ -2,7 +2,7 @@ NodeContext::NodeContext() { udp = new WiFiUDP(); - memberList = new std::map(); + memberList = std::make_unique(); hostname = ""; self.hostname = ""; self.ip = IPAddress(); @@ -19,7 +19,7 @@ NodeContext::NodeContext(std::initializer_list> initia NodeContext::~NodeContext() { delete udp; - delete 
memberList; + // memberList is a unique_ptr, so no need to delete manually } void NodeContext::on(const std::string& event, EventCallback cb) { diff --git a/src/spore/services/ClusterService.cpp b/src/spore/services/ClusterService.cpp index dd0e224..afaf59a 100644 --- a/src/spore/services/ClusterService.cpp +++ b/src/spore/services/ClusterService.cpp @@ -40,8 +40,7 @@ void ClusterService::handleMembersRequest(AsyncWebServerRequest* request) { JsonDocument doc; JsonArray arr = doc["members"].to(); - for (const auto& pair : *ctx.memberList) { - const NodeInfo& node = pair.second; + ctx.memberList->forEachMember([&arr](const std::string& ip, const NodeInfo& node) { JsonObject obj = arr.add(); obj["hostname"] = node.hostname; obj["ip"] = node.ip.toString(); @@ -56,7 +55,7 @@ void ClusterService::handleMembersRequest(AsyncWebServerRequest* request) { labelsObj[kv.first.c_str()] = kv.second; } } - } + }); String json; serializeJson(doc, json); diff --git a/src/spore/services/NodeService.cpp b/src/spore/services/NodeService.cpp index 5e2afa8..ea16b5e 100644 --- a/src/spore/services/NodeService.cpp +++ b/src/spore/services/NodeService.cpp @@ -74,20 +74,18 @@ void NodeService::handleStatusRequest(AsyncWebServerRequest* request) { doc["flashChipSize"] = ESP.getFlashChipSize(); // Include local node labels if present - if (ctx.memberList) { - auto it = ctx.memberList->find(ctx.hostname); - if (it != ctx.memberList->end()) { - JsonObject labelsObj = doc["labels"].to(); - for (const auto& kv : it->second.labels) { - labelsObj[kv.first.c_str()] = kv.second; - } - } else if (!ctx.self.labels.empty()) { + auto member = ctx.memberList->getMember(ctx.hostname.c_str()); + if (member) { + JsonObject labelsObj = doc["labels"].to(); + for (const auto& kv : member->labels) { + labelsObj[kv.first.c_str()] = kv.second; + } + } else if (!ctx.self.labels.empty()) { JsonObject labelsObj = doc["labels"].to(); for (const auto& kv : ctx.self.labels) { labelsObj[kv.first.c_str()] = kv.second; } } 
- } String json; serializeJson(doc, json); @@ -216,17 +214,17 @@ void NodeService::handleConfigRequest(AsyncWebServerRequest* request) { // Rebuild self.labels from constructor + config labels ctx.rebuildLabels(); - // TODO think of a better way to update the member list entry for the local node - // Update the member list entry for this node if it exists - if (ctx.memberList) { - auto it = ctx.memberList->find(ctx.hostname); - if (it != ctx.memberList->end()) { - // Update the labels in the member list entry - it->second.labels.clear(); - for (const auto& kv : ctx.self.labels) { - it->second.labels[kv.first] = kv.second; - } + // Update the member list entry for the local node if it exists + String localIPStr = ctx.localIP.toString(); + auto member = ctx.memberList->getMember(localIPStr.c_str()); + if (member) { + // Update the labels in the member list entry + NodeInfo updatedNode = *member; + updatedNode.labels.clear(); + for (const auto& kv : ctx.self.labels) { + updatedNode.labels[kv.first] = kv.second; } + ctx.memberList->updateMember(localIPStr.c_str(), updatedNode); } // Save config to file From 7f406261877cd349da4528832097e3e1d4c97fdc Mon Sep 17 00:00:00 2001 From: 0x1d Date: Tue, 21 Oct 2025 11:19:12 +0200 Subject: [PATCH 4/4] feat: improve local node initialization --- src/spore/core/ClusterManager.cpp | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/spore/core/ClusterManager.cpp b/src/spore/core/ClusterManager.cpp index d4bb5ad..9dc3121 100644 --- a/src/spore/core/ClusterManager.cpp +++ b/src/spore/core/ClusterManager.cpp @@ -1,9 +1,10 @@ #include "spore/core/ClusterManager.h" #include "spore/internal/Globals.h" #include "spore/util/Logging.h" +#include "spore/types/NodeInfo.h" ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx), taskManager(taskMgr) { - // Register callback for node/discovered event + // Register callback for node/discovered event - this fires when network is ready
ctx.on("node/discovered", [this](void* data) { NodeInfo* node = static_cast(data); this->addOrUpdateNode(node->hostname, node->ip); @@ -367,6 +368,16 @@ void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) { newNode.ip = nodeIP; newNode.lastSeen = millis(); updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms); + + // Initialize static resources if this is the local node being added for the first time + if (nodeIP == ctx.localIP && nodeHost == ctx.hostname) { + newNode.resources.chipId = ESP.getChipId(); + newNode.resources.sdkVersion = String(ESP.getSdkVersion()); + newNode.resources.cpuFreqMHz = ESP.getCpuFreqMHz(); + newNode.resources.flashChipSize = ESP.getFlashChipSize(); + LOG_DEBUG("Cluster", "Initialized static resources for local node"); + } + ctx.memberList->addMember(ipStr.c_str(), newNode); memberlistChanged = true; LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " | Status: " + statusToStr(newNode.status) + " | last update: 0"); @@ -469,12 +480,9 @@ void ClusterManager::updateLocalNodeResources(NodeInfo& node) { node.status = NodeInfo::ACTIVE; node.uptime = millis(); + // Update dynamic resources (always updated) uint32_t freeHeap = ESP.getFreeHeap(); node.resources.freeHeap = freeHeap; - node.resources.chipId = ESP.getChipId(); - node.resources.sdkVersion = String(ESP.getSdkVersion()); - node.resources.cpuFreqMHz = ESP.getCpuFreqMHz(); - node.resources.flashChipSize = ESP.getFlashChipSize(); // Log memory warnings if heap is getting low if (freeHeap < ctx.config.low_memory_threshold_bytes) {