feat: memberlist optimization
This commit is contained in:
@@ -19,7 +19,7 @@ public:
|
||||
void updateAllNodeStatuses();
|
||||
void removeDeadNodes();
|
||||
void printMemberList();
|
||||
const std::map<String, NodeInfo>& getMemberList() const { return *ctx.memberList; }
|
||||
size_t getMemberCount() const { return ctx.memberList->getMemberCount(); }
|
||||
void updateLocalNodeResources(NodeInfo& node);
|
||||
void heartbeatTaskCallback();
|
||||
void updateAllMembersInfoTaskCallback();
|
||||
|
||||
133
include/spore/core/Memberlist.h
Normal file
133
include/spore/core/Memberlist.h
Normal file
@@ -0,0 +1,133 @@
|
||||
#pragma once
|
||||
|
||||
#include <Arduino.h>
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <optional>
|
||||
#include <functional>
|
||||
#include "spore/types/NodeInfo.h"
|
||||
|
||||
/**
|
||||
* @brief Manages the list of cluster members.
|
||||
*
|
||||
* The Memberlist class maintains a collection of cluster members, where each member
|
||||
* is identified by its IP address and associated with a NodeInfo object. It provides
|
||||
* methods to add, update, and remove members, as well as handle node status changes
|
||||
* (stale and dead nodes).
|
||||
*/
|
||||
class Memberlist {
|
||||
public:
|
||||
/**
|
||||
* @brief Default constructor.
|
||||
*/
|
||||
Memberlist();
|
||||
|
||||
/**
|
||||
* @brief Destructor.
|
||||
*/
|
||||
~Memberlist();
|
||||
|
||||
/**
|
||||
* @brief Adds or updates a member in the list.
|
||||
*
|
||||
* If the member already exists, updates its information. Otherwise, adds a new member.
|
||||
* @param ip The IP address of the member (as string).
|
||||
* @param node The NodeInfo object containing member details.
|
||||
* @return True if the member was added or updated, false otherwise.
|
||||
*/
|
||||
bool addOrUpdateMember(const std::string& ip, const NodeInfo& node);
|
||||
|
||||
/**
|
||||
* @brief Adds a new member to the list.
|
||||
*
|
||||
* @param ip The IP address of the member (as string).
|
||||
* @param node The NodeInfo object containing member details.
|
||||
* @return True if the member was added, false if it already exists.
|
||||
*/
|
||||
bool addMember(const std::string& ip, const NodeInfo& node);
|
||||
|
||||
/**
|
||||
* @brief Updates an existing member in the list.
|
||||
*
|
||||
* @param ip The IP address of the member (as string).
|
||||
* @param node The updated NodeInfo object.
|
||||
* @return True if the member was updated, false if it doesn't exist.
|
||||
*/
|
||||
bool updateMember(const std::string& ip, const NodeInfo& node);
|
||||
|
||||
/**
|
||||
* @brief Removes a member from the list.
|
||||
*
|
||||
* @param ip The IP address of the member to remove (as string).
|
||||
* @return True if the member was removed, false if it doesn't exist.
|
||||
*/
|
||||
bool removeMember(const std::string& ip);
|
||||
|
||||
/**
|
||||
* @brief Retrieves a member by IP address.
|
||||
*
|
||||
* @param ip The IP address of the member (as string).
|
||||
* @return Optional containing the NodeInfo if found, or std::nullopt if not found.
|
||||
*/
|
||||
std::optional<NodeInfo> getMember(const std::string& ip) const;
|
||||
|
||||
/**
|
||||
* @brief Iterates over all members and calls the provided callback for each.
|
||||
*
|
||||
* @param callback Function to call for each member. Receives (ip, node) as parameters.
|
||||
*/
|
||||
void forEachMember(std::function<void(const std::string&, const NodeInfo&)> callback) const;
|
||||
|
||||
/**
|
||||
* @brief Iterates over all members and calls the provided callback for each.
|
||||
*
|
||||
* @param callback Function to call for each member. Receives (ip, node) as parameters.
|
||||
* If callback returns false, iteration stops.
|
||||
* @return True if all members were processed, false if iteration was stopped early.
|
||||
*/
|
||||
bool forEachMemberUntil(std::function<bool(const std::string&, const NodeInfo&)> callback) const;
|
||||
|
||||
/**
|
||||
* @brief Gets the number of members in the list.
|
||||
*
|
||||
* @return The number of members.
|
||||
*/
|
||||
size_t getMemberCount() const;
|
||||
|
||||
/**
|
||||
* @brief Updates the status of all members based on current time and thresholds.
|
||||
*
|
||||
* Marks nodes as stale or dead based on their last seen time.
|
||||
* @param currentTime The current time in milliseconds.
|
||||
* @param staleThresholdMs Threshold for marking a node as stale (milliseconds).
|
||||
* @param deadThresholdMs Threshold for marking a node as dead (milliseconds).
|
||||
* @param onStatusChange Optional callback fired when a node's status changes.
|
||||
*/
|
||||
void updateAllNodeStatuses(unsigned long currentTime,
|
||||
unsigned long staleThresholdMs,
|
||||
unsigned long deadThresholdMs,
|
||||
std::function<void(const std::string&, NodeInfo::Status, NodeInfo::Status)> onStatusChange = nullptr);
|
||||
|
||||
/**
|
||||
* @brief Removes all dead members from the list.
|
||||
*
|
||||
* @return The number of members removed.
|
||||
*/
|
||||
size_t removeDeadMembers();
|
||||
|
||||
/**
|
||||
* @brief Checks if a member exists in the list.
|
||||
*
|
||||
* @param ip The IP address of the member (as string).
|
||||
* @return True if the member exists, false otherwise.
|
||||
*/
|
||||
bool hasMember(const std::string& ip) const;
|
||||
|
||||
/**
|
||||
* @brief Clears all members from the list.
|
||||
*/
|
||||
void clear();
|
||||
|
||||
private:
|
||||
std::map<std::string, NodeInfo> m_members; ///< Internal map holding the members.
|
||||
};
|
||||
@@ -2,12 +2,14 @@
|
||||
|
||||
#include <WiFiUdp.h>
|
||||
#include <map>
|
||||
#include "spore/types/NodeInfo.h"
|
||||
#include <functional>
|
||||
#include <string>
|
||||
#include <initializer_list>
|
||||
#include <memory>
|
||||
#include "spore/types/NodeInfo.h"
|
||||
#include "spore/types/Config.h"
|
||||
#include "spore/types/ApiTypes.h"
|
||||
#include "spore/core/Memberlist.h"
|
||||
|
||||
class NodeContext {
|
||||
public:
|
||||
@@ -18,7 +20,7 @@ public:
|
||||
String hostname;
|
||||
IPAddress localIP;
|
||||
NodeInfo self;
|
||||
std::map<String, NodeInfo>* memberList;
|
||||
std::unique_ptr<Memberlist> memberList;
|
||||
::Config config;
|
||||
std::map<String, String> constructorLabels; // Labels passed to constructor (not persisted)
|
||||
|
||||
|
||||
@@ -23,11 +23,12 @@ void ApiServer::registerEndpoint(const String& uri, int method,
|
||||
endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true});
|
||||
|
||||
// Update cluster if needed
|
||||
if (ctx.memberList && !ctx.memberList->empty()) {
|
||||
auto it = ctx.memberList->find(ctx.hostname);
|
||||
if (it != ctx.memberList->end()) {
|
||||
it->second.endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true});
|
||||
}
|
||||
String localIPStr = ctx.localIP.toString();
|
||||
auto member = ctx.memberList->getMember(localIPStr.c_str());
|
||||
if (member) {
|
||||
NodeInfo updatedNode = *member;
|
||||
updatedNode.endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true});
|
||||
ctx.memberList->updateMember(localIPStr.c_str(), updatedNode);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -165,92 +165,93 @@ void ClusterManager::onNodeUpdate(const char* msg) {
|
||||
// but is sent FROM the responding node (ctx.udp->remoteIP())
|
||||
// We need to find the responding node in the memberlist, not the target node
|
||||
IPAddress respondingNodeIP = ctx.udp->remoteIP();
|
||||
auto& memberList = *ctx.memberList;
|
||||
String respondingIPStr = respondingNodeIP.toString();
|
||||
|
||||
// Find the responding node by IP address
|
||||
NodeInfo* respondingNode = nullptr;
|
||||
for (auto& pair : memberList) {
|
||||
if (pair.second.ip == respondingNodeIP) {
|
||||
respondingNode = &pair.second;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (respondingNode) {
|
||||
// Calculate latency only if we recently sent a heartbeat (within last 1 second)
|
||||
unsigned long latency = 0;
|
||||
unsigned long now = millis();
|
||||
if (lastHeartbeatSentAt != 0 && (now - lastHeartbeatSentAt) < 1000) { // 1 second window
|
||||
latency = now - lastHeartbeatSentAt;
|
||||
lastHeartbeatSentAt = 0; // Reset for next calculation
|
||||
}
|
||||
|
||||
// Update the responding node's information
|
||||
bool hostnameChanged = false;
|
||||
if (doc["hostname"].is<const char*>()) {
|
||||
String newHostname = doc["hostname"].as<const char*>();
|
||||
if (respondingNode->hostname != newHostname) {
|
||||
respondingNode->hostname = newHostname;
|
||||
hostnameChanged = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (doc["uptime"].is<unsigned long>()) {
|
||||
respondingNode->uptime = doc["uptime"];
|
||||
}
|
||||
|
||||
// Update labels if provided
|
||||
bool labelsChanged = false;
|
||||
if (doc["labels"].is<JsonObject>()) {
|
||||
// Check if labels actually changed
|
||||
JsonObject labelsObj = doc["labels"].as<JsonObject>();
|
||||
std::map<String, String> newLabels;
|
||||
for (JsonPair kvp : labelsObj) {
|
||||
const char* key = kvp.key().c_str();
|
||||
const char* value = labelsObj[kvp.key()];
|
||||
newLabels[key] = String(value);
|
||||
}
|
||||
|
||||
// Compare with existing labels
|
||||
if (newLabels != respondingNode->labels) {
|
||||
labelsChanged = true;
|
||||
respondingNode->labels = newLabels;
|
||||
}
|
||||
}
|
||||
|
||||
respondingNode->lastSeen = now;
|
||||
respondingNode->status = NodeInfo::ACTIVE;
|
||||
|
||||
// Update latency if we calculated it (preserve existing value if not)
|
||||
if (latency > 0) {
|
||||
respondingNode->latency = latency;
|
||||
}
|
||||
|
||||
// Check if any fields changed that require broadcasting
|
||||
bool nodeInfoChanged = hostnameChanged || labelsChanged;
|
||||
|
||||
if (nodeInfoChanged) {
|
||||
// Fire cluster/node/update event to trigger broadcast
|
||||
ctx.fire("cluster/node/update", nullptr);
|
||||
}
|
||||
|
||||
LOG_DEBUG("Cluster", String("Updated responding node ") + respondingNode->hostname + " @ " + respondingNodeIP.toString() +
|
||||
" | hostname: " + (hostnameChanged ? "changed" : "unchanged") +
|
||||
" | labels: " + (labelsChanged ? "changed" : "unchanged") +
|
||||
" | latency: " + (latency > 0 ? String(latency) + "ms" : "not calculated"));
|
||||
} else {
|
||||
auto respondingMember = ctx.memberList->getMember(respondingIPStr.c_str());
|
||||
if (!respondingMember) {
|
||||
LOG_WARN("Cluster", String("Received NODE_UPDATE from unknown node: ") + respondingNodeIP.toString());
|
||||
return;
|
||||
}
|
||||
|
||||
// Calculate latency only if we recently sent a heartbeat (within last 1 second)
|
||||
unsigned long latency = 0;
|
||||
unsigned long now = millis();
|
||||
if (lastHeartbeatSentAt != 0 && (now - lastHeartbeatSentAt) < 1000) { // 1 second window
|
||||
latency = now - lastHeartbeatSentAt;
|
||||
lastHeartbeatSentAt = 0; // Reset for next calculation
|
||||
}
|
||||
|
||||
// Create updated node info
|
||||
NodeInfo updatedNode = *respondingMember;
|
||||
bool hostnameChanged = false;
|
||||
bool labelsChanged = false;
|
||||
|
||||
// Update hostname if provided
|
||||
if (doc["hostname"].is<const char*>()) {
|
||||
String newHostname = doc["hostname"].as<const char*>();
|
||||
if (updatedNode.hostname != newHostname) {
|
||||
updatedNode.hostname = newHostname;
|
||||
hostnameChanged = true;
|
||||
}
|
||||
}
|
||||
|
||||
// Update uptime if provided
|
||||
if (doc["uptime"].is<unsigned long>()) {
|
||||
updatedNode.uptime = doc["uptime"];
|
||||
}
|
||||
|
||||
// Update labels if provided
|
||||
if (doc["labels"].is<JsonObject>()) {
|
||||
JsonObject labelsObj = doc["labels"].as<JsonObject>();
|
||||
std::map<String, String> newLabels;
|
||||
for (JsonPair kvp : labelsObj) {
|
||||
const char* key = kvp.key().c_str();
|
||||
const char* value = labelsObj[kvp.key()];
|
||||
newLabels[key] = String(value);
|
||||
}
|
||||
|
||||
// Compare with existing labels
|
||||
if (newLabels != updatedNode.labels) {
|
||||
labelsChanged = true;
|
||||
updatedNode.labels = newLabels;
|
||||
}
|
||||
}
|
||||
|
||||
// Update timing and status
|
||||
updatedNode.lastSeen = now;
|
||||
updatedNode.status = NodeInfo::ACTIVE;
|
||||
|
||||
// Update latency if we calculated it (preserve existing value if not)
|
||||
if (latency > 0) {
|
||||
updatedNode.latency = latency;
|
||||
}
|
||||
|
||||
// Persist the updated node info to the memberlist
|
||||
ctx.memberList->updateMember(respondingIPStr.c_str(), updatedNode);
|
||||
|
||||
// Check if any fields changed that require broadcasting
|
||||
bool nodeInfoChanged = hostnameChanged || labelsChanged;
|
||||
|
||||
if (nodeInfoChanged) {
|
||||
// Fire cluster/node/update event to trigger broadcast
|
||||
ctx.fire("cluster/node/update", nullptr);
|
||||
}
|
||||
|
||||
LOG_DEBUG("Cluster", String("Updated responding node ") + updatedNode.hostname + " @ " + respondingNodeIP.toString() +
|
||||
" | hostname: " + (hostnameChanged ? "changed" : "unchanged") +
|
||||
" | labels: " + (labelsChanged ? "changed" : "unchanged") +
|
||||
" | latency: " + (latency > 0 ? String(latency) + "ms" : "not calculated"));
|
||||
}
|
||||
|
||||
void ClusterManager::sendNodeInfo(const String& targetHostname, const IPAddress& targetIP) {
|
||||
JsonDocument doc;
|
||||
|
||||
// Get our node info for the response (we're the responding node)
|
||||
auto& memberList = *ctx.memberList;
|
||||
auto it = memberList.find(ctx.hostname);
|
||||
if (it != memberList.end()) {
|
||||
const NodeInfo& node = it->second;
|
||||
String localIPStr = ctx.localIP.toString();
|
||||
auto member = ctx.memberList->getMember(localIPStr.c_str());
|
||||
if (member) {
|
||||
const NodeInfo& node = *member;
|
||||
|
||||
// Response contains info about ourselves (the responding node)
|
||||
doc["hostname"] = node.hostname;
|
||||
@@ -265,7 +266,7 @@ void ClusterManager::sendNodeInfo(const String& targetHostname, const IPAddress&
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Fallback to basic info
|
||||
// Fallback to basic info if not in memberlist
|
||||
doc["hostname"] = ctx.hostname;
|
||||
doc["ip"] = ctx.localIP.toString();
|
||||
doc["uptime"] = millis();
|
||||
@@ -345,18 +346,20 @@ void ClusterManager::onRawMessage(const char* msg) {
|
||||
}
|
||||
|
||||
void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
|
||||
auto& memberList = *ctx.memberList;
|
||||
bool memberlistChanged = false;
|
||||
String ipStr = nodeIP.toString();
|
||||
|
||||
// O(1) lookup instead of O(n) search
|
||||
auto it = memberList.find(nodeHost);
|
||||
if (it != memberList.end()) {
|
||||
// Check if member exists
|
||||
auto existingMember = ctx.memberList->getMember(ipStr.c_str());
|
||||
if (existingMember) {
|
||||
// Update existing node - preserve all existing field values
|
||||
if (it->second.ip != nodeIP) {
|
||||
it->second.ip = nodeIP;
|
||||
NodeInfo updatedNode = *existingMember;
|
||||
if (updatedNode.ip != nodeIP) {
|
||||
updatedNode.ip = nodeIP;
|
||||
memberlistChanged = true;
|
||||
}
|
||||
it->second.lastSeen = millis();
|
||||
updatedNode.lastSeen = millis();
|
||||
ctx.memberList->updateMember(ipStr.c_str(), updatedNode);
|
||||
} else {
|
||||
// Add new node
|
||||
NodeInfo newNode;
|
||||
@@ -364,7 +367,7 @@ void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
|
||||
newNode.ip = nodeIP;
|
||||
newNode.lastSeen = millis();
|
||||
updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
|
||||
memberList[nodeHost] = newNode;
|
||||
ctx.memberList->addMember(ipStr.c_str(), newNode);
|
||||
memberlistChanged = true;
|
||||
LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " | Status: " + statusToStr(newNode.status) + " | last update: 0");
|
||||
}
|
||||
@@ -376,12 +379,14 @@ void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
|
||||
}
|
||||
|
||||
void ClusterManager::heartbeatTaskCallback() {
|
||||
auto& memberList = *ctx.memberList;
|
||||
auto it = memberList.find(ctx.hostname);
|
||||
if (it != memberList.end()) {
|
||||
NodeInfo& node = it->second;
|
||||
// Update local node resources and lastSeen since we're actively sending heartbeats
|
||||
String localIPStr = ctx.localIP.toString();
|
||||
auto member = ctx.memberList->getMember(localIPStr.c_str());
|
||||
if (member) {
|
||||
NodeInfo node = *member;
|
||||
updateLocalNodeResources(node);
|
||||
addOrUpdateNode(ctx.hostname, ctx.localIP);
|
||||
node.lastSeen = millis(); // Update lastSeen since we're actively participating
|
||||
ctx.memberList->updateMember(localIPStr.c_str(), node);
|
||||
}
|
||||
|
||||
// Broadcast heartbeat - peers will respond with NODE_UPDATE
|
||||
@@ -401,13 +406,13 @@ void ClusterManager::updateAllMembersInfoTaskCallback() {
|
||||
|
||||
void ClusterManager::broadcastNodeUpdate() {
|
||||
// Broadcast our current node info as NODE_UPDATE to all cluster members
|
||||
auto& memberList = *ctx.memberList;
|
||||
auto it = memberList.find(ctx.hostname);
|
||||
if (it == memberList.end()) {
|
||||
String localIPStr = ctx.localIP.toString();
|
||||
auto member = ctx.memberList->getMember(localIPStr.c_str());
|
||||
if (!member) {
|
||||
return;
|
||||
}
|
||||
|
||||
const NodeInfo& node = it->second;
|
||||
const NodeInfo& node = *member;
|
||||
|
||||
JsonDocument doc;
|
||||
doc["hostname"] = node.hostname;
|
||||
@@ -434,48 +439,28 @@ void ClusterManager::broadcastNodeUpdate() {
|
||||
}
|
||||
|
||||
void ClusterManager::updateAllNodeStatuses() {
|
||||
auto& memberList = *ctx.memberList;
|
||||
unsigned long now = millis();
|
||||
for (auto& pair : memberList) {
|
||||
NodeInfo& node = pair.second;
|
||||
updateNodeStatus(node, now, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
|
||||
}
|
||||
ctx.memberList->updateAllNodeStatuses(now, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
|
||||
}
|
||||
|
||||
void ClusterManager::removeDeadNodes() {
|
||||
auto& memberList = *ctx.memberList;
|
||||
unsigned long now = millis();
|
||||
bool memberlistChanged = false;
|
||||
|
||||
// Use iterator to safely remove elements from map
|
||||
for (auto it = memberList.begin(); it != memberList.end(); ) {
|
||||
unsigned long diff = now - it->second.lastSeen;
|
||||
if (it->second.status == NodeInfo::DEAD && diff > ctx.config.node_dead_threshold_ms) {
|
||||
LOG_INFO("Cluster", "Removing node: " + it->second.hostname);
|
||||
it = memberList.erase(it);
|
||||
memberlistChanged = true;
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
||||
// Fire event if memberlist changed
|
||||
if (memberlistChanged) {
|
||||
size_t removedCount = ctx.memberList->removeDeadMembers();
|
||||
if (removedCount > 0) {
|
||||
LOG_INFO("Cluster", String("Removed ") + removedCount + " dead nodes");
|
||||
ctx.fire("cluster/memberlist/changed", nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterManager::printMemberList() {
|
||||
auto& memberList = *ctx.memberList;
|
||||
if (memberList.empty()) {
|
||||
size_t count = ctx.memberList->getMemberCount();
|
||||
if (count == 0) {
|
||||
LOG_INFO("Cluster", "Member List: empty");
|
||||
return;
|
||||
}
|
||||
LOG_INFO("Cluster", "Member List:");
|
||||
for (const auto& pair : memberList) {
|
||||
const NodeInfo& node = pair.second;
|
||||
ctx.memberList->forEachMember([](const std::string& ip, const NodeInfo& node) {
|
||||
LOG_INFO("Cluster", " " + node.hostname + " @ " + node.ip.toString() + " | Status: " + statusToStr(node.status) + " | last seen: " + String(millis() - node.lastSeen));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void ClusterManager::updateLocalNodeResources(NodeInfo& node) {
|
||||
|
||||
114
src/spore/core/Memberlist.cpp
Normal file
114
src/spore/core/Memberlist.cpp
Normal file
@@ -0,0 +1,114 @@
|
||||
#include "spore/core/Memberlist.h"
|
||||
#include <algorithm>
|
||||
|
||||
Memberlist::Memberlist() = default;
|
||||
|
||||
Memberlist::~Memberlist() = default;
|
||||
|
||||
bool Memberlist::addOrUpdateMember(const std::string& ip, const NodeInfo& node) {
|
||||
auto it = m_members.find(ip);
|
||||
if (it != m_members.end()) {
|
||||
// Update existing member
|
||||
it->second = node;
|
||||
it->second.lastSeen = millis(); // Update last seen time
|
||||
return true;
|
||||
} else {
|
||||
// Add new member
|
||||
NodeInfo newNode = node;
|
||||
newNode.lastSeen = millis();
|
||||
m_members[ip] = newNode;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
bool Memberlist::addMember(const std::string& ip, const NodeInfo& node) {
|
||||
if (m_members.find(ip) != m_members.end()) {
|
||||
return false; // Member already exists
|
||||
}
|
||||
NodeInfo newNode = node;
|
||||
newNode.lastSeen = millis();
|
||||
m_members[ip] = newNode;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Memberlist::updateMember(const std::string& ip, const NodeInfo& node) {
|
||||
auto it = m_members.find(ip);
|
||||
if (it == m_members.end()) {
|
||||
return false; // Member doesn't exist
|
||||
}
|
||||
it->second = node;
|
||||
it->second.lastSeen = millis(); // Update last seen time
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Memberlist::removeMember(const std::string& ip) {
|
||||
auto it = m_members.find(ip);
|
||||
if (it == m_members.end()) {
|
||||
return false; // Member doesn't exist
|
||||
}
|
||||
m_members.erase(it);
|
||||
return true;
|
||||
}
|
||||
|
||||
std::optional<NodeInfo> Memberlist::getMember(const std::string& ip) const {
|
||||
auto it = m_members.find(ip);
|
||||
if (it != m_members.end()) {
|
||||
return it->second;
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
void Memberlist::forEachMember(std::function<void(const std::string&, const NodeInfo&)> callback) const {
|
||||
for (const auto& pair : m_members) {
|
||||
callback(pair.first, pair.second);
|
||||
}
|
||||
}
|
||||
|
||||
bool Memberlist::forEachMemberUntil(std::function<bool(const std::string&, const NodeInfo&)> callback) const {
|
||||
for (const auto& pair : m_members) {
|
||||
if (!callback(pair.first, pair.second)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
size_t Memberlist::getMemberCount() const {
|
||||
return m_members.size();
|
||||
}
|
||||
|
||||
void Memberlist::updateAllNodeStatuses(unsigned long currentTime,
|
||||
unsigned long staleThresholdMs,
|
||||
unsigned long deadThresholdMs,
|
||||
std::function<void(const std::string&, NodeInfo::Status, NodeInfo::Status)> onStatusChange) {
|
||||
for (auto& [ip, node] : m_members) {
|
||||
NodeInfo::Status oldStatus = node.status;
|
||||
updateNodeStatus(node, currentTime, staleThresholdMs, deadThresholdMs);
|
||||
|
||||
if (oldStatus != node.status && onStatusChange) {
|
||||
onStatusChange(ip, oldStatus, node.status);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
size_t Memberlist::removeDeadMembers() {
|
||||
size_t removedCount = 0;
|
||||
auto it = m_members.begin();
|
||||
while (it != m_members.end()) {
|
||||
if (it->second.status == NodeInfo::Status::DEAD) {
|
||||
it = m_members.erase(it);
|
||||
++removedCount;
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
return removedCount;
|
||||
}
|
||||
|
||||
bool Memberlist::hasMember(const std::string& ip) const {
|
||||
return m_members.find(ip) != m_members.end();
|
||||
}
|
||||
|
||||
void Memberlist::clear() {
|
||||
m_members.clear();
|
||||
}
|
||||
@@ -119,13 +119,8 @@ void NetworkManager::setupWiFi() {
|
||||
ctx.self.status = NodeInfo::ACTIVE;
|
||||
|
||||
// Ensure member list has an entry for this node
|
||||
auto &memberList = *ctx.memberList;
|
||||
auto existing = memberList.find(ctx.hostname);
|
||||
if (existing == memberList.end()) {
|
||||
memberList[ctx.hostname] = ctx.self;
|
||||
} else {
|
||||
existing->second = ctx.self;
|
||||
}
|
||||
String localIPStr = ctx.localIP.toString();
|
||||
ctx.memberList->addOrUpdateMember(localIPStr.c_str(), ctx.self);
|
||||
|
||||
// Notify listeners that the node is (re)discovered
|
||||
ctx.fire("node/discovered", &ctx.self);
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
NodeContext::NodeContext() {
|
||||
udp = new WiFiUDP();
|
||||
memberList = new std::map<String, NodeInfo>();
|
||||
memberList = std::make_unique<Memberlist>();
|
||||
hostname = "";
|
||||
self.hostname = "";
|
||||
self.ip = IPAddress();
|
||||
@@ -19,7 +19,7 @@ NodeContext::NodeContext(std::initializer_list<std::pair<String, String>> initia
|
||||
|
||||
NodeContext::~NodeContext() {
|
||||
delete udp;
|
||||
delete memberList;
|
||||
// memberList is a unique_ptr, so no need to delete manually
|
||||
}
|
||||
|
||||
void NodeContext::on(const std::string& event, EventCallback cb) {
|
||||
|
||||
@@ -40,8 +40,7 @@ void ClusterService::handleMembersRequest(AsyncWebServerRequest* request) {
|
||||
JsonDocument doc;
|
||||
JsonArray arr = doc["members"].to<JsonArray>();
|
||||
|
||||
for (const auto& pair : *ctx.memberList) {
|
||||
const NodeInfo& node = pair.second;
|
||||
ctx.memberList->forEachMember([&arr](const std::string& ip, const NodeInfo& node) {
|
||||
JsonObject obj = arr.add<JsonObject>();
|
||||
obj["hostname"] = node.hostname;
|
||||
obj["ip"] = node.ip.toString();
|
||||
@@ -56,7 +55,7 @@ void ClusterService::handleMembersRequest(AsyncWebServerRequest* request) {
|
||||
labelsObj[kv.first.c_str()] = kv.second;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
String json;
|
||||
serializeJson(doc, json);
|
||||
|
||||
@@ -74,20 +74,18 @@ void NodeService::handleStatusRequest(AsyncWebServerRequest* request) {
|
||||
doc["flashChipSize"] = ESP.getFlashChipSize();
|
||||
|
||||
// Include local node labels if present
|
||||
if (ctx.memberList) {
|
||||
auto it = ctx.memberList->find(ctx.hostname);
|
||||
if (it != ctx.memberList->end()) {
|
||||
JsonObject labelsObj = doc["labels"].to<JsonObject>();
|
||||
for (const auto& kv : it->second.labels) {
|
||||
labelsObj[kv.first.c_str()] = kv.second;
|
||||
}
|
||||
} else if (!ctx.self.labels.empty()) {
|
||||
auto member = ctx.memberList->getMember(ctx.hostname.c_str());
|
||||
if (member) {
|
||||
JsonObject labelsObj = doc["labels"].to<JsonObject>();
|
||||
for (const auto& kv : member->labels) {
|
||||
labelsObj[kv.first.c_str()] = kv.second;
|
||||
}
|
||||
} else if (!ctx.self.labels.empty()) {
|
||||
JsonObject labelsObj = doc["labels"].to<JsonObject>();
|
||||
for (const auto& kv : ctx.self.labels) {
|
||||
labelsObj[kv.first.c_str()] = kv.second;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
String json;
|
||||
serializeJson(doc, json);
|
||||
@@ -216,17 +214,17 @@ void NodeService::handleConfigRequest(AsyncWebServerRequest* request) {
|
||||
// Rebuild self.labels from constructor + config labels
|
||||
ctx.rebuildLabels();
|
||||
|
||||
// TODO think of a better way to update the member list entry for the local node
|
||||
// Update the member list entry for this node if it exists
|
||||
if (ctx.memberList) {
|
||||
auto it = ctx.memberList->find(ctx.hostname);
|
||||
if (it != ctx.memberList->end()) {
|
||||
// Update the labels in the member list entry
|
||||
it->second.labels.clear();
|
||||
for (const auto& kv : ctx.self.labels) {
|
||||
it->second.labels[kv.first] = kv.second;
|
||||
}
|
||||
// Update the member list entry for the local node if it exists
|
||||
String localIPStr = ctx.localIP.toString();
|
||||
auto member = ctx.memberList->getMember(localIPStr.c_str());
|
||||
if (member) {
|
||||
// Update the labels in the member list entry
|
||||
NodeInfo updatedNode = *member;
|
||||
updatedNode.labels.clear();
|
||||
for (const auto& kv : ctx.self.labels) {
|
||||
updatedNode.labels[kv.first] = kv.second;
|
||||
}
|
||||
ctx.memberList->updateMember(localIPStr.c_str(), updatedNode);
|
||||
}
|
||||
|
||||
// Save config to file
|
||||
|
||||
Reference in New Issue
Block a user