feature/improved-cluster-forming #15

Merged
master merged 5 commits from feature/improved-cluster-forming into main 2025-10-19 17:47:06 +02:00
Showing only changes of commit b6ad479352 - Show all commits

View File

@@ -158,53 +158,72 @@ void ClusterManager::onNodeUpdate(const char* msg) {
return; return;
} }
// Update the specific node in memberlist // The NODE_UPDATE contains info about the target node (hostname from message)
// but is sent FROM the responding node (ctx.udp->remoteIP())
// We need to find the responding node in the memberlist, not the target node
IPAddress respondingNodeIP = ctx.udp->remoteIP();
auto& memberList = *ctx.memberList; auto& memberList = *ctx.memberList;
auto it = memberList.find(hostname);
if (it != memberList.end()) {
NodeInfo& node = it->second;
// Update basic info if provided // Find the responding node by IP address
NodeInfo* respondingNode = nullptr;
for (auto& pair : memberList) {
if (pair.second.ip == respondingNodeIP) {
respondingNode = &pair.second;
break;
}
}
if (respondingNode) {
// Calculate latency if we recently sent a heartbeat
unsigned long latency = 0;
if (lastHeartbeatSentAt != 0) {
unsigned long now = millis();
latency = now - lastHeartbeatSentAt;
lastHeartbeatSentAt = 0; // Reset for next calculation
}
// Update the responding node's information
if (doc["hostname"].is<const char*>()) { if (doc["hostname"].is<const char*>()) {
node.hostname = doc["hostname"].as<const char*>(); respondingNode->hostname = doc["hostname"].as<const char*>();
} }
if (doc["uptime"].is<unsigned long>()) { if (doc["uptime"].is<unsigned long>()) {
node.uptime = doc["uptime"]; respondingNode->uptime = doc["uptime"];
} }
// Update labels if provided // Update labels if provided
if (doc["labels"].is<JsonObject>()) { if (doc["labels"].is<JsonObject>()) {
node.labels.clear(); respondingNode->labels.clear();
JsonObject labelsObj = doc["labels"].as<JsonObject>(); JsonObject labelsObj = doc["labels"].as<JsonObject>();
for (JsonPair kvp : labelsObj) { for (JsonPair kvp : labelsObj) {
const char* key = kvp.key().c_str(); const char* key = kvp.key().c_str();
const char* value = labelsObj[kvp.key()]; const char* value = labelsObj[kvp.key()];
node.labels[key] = String(value); respondingNode->labels[key] = String(value);
} }
} }
node.lastSeen = millis(); respondingNode->lastSeen = millis();
node.status = NodeInfo::ACTIVE; respondingNode->status = NodeInfo::ACTIVE;
respondingNode->latency = latency;
LOG_DEBUG("Cluster", String("Updated node ") + hostname + " from NODE_UPDATE"); LOG_DEBUG("Cluster", String("Updated responding node ") + respondingNode->hostname + " @ " + respondingNodeIP.toString() + " | latency: " + String(latency) + "ms");
} else { } else {
LOG_WARN("Cluster", String("Received NODE_UPDATE for unknown node: ") + hostname); LOG_WARN("Cluster", String("Received NODE_UPDATE from unknown node: ") + respondingNodeIP.toString());
} }
} }
void ClusterManager::sendNodeInfo(const String& hostname, const IPAddress& targetIP) { void ClusterManager::sendNodeInfo(const String& targetHostname, const IPAddress& targetIP) {
JsonDocument doc; JsonDocument doc;
// Get our node info for the response // Get our node info for the response (we're the responding node)
auto& memberList = *ctx.memberList; auto& memberList = *ctx.memberList;
auto it = memberList.find(ctx.hostname); auto it = memberList.find(ctx.hostname);
if (it != memberList.end()) { if (it != memberList.end()) {
const NodeInfo& node = it->second; const NodeInfo& node = it->second;
// Minimal response: hostname, ip, uptime, labels // Response contains info about ourselves (the responding node)
doc["hostname"] = node.hostname; doc["hostname"] = node.hostname;
doc["ip"] = node.ip.toString(); doc["ip"] = node.ip.toString();
doc["uptime"] = millis() - node.lastSeen; // Approximate uptime doc["uptime"] = node.uptime;
// Add labels if present // Add labels if present
if (!node.labels.empty()) { if (!node.labels.empty()) {
@@ -223,12 +242,13 @@ void ClusterManager::sendNodeInfo(const String& hostname, const IPAddress& targe
String json; String json;
serializeJson(doc, json); serializeJson(doc, json);
// Send NODE_UPDATE:targetHostname:{json about responding node}
ctx.udp->beginPacket(targetIP, ctx.config.udp_port); ctx.udp->beginPacket(targetIP, ctx.config.udp_port);
String msg = String(ClusterProtocol::NODE_UPDATE_MSG) + ":" + hostname + ":" + json; String msg = String(ClusterProtocol::NODE_UPDATE_MSG) + ":" + targetHostname + ":" + json;
ctx.udp->write(msg.c_str()); ctx.udp->write(msg.c_str());
ctx.udp->endPacket(); ctx.udp->endPacket();
LOG_DEBUG("Cluster", String("Sent NODE_UPDATE response to ") + hostname + " @ " + targetIP.toString()); LOG_DEBUG("Cluster", String("Sent NODE_UPDATE response to ") + targetHostname + " @ " + targetIP.toString());
} }
void ClusterManager::onClusterEvent(const char* msg) { void ClusterManager::onClusterEvent(const char* msg) {