Merge pull request 'refactoring/firmware-optimizations' (#16) from refactoring/firmware-optimizations into main

Reviewed-on: #16
This commit is contained in:
2025-10-21 13:51:03 +02:00
12 changed files with 418 additions and 323 deletions

View File

@@ -19,9 +19,8 @@ public:
void updateAllNodeStatuses();
void removeDeadNodes();
void printMemberList();
const std::map<String, NodeInfo>& getMemberList() const { return *ctx.memberList; }
void fetchNodeInfo(const IPAddress& ip);
void updateLocalNodeResources();
size_t getMemberCount() const { return ctx.memberList->getMemberCount(); }
void updateLocalNodeResources(NodeInfo& node);
void heartbeatTaskCallback();
void updateAllMembersInfoTaskCallback();
void broadcastNodeUpdate();

View File

@@ -0,0 +1,133 @@
#pragma once
#include <Arduino.h>
#include <map>
#include <string>
#include <optional>
#include <functional>
#include "spore/types/NodeInfo.h"
/**
* @brief Manages the list of cluster members.
*
* The Memberlist class maintains a collection of cluster members, where each member
* is identified by its IP address and associated with a NodeInfo object. It provides
* methods to add, update, and remove members, as well as handle node status changes
* (stale and dead nodes).
*/
class Memberlist {
public:
/**
* @brief Default constructor.
*/
Memberlist();
/**
* @brief Destructor.
*/
~Memberlist();
/**
* @brief Adds or updates a member in the list.
*
* If the member already exists, updates its information. Otherwise, adds a new member.
* @param ip The IP address of the member (as string).
* @param node The NodeInfo object containing member details.
* @return True if the member was added or updated, false otherwise.
*/
bool addOrUpdateMember(const std::string& ip, const NodeInfo& node);
/**
* @brief Adds a new member to the list.
*
* @param ip The IP address of the member (as string).
* @param node The NodeInfo object containing member details.
* @return True if the member was added, false if it already exists.
*/
bool addMember(const std::string& ip, const NodeInfo& node);
/**
* @brief Updates an existing member in the list.
*
* @param ip The IP address of the member (as string).
* @param node The updated NodeInfo object.
* @return True if the member was updated, false if it doesn't exist.
*/
bool updateMember(const std::string& ip, const NodeInfo& node);
/**
* @brief Removes a member from the list.
*
* @param ip The IP address of the member to remove (as string).
* @return True if the member was removed, false if it doesn't exist.
*/
bool removeMember(const std::string& ip);
/**
* @brief Retrieves a member by IP address.
*
* @param ip The IP address of the member (as string).
* @return Optional containing the NodeInfo if found, or std::nullopt if not found.
*/
std::optional<NodeInfo> getMember(const std::string& ip) const;
/**
* @brief Iterates over all members and calls the provided callback for each.
*
* @param callback Function to call for each member. Receives (ip, node) as parameters.
*/
void forEachMember(std::function<void(const std::string&, const NodeInfo&)> callback) const;
/**
* @brief Iterates over all members and calls the provided callback for each.
*
* @param callback Function to call for each member. Receives (ip, node) as parameters.
* If callback returns false, iteration stops.
* @return True if all members were processed, false if iteration was stopped early.
*/
bool forEachMemberUntil(std::function<bool(const std::string&, const NodeInfo&)> callback) const;
/**
* @brief Gets the number of members in the list.
*
* @return The number of members.
*/
size_t getMemberCount() const;
/**
* @brief Updates the status of all members based on current time and thresholds.
*
* Marks nodes as stale or dead based on their last seen time.
* @param currentTime The current time in milliseconds.
* @param staleThresholdMs Threshold for marking a node as stale (milliseconds).
* @param deadThresholdMs Threshold for marking a node as dead (milliseconds).
* @param onStatusChange Optional callback fired when a node's status changes.
*/
void updateAllNodeStatuses(unsigned long currentTime,
unsigned long staleThresholdMs,
unsigned long deadThresholdMs,
std::function<void(const std::string&, NodeInfo::Status, NodeInfo::Status)> onStatusChange = nullptr);
/**
* @brief Removes all dead members from the list.
*
* @return The number of members removed.
*/
size_t removeDeadMembers();
/**
* @brief Checks if a member exists in the list.
*
* @param ip The IP address of the member (as string).
* @return True if the member exists, false otherwise.
*/
bool hasMember(const std::string& ip) const;
/**
* @brief Clears all members from the list.
*/
void clear();
private:
std::map<std::string, NodeInfo> m_members; ///< Internal map holding the members.
};

View File

@@ -2,12 +2,14 @@
#include <WiFiUdp.h>
#include <map>
#include "spore/types/NodeInfo.h"
#include <functional>
#include <string>
#include <initializer_list>
#include <memory>
#include "spore/types/NodeInfo.h"
#include "spore/types/Config.h"
#include "spore/types/ApiTypes.h"
#include "spore/core/Memberlist.h"
class NodeContext {
public:
@@ -18,7 +20,7 @@ public:
String hostname;
IPAddress localIP;
NodeInfo self;
std::map<String, NodeInfo>* memberList;
std::unique_ptr<Memberlist> memberList;
::Config config;
std::map<String, String> constructorLabels; // Labels passed to constructor (not persisted)

View File

@@ -15,17 +15,12 @@ public:
// CPU information
float currentCpuUsage;
float averageCpuUsage;
float maxCpuUsage;
float minCpuUsage;
unsigned long measurementCount;
bool isMeasuring;
// Memory information
size_t freeHeap;
size_t totalHeap;
size_t minFreeHeap;
size_t maxAllocHeap;
size_t heapFragmentation;
// Filesystem information
size_t totalBytes;
@@ -45,7 +40,6 @@ private:
void handleResourcesRequest(AsyncWebServerRequest* request);
// Helper methods
size_t calculateHeapFragmentation() const;
void getFilesystemInfo(size_t& totalBytes, size_t& usedBytes) const;
CpuUsage& cpuUsage;

View File

@@ -23,11 +23,12 @@ void ApiServer::registerEndpoint(const String& uri, int method,
endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true});
// Update cluster if needed
if (ctx.memberList && !ctx.memberList->empty()) {
auto it = ctx.memberList->find(ctx.hostname);
if (it != ctx.memberList->end()) {
it->second.endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true});
}
String localIPStr = ctx.localIP.toString();
auto member = ctx.memberList->getMember(localIPStr.c_str());
if (member) {
NodeInfo updatedNode = *member;
updatedNode.endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true});
ctx.memberList->updateMember(localIPStr.c_str(), updatedNode);
}
}

View File

@@ -1,9 +1,10 @@
#include "spore/core/ClusterManager.h"
#include "spore/internal/Globals.h"
#include "spore/util/Logging.h"
#include "spore/types/NodeInfo.h"
ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx), taskManager(taskMgr) {
// Register callback for node/discovered event
// Register callback for node/discovered event - this fires when network is ready
ctx.on("node/discovered", [this](void* data) {
NodeInfo* node = static_cast<NodeInfo*>(data);
this->addOrUpdateNode(node->hostname, node->ip);
@@ -165,92 +166,93 @@ void ClusterManager::onNodeUpdate(const char* msg) {
// but is sent FROM the responding node (ctx.udp->remoteIP())
// We need to find the responding node in the memberlist, not the target node
IPAddress respondingNodeIP = ctx.udp->remoteIP();
auto& memberList = *ctx.memberList;
String respondingIPStr = respondingNodeIP.toString();
// Find the responding node by IP address
NodeInfo* respondingNode = nullptr;
for (auto& pair : memberList) {
if (pair.second.ip == respondingNodeIP) {
respondingNode = &pair.second;
break;
}
}
if (respondingNode) {
// Calculate latency only if we recently sent a heartbeat (within last 1 second)
unsigned long latency = 0;
unsigned long now = millis();
if (lastHeartbeatSentAt != 0 && (now - lastHeartbeatSentAt) < 1000) { // 1 second window
latency = now - lastHeartbeatSentAt;
lastHeartbeatSentAt = 0; // Reset for next calculation
}
// Update the responding node's information
bool hostnameChanged = false;
if (doc["hostname"].is<const char*>()) {
String newHostname = doc["hostname"].as<const char*>();
if (respondingNode->hostname != newHostname) {
respondingNode->hostname = newHostname;
hostnameChanged = true;
}
}
if (doc["uptime"].is<unsigned long>()) {
respondingNode->uptime = doc["uptime"];
}
// Update labels if provided
bool labelsChanged = false;
if (doc["labels"].is<JsonObject>()) {
// Check if labels actually changed
JsonObject labelsObj = doc["labels"].as<JsonObject>();
std::map<String, String> newLabels;
for (JsonPair kvp : labelsObj) {
const char* key = kvp.key().c_str();
const char* value = labelsObj[kvp.key()];
newLabels[key] = String(value);
}
// Compare with existing labels
if (newLabels != respondingNode->labels) {
labelsChanged = true;
respondingNode->labels = newLabels;
}
}
respondingNode->lastSeen = now;
respondingNode->status = NodeInfo::ACTIVE;
// Update latency if we calculated it (preserve existing value if not)
if (latency > 0) {
respondingNode->latency = latency;
}
// Check if any fields changed that require broadcasting
bool nodeInfoChanged = hostnameChanged || labelsChanged;
if (nodeInfoChanged) {
// Fire cluster/node/update event to trigger broadcast
ctx.fire("cluster/node/update", nullptr);
}
LOG_DEBUG("Cluster", String("Updated responding node ") + respondingNode->hostname + " @ " + respondingNodeIP.toString() +
" | hostname: " + (hostnameChanged ? "changed" : "unchanged") +
" | labels: " + (labelsChanged ? "changed" : "unchanged") +
" | latency: " + (latency > 0 ? String(latency) + "ms" : "not calculated"));
} else {
auto respondingMember = ctx.memberList->getMember(respondingIPStr.c_str());
if (!respondingMember) {
LOG_WARN("Cluster", String("Received NODE_UPDATE from unknown node: ") + respondingNodeIP.toString());
return;
}
// Calculate latency only if we recently sent a heartbeat (within last 1 second)
unsigned long latency = 0;
unsigned long now = millis();
if (lastHeartbeatSentAt != 0 && (now - lastHeartbeatSentAt) < 1000) { // 1 second window
latency = now - lastHeartbeatSentAt;
lastHeartbeatSentAt = 0; // Reset for next calculation
}
// Create updated node info
NodeInfo updatedNode = *respondingMember;
bool hostnameChanged = false;
bool labelsChanged = false;
// Update hostname if provided
if (doc["hostname"].is<const char*>()) {
String newHostname = doc["hostname"].as<const char*>();
if (updatedNode.hostname != newHostname) {
updatedNode.hostname = newHostname;
hostnameChanged = true;
}
}
// Update uptime if provided
if (doc["uptime"].is<unsigned long>()) {
updatedNode.uptime = doc["uptime"];
}
// Update labels if provided
if (doc["labels"].is<JsonObject>()) {
JsonObject labelsObj = doc["labels"].as<JsonObject>();
std::map<String, String> newLabels;
for (JsonPair kvp : labelsObj) {
const char* key = kvp.key().c_str();
const char* value = labelsObj[kvp.key()];
newLabels[key] = String(value);
}
// Compare with existing labels
if (newLabels != updatedNode.labels) {
labelsChanged = true;
updatedNode.labels = newLabels;
}
}
// Update timing and status
updatedNode.lastSeen = now;
updatedNode.status = NodeInfo::ACTIVE;
// Update latency if we calculated it (preserve existing value if not)
if (latency > 0) {
updatedNode.latency = latency;
}
// Persist the updated node info to the memberlist
ctx.memberList->updateMember(respondingIPStr.c_str(), updatedNode);
// Check if any fields changed that require broadcasting
bool nodeInfoChanged = hostnameChanged || labelsChanged;
if (nodeInfoChanged) {
// Fire cluster/node/update event to trigger broadcast
ctx.fire("cluster/node/update", nullptr);
}
LOG_DEBUG("Cluster", String("Updated responding node ") + updatedNode.hostname + " @ " + respondingNodeIP.toString() +
" | hostname: " + (hostnameChanged ? "changed" : "unchanged") +
" | labels: " + (labelsChanged ? "changed" : "unchanged") +
" | latency: " + (latency > 0 ? String(latency) + "ms" : "not calculated"));
}
void ClusterManager::sendNodeInfo(const String& targetHostname, const IPAddress& targetIP) {
JsonDocument doc;
// Get our node info for the response (we're the responding node)
auto& memberList = *ctx.memberList;
auto it = memberList.find(ctx.hostname);
if (it != memberList.end()) {
const NodeInfo& node = it->second;
String localIPStr = ctx.localIP.toString();
auto member = ctx.memberList->getMember(localIPStr.c_str());
if (member) {
const NodeInfo& node = *member;
// Response contains info about ourselves (the responding node)
doc["hostname"] = node.hostname;
@@ -265,7 +267,7 @@ void ClusterManager::sendNodeInfo(const String& targetHostname, const IPAddress&
}
}
} else {
// Fallback to basic info
// Fallback to basic info if not in memberlist
doc["hostname"] = ctx.hostname;
doc["ip"] = ctx.localIP.toString();
doc["uptime"] = millis();
@@ -345,20 +347,20 @@ void ClusterManager::onRawMessage(const char* msg) {
}
void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
auto& memberList = *ctx.memberList;
bool memberlistChanged = false;
String ipStr = nodeIP.toString();
// O(1) lookup instead of O(n) search
auto it = memberList.find(nodeHost);
if (it != memberList.end()) {
// Check if member exists
auto existingMember = ctx.memberList->getMember(ipStr.c_str());
if (existingMember) {
// Update existing node - preserve all existing field values
if (it->second.ip != nodeIP) {
it->second.ip = nodeIP;
NodeInfo updatedNode = *existingMember;
if (updatedNode.ip != nodeIP) {
updatedNode.ip = nodeIP;
memberlistChanged = true;
}
it->second.lastSeen = millis();
// Note: Other fields like latency, uptime, labels, etc. are preserved
//fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
updatedNode.lastSeen = millis();
ctx.memberList->updateMember(ipStr.c_str(), updatedNode);
} else {
// Add new node
NodeInfo newNode;
@@ -366,10 +368,19 @@ void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
newNode.ip = nodeIP;
newNode.lastSeen = millis();
updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
memberList[nodeHost] = newNode;
// Initialize static resources if this is the local node being added for the first time
if (nodeIP == ctx.localIP && nodeHost == ctx.hostname) {
newNode.resources.chipId = ESP.getChipId();
newNode.resources.sdkVersion = String(ESP.getSdkVersion());
newNode.resources.cpuFreqMHz = ESP.getCpuFreqMHz();
newNode.resources.flashChipSize = ESP.getFlashChipSize();
LOG_DEBUG("Cluster", "Initialized static resources for local node");
}
ctx.memberList->addMember(ipStr.c_str(), newNode);
memberlistChanged = true;
LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " | Status: " + statusToStr(newNode.status) + " | last update: 0");
//fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
}
// Fire event if memberlist changed
@@ -378,124 +389,15 @@ void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
}
}
// unused http client to fetch complete node info
void ClusterManager::fetchNodeInfo(const IPAddress& ip) {
if(ip == ctx.localIP) {
LOG_DEBUG("Cluster", "Skipping fetch for local node");
return;
}
unsigned long requestStart = millis();
HTTPClient http;
WiFiClient client;
String url = "http://" + ip.toString() + ClusterProtocol::API_NODE_STATUS;
// Use RAII pattern to ensure http.end() is always called
bool httpInitialized = false;
bool success = false;
httpInitialized = http.begin(client, url);
if (!httpInitialized) {
LOG_ERROR("Cluster", "Failed to initialize HTTP client for " + ip.toString());
return;
}
// Set timeout to prevent hanging
http.setTimeout(5000); // 5 second timeout
int httpCode = http.GET();
unsigned long requestEnd = millis();
unsigned long requestDuration = requestEnd - requestStart;
if (httpCode == 200) {
String payload = http.getString();
// Use stack-allocated JsonDocument with proper cleanup
JsonDocument doc;
DeserializationError err = deserializeJson(doc, payload);
if (!err) {
auto& memberList = *ctx.memberList;
// Still need to iterate since we're searching by IP, not hostname
for (auto& pair : memberList) {
NodeInfo& node = pair.second;
if (node.ip == ip) {
// Update resources efficiently
node.resources.freeHeap = doc["freeHeap"];
node.resources.chipId = doc["chipId"];
node.resources.sdkVersion = (const char*)doc["sdkVersion"];
node.resources.cpuFreqMHz = doc["cpuFreqMHz"];
node.resources.flashChipSize = doc["flashChipSize"];
node.status = NodeInfo::ACTIVE;
node.latency = requestDuration;
node.lastSeen = millis();
// Clear and rebuild endpoints efficiently
node.endpoints.clear();
node.endpoints.reserve(10); // Pre-allocate to avoid reallocations
if (doc["api"].is<JsonArray>()) {
JsonArray apiArr = doc["api"].as<JsonArray>();
for (JsonObject apiObj : apiArr) {
// Use const char* to avoid String copies
const char* uri = apiObj["uri"];
int method = apiObj["method"];
// Create basic EndpointInfo without params for cluster nodes
EndpointInfo endpoint;
endpoint.uri = uri; // String assignment is more efficient than construction
endpoint.method = method;
endpoint.isLocal = false;
endpoint.serviceName = "remote";
node.endpoints.push_back(std::move(endpoint));
}
}
// Parse labels efficiently
node.labels.clear();
if (doc["labels"].is<JsonObject>()) {
JsonObject labelsObj = doc["labels"].as<JsonObject>();
for (JsonPair kvp : labelsObj) {
// Use const char* to avoid String copies
const char* key = kvp.key().c_str();
const char* value = labelsObj[kvp.key()];
node.labels[key] = value;
}
}
LOG_DEBUG("Cluster", "Fetched info for node: " + node.hostname + " @ " + ip.toString());
success = true;
break;
}
}
} else {
LOG_ERROR("Cluster", "JSON parse error for node @ " + ip.toString() + ": " + String(err.c_str()));
}
} else {
LOG_ERROR("Cluster", "Failed to fetch info for node @ " + ip.toString() + ", HTTP code: " + String(httpCode));
}
// Always ensure HTTP client is properly closed
if (httpInitialized) {
http.end();
}
// Log success/failure for debugging
if (!success) {
LOG_DEBUG("Cluster", "Failed to update node info for " + ip.toString());
}
}
void ClusterManager::heartbeatTaskCallback() {
auto& memberList = *ctx.memberList;
auto it = memberList.find(ctx.hostname);
if (it != memberList.end()) {
NodeInfo& node = it->second;
node.lastSeen = millis();
node.status = NodeInfo::ACTIVE;
node.uptime = millis(); // Update uptime
updateLocalNodeResources();
addOrUpdateNode(ctx.hostname, ctx.localIP);
// Update local node resources and lastSeen since we're actively sending heartbeats
String localIPStr = ctx.localIP.toString();
auto member = ctx.memberList->getMember(localIPStr.c_str());
if (member) {
NodeInfo node = *member;
updateLocalNodeResources(node);
node.lastSeen = millis(); // Update lastSeen since we're actively participating
ctx.memberList->updateMember(localIPStr.c_str(), node);
}
// Broadcast heartbeat - peers will respond with NODE_UPDATE
@@ -515,13 +417,13 @@ void ClusterManager::updateAllMembersInfoTaskCallback() {
void ClusterManager::broadcastNodeUpdate() {
// Broadcast our current node info as NODE_UPDATE to all cluster members
auto& memberList = *ctx.memberList;
auto it = memberList.find(ctx.hostname);
if (it == memberList.end()) {
String localIPStr = ctx.localIP.toString();
auto member = ctx.memberList->getMember(localIPStr.c_str());
if (!member) {
return;
}
const NodeInfo& node = it->second;
const NodeInfo& node = *member;
JsonDocument doc;
doc["hostname"] = node.hostname;
@@ -548,67 +450,44 @@ void ClusterManager::broadcastNodeUpdate() {
}
void ClusterManager::updateAllNodeStatuses() {
auto& memberList = *ctx.memberList;
unsigned long now = millis();
for (auto& pair : memberList) {
NodeInfo& node = pair.second;
updateNodeStatus(node, now, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
}
ctx.memberList->updateAllNodeStatuses(now, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
}
void ClusterManager::removeDeadNodes() {
auto& memberList = *ctx.memberList;
unsigned long now = millis();
bool memberlistChanged = false;
// Use iterator to safely remove elements from map
for (auto it = memberList.begin(); it != memberList.end(); ) {
unsigned long diff = now - it->second.lastSeen;
if (it->second.status == NodeInfo::DEAD && diff > ctx.config.node_dead_threshold_ms) {
LOG_INFO("Cluster", "Removing node: " + it->second.hostname);
it = memberList.erase(it);
memberlistChanged = true;
} else {
++it;
}
}
// Fire event if memberlist changed
if (memberlistChanged) {
size_t removedCount = ctx.memberList->removeDeadMembers();
if (removedCount > 0) {
LOG_INFO("Cluster", String("Removed ") + removedCount + " dead nodes");
ctx.fire("cluster/memberlist/changed", nullptr);
}
}
void ClusterManager::printMemberList() {
auto& memberList = *ctx.memberList;
if (memberList.empty()) {
size_t count = ctx.memberList->getMemberCount();
if (count == 0) {
LOG_INFO("Cluster", "Member List: empty");
return;
}
LOG_INFO("Cluster", "Member List:");
for (const auto& pair : memberList) {
const NodeInfo& node = pair.second;
ctx.memberList->forEachMember([](const std::string& ip, const NodeInfo& node) {
LOG_INFO("Cluster", " " + node.hostname + " @ " + node.ip.toString() + " | Status: " + statusToStr(node.status) + " | last seen: " + String(millis() - node.lastSeen));
}
});
}
void ClusterManager::updateLocalNodeResources() {
auto& memberList = *ctx.memberList;
auto it = memberList.find(ctx.hostname);
if (it != memberList.end()) {
NodeInfo& node = it->second;
uint32_t freeHeap = ESP.getFreeHeap();
node.resources.freeHeap = freeHeap;
node.resources.chipId = ESP.getChipId();
node.resources.sdkVersion = String(ESP.getSdkVersion());
node.resources.cpuFreqMHz = ESP.getCpuFreqMHz();
node.resources.flashChipSize = ESP.getFlashChipSize();
// Log memory warnings if heap is getting low
if (freeHeap < ctx.config.low_memory_threshold_bytes) {
LOG_WARN("Cluster", "Low memory warning: " + String(freeHeap) + " bytes free");
} else if (freeHeap < ctx.config.critical_memory_threshold_bytes) {
LOG_ERROR("Cluster", "Critical memory warning: " + String(freeHeap) + " bytes free");
}
void ClusterManager::updateLocalNodeResources(NodeInfo& node) {
// Update node status and timing
node.lastSeen = millis();
node.status = NodeInfo::ACTIVE;
node.uptime = millis();
// Update dynamic resources (always updated)
uint32_t freeHeap = ESP.getFreeHeap();
node.resources.freeHeap = freeHeap;
// Log memory warnings if heap is getting low
if (freeHeap < ctx.config.low_memory_threshold_bytes) {
LOG_WARN("Cluster", "Low memory warning: " + String(freeHeap) + " bytes free");
} else if (freeHeap < ctx.config.critical_memory_threshold_bytes) {
LOG_ERROR("Cluster", "Critical memory warning: " + String(freeHeap) + " bytes free");
}
}

View File

@@ -0,0 +1,114 @@
#include "spore/core/Memberlist.h"
#include <algorithm>
Memberlist::Memberlist() = default;
Memberlist::~Memberlist() = default;
bool Memberlist::addOrUpdateMember(const std::string& ip, const NodeInfo& node) {
auto it = m_members.find(ip);
if (it != m_members.end()) {
// Update existing member
it->second = node;
it->second.lastSeen = millis(); // Update last seen time
return true;
} else {
// Add new member
NodeInfo newNode = node;
newNode.lastSeen = millis();
m_members[ip] = newNode;
return true;
}
}
bool Memberlist::addMember(const std::string& ip, const NodeInfo& node) {
if (m_members.find(ip) != m_members.end()) {
return false; // Member already exists
}
NodeInfo newNode = node;
newNode.lastSeen = millis();
m_members[ip] = newNode;
return true;
}
bool Memberlist::updateMember(const std::string& ip, const NodeInfo& node) {
auto it = m_members.find(ip);
if (it == m_members.end()) {
return false; // Member doesn't exist
}
it->second = node;
it->second.lastSeen = millis(); // Update last seen time
return true;
}
bool Memberlist::removeMember(const std::string& ip) {
auto it = m_members.find(ip);
if (it == m_members.end()) {
return false; // Member doesn't exist
}
m_members.erase(it);
return true;
}
std::optional<NodeInfo> Memberlist::getMember(const std::string& ip) const {
auto it = m_members.find(ip);
if (it != m_members.end()) {
return it->second;
}
return std::nullopt;
}
void Memberlist::forEachMember(std::function<void(const std::string&, const NodeInfo&)> callback) const {
for (const auto& pair : m_members) {
callback(pair.first, pair.second);
}
}
bool Memberlist::forEachMemberUntil(std::function<bool(const std::string&, const NodeInfo&)> callback) const {
for (const auto& pair : m_members) {
if (!callback(pair.first, pair.second)) {
return false;
}
}
return true;
}
size_t Memberlist::getMemberCount() const {
return m_members.size();
}
void Memberlist::updateAllNodeStatuses(unsigned long currentTime,
unsigned long staleThresholdMs,
unsigned long deadThresholdMs,
std::function<void(const std::string&, NodeInfo::Status, NodeInfo::Status)> onStatusChange) {
for (auto& [ip, node] : m_members) {
NodeInfo::Status oldStatus = node.status;
updateNodeStatus(node, currentTime, staleThresholdMs, deadThresholdMs);
if (oldStatus != node.status && onStatusChange) {
onStatusChange(ip, oldStatus, node.status);
}
}
}
size_t Memberlist::removeDeadMembers() {
size_t removedCount = 0;
auto it = m_members.begin();
while (it != m_members.end()) {
if (it->second.status == NodeInfo::Status::DEAD) {
it = m_members.erase(it);
++removedCount;
} else {
++it;
}
}
return removedCount;
}
bool Memberlist::hasMember(const std::string& ip) const {
return m_members.find(ip) != m_members.end();
}
void Memberlist::clear() {
m_members.clear();
}

View File

@@ -119,13 +119,8 @@ void NetworkManager::setupWiFi() {
ctx.self.status = NodeInfo::ACTIVE;
// Ensure member list has an entry for this node
auto &memberList = *ctx.memberList;
auto existing = memberList.find(ctx.hostname);
if (existing == memberList.end()) {
memberList[ctx.hostname] = ctx.self;
} else {
existing->second = ctx.self;
}
String localIPStr = ctx.localIP.toString();
ctx.memberList->addOrUpdateMember(localIPStr.c_str(), ctx.self);
// Notify listeners that the node is (re)discovered
ctx.fire("node/discovered", &ctx.self);

View File

@@ -2,7 +2,7 @@
NodeContext::NodeContext() {
udp = new WiFiUDP();
memberList = new std::map<String, NodeInfo>();
memberList = std::make_unique<Memberlist>();
hostname = "";
self.hostname = "";
self.ip = IPAddress();
@@ -19,7 +19,7 @@ NodeContext::NodeContext(std::initializer_list<std::pair<String, String>> initia
NodeContext::~NodeContext() {
delete udp;
delete memberList;
// memberList is a unique_ptr, so no need to delete manually
}
void NodeContext::on(const std::string& event, EventCallback cb) {

View File

@@ -40,8 +40,7 @@ void ClusterService::handleMembersRequest(AsyncWebServerRequest* request) {
JsonDocument doc;
JsonArray arr = doc["members"].to<JsonArray>();
for (const auto& pair : *ctx.memberList) {
const NodeInfo& node = pair.second;
ctx.memberList->forEachMember([&arr](const std::string& ip, const NodeInfo& node) {
JsonObject obj = arr.add<JsonObject>();
obj["hostname"] = node.hostname;
obj["ip"] = node.ip.toString();
@@ -56,7 +55,7 @@ void ClusterService::handleMembersRequest(AsyncWebServerRequest* request) {
labelsObj[kv.first.c_str()] = kv.second;
}
}
}
});
String json;
serializeJson(doc, json);

View File

@@ -25,17 +25,12 @@ MonitoringService::SystemResources MonitoringService::getSystemResources() const
// CPU information
resources.currentCpuUsage = cpuUsage.getCpuUsage();
resources.averageCpuUsage = cpuUsage.getAverageCpuUsage();
resources.maxCpuUsage = cpuUsage.getMaxCpuUsage();
resources.minCpuUsage = cpuUsage.getMinCpuUsage();
resources.measurementCount = cpuUsage.getMeasurementCount();
resources.isMeasuring = cpuUsage.isMeasuring();
// Memory information - ESP8266 compatible
resources.freeHeap = ESP.getFreeHeap();
resources.totalHeap = 81920; // ESP8266 has ~80KB RAM
resources.minFreeHeap = 0; // Not available on ESP8266
resources.maxAllocHeap = 0; // Not available on ESP8266
resources.heapFragmentation = calculateHeapFragmentation();
// Filesystem information
getFilesystemInfo(resources.totalBytes, resources.usedBytes);
@@ -59,8 +54,6 @@ void MonitoringService::handleResourcesRequest(AsyncWebServerRequest* request) {
JsonObject cpu = doc["cpu"].to<JsonObject>();
cpu["current_usage"] = resources.currentCpuUsage;
cpu["average_usage"] = resources.averageCpuUsage;
cpu["max_usage"] = resources.maxCpuUsage;
cpu["min_usage"] = resources.minCpuUsage;
cpu["measurement_count"] = resources.measurementCount;
cpu["is_measuring"] = resources.isMeasuring;
@@ -68,9 +61,6 @@ void MonitoringService::handleResourcesRequest(AsyncWebServerRequest* request) {
JsonObject memory = doc["memory"].to<JsonObject>();
memory["free_heap"] = resources.freeHeap;
memory["total_heap"] = resources.totalHeap;
memory["min_free_heap"] = resources.minFreeHeap;
memory["max_alloc_heap"] = resources.maxAllocHeap;
memory["heap_fragmentation"] = resources.heapFragmentation;
memory["heap_usage_percent"] = resources.totalHeap > 0 ?
(float)(resources.totalHeap - resources.freeHeap) / (float)resources.totalHeap * 100.0f : 0.0f;
@@ -94,15 +84,6 @@ void MonitoringService::handleResourcesRequest(AsyncWebServerRequest* request) {
request->send(200, "application/json", json);
}
size_t MonitoringService::calculateHeapFragmentation() const {
size_t freeHeap = ESP.getFreeHeap();
size_t maxAllocHeap = 0; // Not available on ESP8266
if (maxAllocHeap == 0) return 0;
// Calculate fragmentation as percentage of free heap that can't be allocated in one block
return (freeHeap - maxAllocHeap) * 100 / freeHeap;
}
void MonitoringService::getFilesystemInfo(size_t& totalBytes, size_t& usedBytes) const {
totalBytes = 0;

View File

@@ -74,20 +74,18 @@ void NodeService::handleStatusRequest(AsyncWebServerRequest* request) {
doc["flashChipSize"] = ESP.getFlashChipSize();
// Include local node labels if present
if (ctx.memberList) {
auto it = ctx.memberList->find(ctx.hostname);
if (it != ctx.memberList->end()) {
JsonObject labelsObj = doc["labels"].to<JsonObject>();
for (const auto& kv : it->second.labels) {
labelsObj[kv.first.c_str()] = kv.second;
}
} else if (!ctx.self.labels.empty()) {
auto member = ctx.memberList->getMember(ctx.hostname.c_str());
if (member) {
JsonObject labelsObj = doc["labels"].to<JsonObject>();
for (const auto& kv : member->labels) {
labelsObj[kv.first.c_str()] = kv.second;
}
} else if (!ctx.self.labels.empty()) {
JsonObject labelsObj = doc["labels"].to<JsonObject>();
for (const auto& kv : ctx.self.labels) {
labelsObj[kv.first.c_str()] = kv.second;
}
}
}
String json;
serializeJson(doc, json);
@@ -116,7 +114,7 @@ void NodeService::handleUpdateUpload(AsyncWebServerRequest* request, const Strin
LOG_ERROR("OTA", "Update failed: not enough space");
Update.printError(Serial);
AsyncWebServerResponse* response = request->beginResponse(500, "application/json",
"{\"status\": \"FAIL\"}");
"{\"status\": \"FAIL\", \"message\": \"Update failed: not enough space\"}");
response->addHeader("Connection", "close");
request->send(response);
return;
@@ -216,17 +214,17 @@ void NodeService::handleConfigRequest(AsyncWebServerRequest* request) {
// Rebuild self.labels from constructor + config labels
ctx.rebuildLabels();
// TODO think of a better way to update the member list entry for the local node
// Update the member list entry for this node if it exists
if (ctx.memberList) {
auto it = ctx.memberList->find(ctx.hostname);
if (it != ctx.memberList->end()) {
// Update the labels in the member list entry
it->second.labels.clear();
for (const auto& kv : ctx.self.labels) {
it->second.labels[kv.first] = kv.second;
}
// Update the member list entry for the local node if it exists
String localIPStr = ctx.localIP.toString();
auto member = ctx.memberList->getMember(localIPStr.c_str());
if (member) {
// Update the labels in the member list entry
NodeInfo updatedNode = *member;
updatedNode.labels.clear();
for (const auto& kv : ctx.self.labels) {
updatedNode.labels[kv.first] = kv.second;
}
ctx.memberList->updateMember(localIPStr.c_str(), updatedNode);
}
// Save config to file