#include "ClusterManager.h"

// Builds the manager around the shared node context and subscribes to the
// "node_discovered" event so that every fired NodeInfo is merged into the
// member list.
ClusterManager::ClusterManager(NodeContext& ctx) : ctx(ctx) {
    // Register callback for node_discovered event
    ctx.on("node_discovered", [this](void* data) {
        NodeInfo* node = static_cast<NodeInfo*>(data);
        if (node == nullptr) {
            // Nothing to merge; avoids dereferencing an empty event payload.
            return;
        }
        this->addOrUpdateNode(node->hostname, node->ip);
    });
}

// Broadcasts the discovery message on the configured UDP port.
void ClusterManager::sendDiscovery() {
    //Serial.println("[Cluster] Sending discovery packet...");
    ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
    ctx.udp->write(ClusterProtocol::DISCOVERY_MSG);
    ctx.udp->endPacket();
}

// Polls the UDP socket for one packet and handles it:
//  - a discovery request is answered with "<RESPONSE_MSG>:<hostname>";
//  - a discovery response registers (or refreshes) the sending node.
// Anything else is ignored.
void ClusterManager::listenForDiscovery() {
    const int packetSize = ctx.udp->parsePacket();
    if (!packetSize) {
        return;
    }

    char incoming[ClusterProtocol::UDP_BUF_SIZE];
    // Leave room for the terminator: reading a full buffer and then writing
    // incoming[len] would land one byte past the end of the array.
    const int len = ctx.udp->read(incoming, sizeof(incoming) - 1);
    if (len <= 0) {
        // Nothing was read, so the buffer holds no valid string to compare.
        return;
    }
    incoming[len] = '\0';
    //Serial.printf("[UDP] Packet received: %s\n", incoming);

    if (strcmp(incoming, ClusterProtocol::DISCOVERY_MSG) == 0) {
        //Serial.printf("[UDP] Discovery request from: %s\n", ctx.udp->remoteIP().toString().c_str());
        ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
        String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname;
        ctx.udp->write(response.c_str());
        ctx.udp->endPacket();
        //Serial.printf("[UDP] Sent response with hostname: %s\n", ctx.hostname.c_str());
        return;
    }

    const size_t prefixLen = strlen(ClusterProtocol::RESPONSE_MSG);
    if (strncmp(incoming, ClusterProtocol::RESPONSE_MSG, prefixLen) == 0) {
        // Responses are built above as "<RESPONSE_MSG>:<hostname>". Require the
        // separator and a non-empty hostname so we never read past the
        // terminator or register a node under an empty name.
        if (incoming[prefixLen] != ':' || incoming[prefixLen + 1] == '\0') {
            return;
        }
        const char* hostPtr = incoming + prefixLen + 1;
        addOrUpdateNode(String(hostPtr), ctx.udp->remoteIP());
    }
}

// Inserts a node keyed by hostname, or refreshes the IP and lastSeen timestamp
// of an entry that already exists.
void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
    auto& memberList = *ctx.memberList;

    // Keyed lookup by hostname instead of scanning every member
    auto it = memberList.find(nodeHost);
    if (it != memberList.end()) {
        // Update existing node
        it->second.ip = nodeIP;
        it->second.lastSeen = millis();
        //fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
        return;
    }

    // Add new node
    NodeInfo newNode;
    newNode.hostname = nodeHost;
    newNode.ip = nodeIP;
    newNode.lastSeen = millis();
    updateNodeStatus(newNode, newNode.lastSeen,
                     ctx.config.node_inactive_threshold_ms,
                     ctx.config.node_dead_threshold_ms);
    memberList[nodeHost] = newNode;
    Serial.printf("[Cluster] Added node: %s @ %s | Status: %s | last update: 0\n",
                  nodeHost.c_str(), newNode.ip.toString().c_str(), statusToStr(newNode.status));
    //fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
}

// Performs an HTTP GET against the node's status endpoint and, on a 200
// response with valid JSON, copies the reported resources and API endpoints
// into the first member whose IP matches. Requests to the local IP are skipped.
void ClusterManager::fetchNodeInfo(const IPAddress& ip) {
    if (ip == ctx.localIP) {
        Serial.println("[Cluster] Skipping fetch for local node");
        return;
    }

    HTTPClient http;
    WiFiClient client;
    String url = "http://" + ip.toString() + ClusterProtocol::API_NODE_STATUS;
    http.begin(client, url);
    int httpCode = http.GET();

    if (httpCode == 200) {
        String payload = http.getString();
        JsonDocument doc;
        DeserializationError err = deserializeJson(doc, payload);
        if (!err) {
            auto& memberList = *ctx.memberList;
            // Still need to iterate since we're searching by IP, not hostname
            for (auto& pair : memberList) {
                NodeInfo& node = pair.second;
                if (node.ip != ip) {
                    continue;
                }

                node.resources.freeHeap = doc["freeHeap"];
                node.resources.chipId = doc["chipId"];
                node.resources.sdkVersion = doc["sdkVersion"].as<const char*>();
                node.resources.cpuFreqMHz = doc["cpuFreqMHz"];
                node.resources.flashChipSize = doc["flashChipSize"];
                node.status = NodeInfo::ACTIVE;
                node.lastSeen = millis();

                // Replace the endpoint list wholesale with what the node reports.
                node.apiEndpoints.clear();
                if (doc["api"].is<JsonArray>()) {
                    JsonArray apiArr = doc["api"].as<JsonArray>();
                    for (JsonObject apiObj : apiArr) {
                        String uri = apiObj["uri"].as<const char*>();
                        int method = apiObj["method"];
                        node.apiEndpoints.push_back(std::make_tuple(uri, method));
                    }
                }

                Serial.printf("[Cluster] Fetched info for node: %s @ %s\n",
                              node.hostname.c_str(), ip.toString().c_str());
                break;
            }
        } else {
            // Previously a malformed body was dropped without any trace.
            Serial.printf("[Cluster] Failed to parse info for node @ %s: %s\n",
                          ip.toString().c_str(), err.c_str());
        }
    } else {
        Serial.printf("[Cluster] Failed to fetch info for node @ %s, HTTP code: %d\n",
                      ip.toString().c_str(), httpCode);
    }

    http.end();
}

// Refreshes the local node's own entry (lastSeen, ACTIVE status, resources)
// and re-announces it through the "node_discovered" event. Does nothing if the
// local hostname is not in the member list.
void ClusterManager::heartbeatTaskCallback() {
    auto& memberList = *ctx.memberList;
    auto it = memberList.find(ctx.hostname);
    if (it == memberList.end()) {
        return;
    }

    NodeInfo& node = it->second;
    node.lastSeen = millis();
    node.status = NodeInfo::ACTIVE;
    updateLocalNodeResources();
    ctx.fire("node_discovered", &node);
}

// Fetches status info from every member whose IP differs from the local IP.
void ClusterManager::updateAllMembersInfoTaskCallback() {
    auto& memberList = *ctx.memberList;
    for (auto& pair : memberList) {
        const NodeInfo& node = pair.second;
        if (node.ip != ctx.localIP) {
            fetchNodeInfo(node.ip);
        }
    }
}

// Re-evaluates the status of every member against the configured thresholds
// and records the time elapsed since it was last seen in node.latency.
void ClusterManager::updateAllNodeStatuses() {
    auto& memberList = *ctx.memberList;
    const unsigned long now = millis();
    for (auto& pair : memberList) {
        NodeInfo& node = pair.second;
        updateNodeStatus(node, now,
                         ctx.config.node_inactive_threshold_ms,
                         ctx.config.node_dead_threshold_ms);
        node.latency = now - node.lastSeen;
    }
}

// Erases members that are marked DEAD and have not been seen for longer than
// the configured dead threshold.
void ClusterManager::removeDeadNodes() {
    auto& memberList = *ctx.memberList;
    const unsigned long now = millis();

    // Advance via the iterator returned by erase() so removal during
    // iteration stays valid.
    for (auto it = memberList.begin(); it != memberList.end(); ) {
        const unsigned long diff = now - it->second.lastSeen;
        if (it->second.status == NodeInfo::DEAD && diff > ctx.config.node_dead_threshold_ms) {
            Serial.printf("[Cluster] Removing node: %s\n", it->second.hostname.c_str());
            it = memberList.erase(it);
        } else {
            ++it;
        }
    }
}

// Prints every member (hostname, IP, status, time since last seen) to Serial,
// or a single "empty" line when there are no members.
void ClusterManager::printMemberList() {
    auto& memberList = *ctx.memberList;
    if (memberList.empty()) {
        Serial.println("[Cluster] Member List: empty");
        return;
    }

    Serial.println("[Cluster] Member List:");
    for (const auto& pair : memberList) {
        const NodeInfo& node = pair.second;
        Serial.printf(" %s @ %s | Status: %s | last seen: %lu\n",
                      node.hostname.c_str(), node.ip.toString().c_str(),
                      statusToStr(node.status), millis() - node.lastSeen);
    }
}

// Copies the current ESP heap, chip, SDK, CPU-frequency and flash-size readings
// into the local node's member-list entry, if that entry exists.
void ClusterManager::updateLocalNodeResources() {
    auto& memberList = *ctx.memberList;
    auto it = memberList.find(ctx.hostname);
    if (it == memberList.end()) {
        return;
    }

    NodeInfo& node = it->second;
    node.resources.freeHeap = ESP.getFreeHeap();
    node.resources.chipId = ESP.getChipId();
    node.resources.sdkVersion = String(ESP.getSdkVersion());
    node.resources.cpuFreqMHz = ESP.getCpuFreqMHz();
    node.resources.flashChipSize = ESP.getFlashChipSize();
}