diff --git a/include/spore/internal/Globals.h b/include/spore/internal/Globals.h index d7ad5fb..c43d911 100644 --- a/include/spore/internal/Globals.h +++ b/include/spore/internal/Globals.h @@ -7,8 +7,11 @@ namespace ClusterProtocol { constexpr const char* DISCOVERY_MSG = "CLUSTER_DISCOVERY"; constexpr const char* RESPONSE_MSG = "CLUSTER_RESPONSE"; + constexpr const char* HEARTBEAT_MSG = "CLUSTER_HEARTBEAT"; + constexpr const char* NODE_INFO_MSG = "CLUSTER_NODE_INFO"; constexpr uint16_t UDP_PORT = 4210; - constexpr size_t UDP_BUF_SIZE = 64; + // Increased buffer to accommodate node info JSON over UDP + constexpr size_t UDP_BUF_SIZE = 512; constexpr const char* API_NODE_STATUS = "/api/node/status"; } diff --git a/src/spore/core/ClusterManager.cpp b/src/spore/core/ClusterManager.cpp index 81c1ecc..95e1185 100644 --- a/src/spore/core/ClusterManager.cpp +++ b/src/spore/core/ClusterManager.cpp @@ -45,10 +45,92 @@ void ClusterManager::listenForDiscovery() { ctx.udp->write(response.c_str()); ctx.udp->endPacket(); //LOG_DEBUG(ctx, "UDP", "Sent response with hostname: " + ctx.hostname); + } else if (strncmp(incoming, ClusterProtocol::HEARTBEAT_MSG, strlen(ClusterProtocol::HEARTBEAT_MSG)) == 0) { + // Reply directly to heartbeat sender with our node info JSON + // Format: CLUSTER_NODE_INFO:<hostname>:<json> + JsonDocument doc; + doc["freeHeap"] = ESP.getFreeHeap(); + doc["chipId"] = ESP.getChipId(); + doc["sdkVersion"] = ESP.getSdkVersion(); + doc["cpuFreqMHz"] = ESP.getCpuFreqMHz(); + doc["flashChipSize"] = ESP.getFlashChipSize(); + + // Include labels if available + if (ctx.memberList) { + auto it = ctx.memberList->find(ctx.hostname); + if (it != ctx.memberList->end()) { + JsonObject labelsObj = doc["labels"].to(); + for (const auto& kv : it->second.labels) { + labelsObj[kv.first.c_str()] = kv.second; + } + } else if (!ctx.self.labels.empty()) { + JsonObject labelsObj = doc["labels"].to(); + for (const auto& kv : ctx.self.labels) { + labelsObj[kv.first.c_str()] = kv.second; + } + } + } + + 
String json; + serializeJson(doc, json); + + ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port); + String msg = String(ClusterProtocol::NODE_INFO_MSG) + ":" + ctx.hostname + ":" + json; + ctx.udp->write(msg.c_str()); + ctx.udp->endPacket(); } else if (strncmp(incoming, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0) { char* hostPtr = incoming + strlen(ClusterProtocol::RESPONSE_MSG) + 1; String nodeHost = String(hostPtr); addOrUpdateNode(nodeHost, ctx.udp->remoteIP()); + } else if (strncmp(incoming, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0) { + // Parse: CLUSTER_NODE_INFO:<hostname>:<json> + char* p = incoming + strlen(ClusterProtocol::NODE_INFO_MSG) + 1; + char* hostEnd = strchr(p, ':'); + if (hostEnd) { + *hostEnd = '\0'; + const char* hostCStr = p; + const char* jsonCStr = hostEnd + 1; + + String nodeHost = String(hostCStr); + IPAddress senderIP = ctx.udp->remoteIP(); + + // Ensure node exists/updated basic info + addOrUpdateNode(nodeHost, senderIP); + + // Parse JSON + JsonDocument doc; + DeserializationError err = deserializeJson(doc, jsonCStr); + if (!err) { + auto& memberList = *ctx.memberList; + auto it = memberList.find(nodeHost); + if (it != memberList.end()) { + NodeInfo& node = it->second; + node.resources.freeHeap = doc["freeHeap"] | node.resources.freeHeap; + node.resources.chipId = doc["chipId"] | node.resources.chipId; + { + const char* sdk = doc["sdkVersion"] | node.resources.sdkVersion.c_str(); + node.resources.sdkVersion = sdk ? 
String(sdk) : node.resources.sdkVersion; + } + node.resources.cpuFreqMHz = doc["cpuFreqMHz"] | node.resources.cpuFreqMHz; + node.resources.flashChipSize = doc["flashChipSize"] | node.resources.flashChipSize; + node.status = NodeInfo::ACTIVE; + node.lastSeen = millis(); + + // Labels + node.labels.clear(); + if (doc["labels"].is()) { + JsonObject labelsObj = doc["labels"].as(); + for (JsonPair kvp : labelsObj) { + const char* key = kvp.key().c_str(); + const char* value = labelsObj[kvp.key()]; + node.labels[key] = value; + } + } + } + } else { + LOG_DEBUG("Cluster", String("Failed to parse NODE_INFO JSON from ") + senderIP.toString()); + } + } } } } @@ -194,31 +276,17 @@ void ClusterManager::heartbeatTaskCallback() { updateLocalNodeResources(); ctx.fire("node_discovered", &node); } + + // Broadcast heartbeat so peers can respond with their node info + ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port); + String hb = String(ClusterProtocol::HEARTBEAT_MSG) + ":" + ctx.hostname; + ctx.udp->write(hb.c_str()); + ctx.udp->endPacket(); } void ClusterManager::updateAllMembersInfoTaskCallback() { - auto& memberList = *ctx.memberList; - - // Limit concurrent HTTP requests to prevent memory pressure - const size_t maxConcurrentRequests = ctx.config.max_concurrent_http_requests; - size_t requestCount = 0; - - for (auto& pair : memberList) { - const NodeInfo& node = pair.second; - if (node.ip != ctx.localIP) { - // Only process a limited number of requests per cycle - if (requestCount >= maxConcurrentRequests) { - LOG_DEBUG("Cluster", "Limiting concurrent HTTP requests to prevent memory pressure"); - break; - } - - fetchNodeInfo(node.ip); - requestCount++; - - // Add small delay between requests to prevent overwhelming the system - delay(100); - } - } + // HTTP-based member info fetching disabled; node info is provided via UDP responses to heartbeats + // No-op to reduce network and memory usage } void ClusterManager::updateAllNodeStatuses() { diff --git 
a/src/spore/types/Config.cpp b/src/spore/types/Config.cpp index 84f1bff..e36ac3f 100644 --- a/src/spore/types/Config.cpp +++ b/src/spore/types/Config.cpp @@ -11,7 +11,7 @@ Config::Config() { // Cluster Configuration discovery_interval_ms = 1000; - heartbeat_interval_ms = 2000; + heartbeat_interval_ms = 5000; status_update_interval_ms = 1000; member_info_update_interval_ms = 10000; print_interval_ms = 5000;