diff --git a/src/spore/core/ClusterManager.cpp b/src/spore/core/ClusterManager.cpp index a0a53d7..5b084ed 100644 --- a/src/spore/core/ClusterManager.cpp +++ b/src/spore/core/ClusterManager.cpp @@ -158,53 +158,72 @@ void ClusterManager::onNodeUpdate(const char* msg) { return; } - // Update the specific node in memberlist + // The NODE_UPDATE contains info about the target node (hostname from message) + // but is sent FROM the responding node (ctx.udp->remoteIP()) + // We need to find the responding node in the memberlist, not the target node + IPAddress respondingNodeIP = ctx.udp->remoteIP(); auto& memberList = *ctx.memberList; - auto it = memberList.find(hostname); - if (it != memberList.end()) { - NodeInfo& node = it->second; - // Update basic info if provided + // Find the responding node by IP address + NodeInfo* respondingNode = nullptr; + for (auto& pair : memberList) { + if (pair.second.ip == respondingNodeIP) { + respondingNode = &pair.second; + break; + } + } + + if (respondingNode) { + // Calculate latency if we recently sent a heartbeat + unsigned long latency = 0; + if (lastHeartbeatSentAt != 0) { + unsigned long now = millis(); + latency = now - lastHeartbeatSentAt; + lastHeartbeatSentAt = 0; // Reset for next calculation + } + + // Update the responding node's information if (doc["hostname"].is()) { - node.hostname = doc["hostname"].as(); + respondingNode->hostname = doc["hostname"].as(); } if (doc["uptime"].is()) { - node.uptime = doc["uptime"]; + respondingNode->uptime = doc["uptime"]; } // Update labels if provided if (doc["labels"].is()) { - node.labels.clear(); + respondingNode->labels.clear(); JsonObject labelsObj = doc["labels"].as(); for (JsonPair kvp : labelsObj) { const char* key = kvp.key().c_str(); const char* value = labelsObj[kvp.key()]; - node.labels[key] = String(value); + respondingNode->labels[key] = String(value); } } - node.lastSeen = millis(); - node.status = NodeInfo::ACTIVE; + respondingNode->lastSeen = millis(); + respondingNode->status = NodeInfo::ACTIVE; + respondingNode->latency = latency; - LOG_DEBUG("Cluster", String("Updated node ") + hostname + " from NODE_UPDATE"); + LOG_DEBUG("Cluster", String("Updated responding node ") + respondingNode->hostname + " @ " + respondingNodeIP.toString() + " | latency: " + String(latency) + "ms"); } else { - LOG_WARN("Cluster", String("Received NODE_UPDATE for unknown node: ") + hostname); + LOG_WARN("Cluster", String("Received NODE_UPDATE from unknown node: ") + respondingNodeIP.toString()); } } -void ClusterManager::sendNodeInfo(const String& hostname, const IPAddress& targetIP) { +void ClusterManager::sendNodeInfo(const String& targetHostname, const IPAddress& targetIP) { JsonDocument doc; - // Get our node info for the response + // Get our node info for the response (we're the responding node) auto& memberList = *ctx.memberList; auto it = memberList.find(ctx.hostname); if (it != memberList.end()) { const NodeInfo& node = it->second; - // Minimal response: hostname, ip, uptime, labels + // Response contains info about ourselves (the responding node) doc["hostname"] = node.hostname; doc["ip"] = node.ip.toString(); - doc["uptime"] = millis() - node.lastSeen; // Approximate uptime + doc["uptime"] = node.uptime; // Add labels if present if (!node.labels.empty()) { @@ -223,12 +242,13 @@ void ClusterManager::sendNodeInfo(const String& hostname, const IPAddress& targe String json; serializeJson(doc, json); + // Send NODE_UPDATE:targetHostname:{json about responding node} ctx.udp->beginPacket(targetIP, ctx.config.udp_port); - String msg = String(ClusterProtocol::NODE_UPDATE_MSG) + ":" + hostname + ":" + json; + String msg = String(ClusterProtocol::NODE_UPDATE_MSG) + ":" + targetHostname + ":" + json; ctx.udp->write(msg.c_str()); ctx.udp->endPacket(); - LOG_DEBUG("Cluster", String("Sent NODE_UPDATE response to ") + hostname + " @ " + targetIP.toString()); + LOG_DEBUG("Cluster", String("Sent NODE_UPDATE response to ") + targetHostname + " @ " + targetIP.toString()); } void ClusterManager::onClusterEvent(const char* msg) {