feat: memberlist is now a map

This commit is contained in:
2025-08-21 20:35:53 +02:00
parent 175ed8cae8
commit 3124a7f2db
5 changed files with 62 additions and 42 deletions

View File

@@ -6,7 +6,10 @@ void ApiServer::addEndpoint(const String& uri, int method, std::function<void(As
serviceRegistry.push_back(std::make_tuple(uri, method)); serviceRegistry.push_back(std::make_tuple(uri, method));
// Store in NodeInfo for local node // Store in NodeInfo for local node
if (ctx.memberList && !ctx.memberList->empty()) { if (ctx.memberList && !ctx.memberList->empty()) {
(*ctx.memberList)[0].apiEndpoints.push_back(std::make_tuple(uri, method)); auto it = ctx.memberList->find(ctx.hostname);
if (it != ctx.memberList->end()) {
it->second.apiEndpoints.push_back(std::make_tuple(uri, method));
}
} }
server.on(uri.c_str(), method, requestHandler); server.on(uri.c_str(), method, requestHandler);
} }
@@ -15,7 +18,10 @@ void ApiServer::addEndpoint(const String& uri, int method, std::function<void(As
std::function<void(AsyncWebServerRequest*, const String&, size_t, uint8_t*, size_t, bool)> uploadHandler) { std::function<void(AsyncWebServerRequest*, const String&, size_t, uint8_t*, size_t, bool)> uploadHandler) {
serviceRegistry.push_back(std::make_tuple(uri, method)); serviceRegistry.push_back(std::make_tuple(uri, method));
if (ctx.memberList && !ctx.memberList->empty()) { if (ctx.memberList && !ctx.memberList->empty()) {
(*ctx.memberList)[0].apiEndpoints.push_back(std::make_tuple(uri, method)); auto it = ctx.memberList->find(ctx.hostname);
if (it != ctx.memberList->end()) {
it->second.apiEndpoints.push_back(std::make_tuple(uri, method));
}
} }
server.on(uri.c_str(), method, requestHandler, uploadHandler); server.on(uri.c_str(), method, requestHandler, uploadHandler);
} }
@@ -55,7 +61,8 @@ void ApiServer::onSystemStatusRequest(AsyncWebServerRequest *request) {
void ApiServer::onClusterMembersRequest(AsyncWebServerRequest *request) { void ApiServer::onClusterMembersRequest(AsyncWebServerRequest *request) {
JsonDocument doc; JsonDocument doc;
JsonArray arr = doc["members"].to<JsonArray>(); JsonArray arr = doc["members"].to<JsonArray>();
for (const auto& node : *ctx.memberList) { for (const auto& pair : *ctx.memberList) {
const NodeInfo& node = pair.second;
JsonObject obj = arr.add<JsonObject>(); JsonObject obj = arr.add<JsonObject>();
obj["hostname"] = node.hostname; obj["hostname"] = node.hostname;
obj["ip"] = node.ip.toString(); obj["ip"] = node.ip.toString();

View File

@@ -41,19 +41,24 @@ void ClusterManager::listenForDiscovery() {
void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) { void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
auto& memberList = *ctx.memberList; auto& memberList = *ctx.memberList;
for (auto& node : memberList) {
if (node.hostname == nodeHost) { // O(1) lookup instead of O(n) search
node.ip = nodeIP; auto it = memberList.find(nodeHost);
if (it != memberList.end()) {
// Update existing node
it->second.ip = nodeIP;
it->second.lastSeen = millis();
//fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task //fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
return; return;
} }
}
// Add new node
NodeInfo newNode; NodeInfo newNode;
newNode.hostname = nodeHost; newNode.hostname = nodeHost;
newNode.ip = nodeIP; newNode.ip = nodeIP;
newNode.lastSeen = millis(); newNode.lastSeen = millis();
updateNodeStatus(newNode, newNode.lastSeen); updateNodeStatus(newNode, newNode.lastSeen);
memberList.push_back(newNode); memberList[nodeHost] = newNode;
Serial.printf("[Cluster] Added node: %s @ %s | Status: %s | last update: 0\n", Serial.printf("[Cluster] Added node: %s @ %s | Status: %s | last update: 0\n",
nodeHost.c_str(), nodeHost.c_str(),
newNode.ip.toString().c_str(), newNode.ip.toString().c_str(),
@@ -77,7 +82,9 @@ void ClusterManager::fetchNodeInfo(const IPAddress& ip) {
DeserializationError err = deserializeJson(doc, payload); DeserializationError err = deserializeJson(doc, payload);
if (!err) { if (!err) {
auto& memberList = *ctx.memberList; auto& memberList = *ctx.memberList;
for (auto& node : memberList) { // Still need to iterate since we're searching by IP, not hostname
for (auto& pair : memberList) {
NodeInfo& node = pair.second;
if (node.ip == ip) { if (node.ip == ip) {
node.resources.freeHeap = doc["freeHeap"]; node.resources.freeHeap = doc["freeHeap"];
node.resources.chipId = doc["chipId"]; node.resources.chipId = doc["chipId"];
@@ -96,6 +103,7 @@ void ClusterManager::fetchNodeInfo(const IPAddress& ip) {
} }
} }
Serial.printf("[Cluster] Fetched info for node: %s @ %s\n", node.hostname.c_str(), ip.toString().c_str()); Serial.printf("[Cluster] Fetched info for node: %s @ %s\n", node.hostname.c_str(), ip.toString().c_str());
break;
} }
} }
} }
@@ -106,20 +114,21 @@ void ClusterManager::fetchNodeInfo(const IPAddress& ip) {
} }
void ClusterManager::heartbeatTaskCallback() { void ClusterManager::heartbeatTaskCallback() {
for (auto& node : *ctx.memberList) { auto& memberList = *ctx.memberList;
if (node.hostname == ctx.hostname) { auto it = memberList.find(ctx.hostname);
if (it != memberList.end()) {
NodeInfo& node = it->second;
node.lastSeen = millis(); node.lastSeen = millis();
node.status = NodeInfo::ACTIVE; node.status = NodeInfo::ACTIVE;
updateLocalNodeResources(); updateLocalNodeResources();
ctx.fire("node_discovered", &node); ctx.fire("node_discovered", &node);
break;
}
} }
} }
void ClusterManager::updateAllMembersInfoTaskCallback() { void ClusterManager::updateAllMembersInfoTaskCallback() {
auto& memberList = *ctx.memberList; auto& memberList = *ctx.memberList;
for (const auto& node : memberList) { for (auto& pair : memberList) {
const NodeInfo& node = pair.second;
if (node.ip != ctx.localIP) { if (node.ip != ctx.localIP) {
fetchNodeInfo(node.ip); fetchNodeInfo(node.ip);
} }
@@ -129,7 +138,8 @@ void ClusterManager::updateAllMembersInfoTaskCallback() {
void ClusterManager::updateAllNodeStatuses() { void ClusterManager::updateAllNodeStatuses() {
auto& memberList = *ctx.memberList; auto& memberList = *ctx.memberList;
unsigned long now = millis(); unsigned long now = millis();
for (auto& node : memberList) { for (auto& pair : memberList) {
NodeInfo& node = pair.second;
updateNodeStatus(node, now); updateNodeStatus(node, now);
node.latency = now - node.lastSeen; node.latency = now - node.lastSeen;
} }
@@ -138,13 +148,15 @@ void ClusterManager::updateAllNodeStatuses() {
void ClusterManager::removeDeadNodes() { void ClusterManager::removeDeadNodes() {
auto& memberList = *ctx.memberList; auto& memberList = *ctx.memberList;
unsigned long now = millis(); unsigned long now = millis();
for (size_t i = 0; i < memberList.size(); ) {
unsigned long diff = now - memberList[i].lastSeen; // Use iterator to safely remove elements from map
if (memberList[i].status == NodeInfo::DEAD && diff > NODE_DEAD_THRESHOLD) { for (auto it = memberList.begin(); it != memberList.end(); ) {
Serial.printf("[Cluster] Removing node: %s\n", memberList[i].hostname.c_str()); unsigned long diff = now - it->second.lastSeen;
memberList.erase(memberList.begin() + i); if (it->second.status == NodeInfo::DEAD && diff > NODE_DEAD_THRESHOLD) {
Serial.printf("[Cluster] Removing node: %s\n", it->second.hostname.c_str());
it = memberList.erase(it);
} else { } else {
++i; ++it;
} }
} }
} }
@@ -156,20 +168,21 @@ void ClusterManager::printMemberList() {
return; return;
} }
Serial.println("[Cluster] Member List:"); Serial.println("[Cluster] Member List:");
for (const auto& node : memberList) { for (const auto& pair : memberList) {
const NodeInfo& node = pair.second;
Serial.printf(" %s @ %s | Status: %s | last seen: %lu\n", node.hostname.c_str(), node.ip.toString().c_str(), statusToStr(node.status), millis() - node.lastSeen); Serial.printf(" %s @ %s | Status: %s | last seen: %lu\n", node.hostname.c_str(), node.ip.toString().c_str(), statusToStr(node.status), millis() - node.lastSeen);
} }
} }
void ClusterManager::updateLocalNodeResources() { void ClusterManager::updateLocalNodeResources() {
for (auto& node : *ctx.memberList) { auto& memberList = *ctx.memberList;
if (node.hostname == ctx.hostname) { auto it = memberList.find(ctx.hostname);
if (it != memberList.end()) {
NodeInfo& node = it->second;
node.resources.freeHeap = ESP.getFreeHeap(); node.resources.freeHeap = ESP.getFreeHeap();
node.resources.chipId = ESP.getChipId(); node.resources.chipId = ESP.getChipId();
node.resources.sdkVersion = String(ESP.getSdkVersion()); node.resources.sdkVersion = String(ESP.getSdkVersion());
node.resources.cpuFreqMHz = ESP.getCpuFreqMHz(); node.resources.cpuFreqMHz = ESP.getCpuFreqMHz();
node.resources.flashChipSize = ESP.getFlashChipSize(); node.resources.flashChipSize = ESP.getFlashChipSize();
break;
}
} }
} }

View File

@@ -6,6 +6,7 @@
#include <ESP8266WiFi.h> #include <ESP8266WiFi.h>
#include <ArduinoJson.h> #include <ArduinoJson.h>
#include <ESP8266HTTPClient.h> #include <ESP8266HTTPClient.h>
#include <map>
class ClusterManager { class ClusterManager {
public: public:
@@ -16,7 +17,7 @@ public:
void updateAllNodeStatuses(); void updateAllNodeStatuses();
void removeDeadNodes(); void removeDeadNodes();
void printMemberList(); void printMemberList();
const std::vector<NodeInfo>& getMemberList() const { return *ctx.memberList; } const std::map<String, NodeInfo>& getMemberList() const { return *ctx.memberList; }
void fetchNodeInfo(const IPAddress& ip); void fetchNodeInfo(const IPAddress& ip);
void updateLocalNodeResources(); void updateLocalNodeResources();
void heartbeatTaskCallback(); void heartbeatTaskCallback();

View File

@@ -3,7 +3,7 @@
NodeContext::NodeContext() { NodeContext::NodeContext() {
scheduler = new Scheduler(); scheduler = new Scheduler();
udp = new WiFiUDP(); udp = new WiFiUDP();
memberList = new std::vector<NodeInfo>(); memberList = new std::map<String, NodeInfo>();
hostname = ""; hostname = "";
} }

View File

@@ -1,10 +1,9 @@
#pragma once #pragma once
#include <TaskSchedulerDeclarations.h> #include <TaskSchedulerDeclarations.h>
#include <WiFiUdp.h> #include <WiFiUdp.h>
#include <vector> #include <map>
#include "NodeInfo.h" #include "NodeInfo.h"
#include <functional> #include <functional>
#include <map>
#include <string> #include <string>
class NodeContext { class NodeContext {
@@ -15,7 +14,7 @@ public:
WiFiUDP* udp; WiFiUDP* udp;
String hostname; String hostname;
IPAddress localIP; IPAddress localIP;
std::vector<NodeInfo>* memberList; std::map<String, NodeInfo>* memberList;
using EventCallback = std::function<void(void*)>; using EventCallback = std::function<void(void*)>;
std::map<std::string, std::vector<EventCallback>> eventRegistry; std::map<std::string, std::vector<EventCallback>> eventRegistry;