feat(cluster): move member info sync to UDP heartbeat; remove HTTP polling
Broadcast CLUSTER_HEARTBEAT every 5s; peers reply with CLUSTER_NODE_INFO containing resources and labels. Update memberList from received node info and set status/lastSeen; keep UDP discovery responses. Disable HTTP-based updateAllMembersInfoTaskCallback loop to reduce network and memory overhead. Add protocol constants HEARTBEAT_MSG and NODE_INFO_MSG; increase UDP buffer to 512 bytes. Set default heartbeat_interval_ms to 5000 ms. Fix sdkVersion JSON fallback by converting to const char* before assigning to String.
This commit is contained in:
@@ -7,8 +7,11 @@
|
||||
/// Wire-level constants for the cluster protocol.
///
/// The *_MSG values are message-type prefixes: a received UDP datagram is
/// classified by comparing its leading bytes against them. Payload layouts
/// used alongside the prefixes:
///   CLUSTER_HEARTBEAT:<hostname>
///   CLUSTER_NODE_INFO:<hostname>:<json>
namespace ClusterProtocol {
constexpr const char* DISCOVERY_MSG = "CLUSTER_DISCOVERY";
constexpr const char* RESPONSE_MSG = "CLUSTER_RESPONSE";
constexpr const char* HEARTBEAT_MSG = "CLUSTER_HEARTBEAT";
constexpr const char* NODE_INFO_MSG = "CLUSTER_NODE_INFO";
constexpr uint16_t UDP_PORT = 4210;
// Increased buffer to accommodate node info JSON over UDP.
// This is the only definition of UDP_BUF_SIZE; the former 64-byte value is
// superseded (keeping both would be a redefinition error).
constexpr size_t UDP_BUF_SIZE = 512;
constexpr const char* API_NODE_STATUS = "/api/node/status";
}
|
||||
|
||||
|
||||
@@ -45,10 +45,92 @@ void ClusterManager::listenForDiscovery() {
|
||||
ctx.udp->write(response.c_str());
|
||||
ctx.udp->endPacket();
|
||||
//LOG_DEBUG(ctx, "UDP", "Sent response with hostname: " + ctx.hostname);
|
||||
} else if (strncmp(incoming, ClusterProtocol::HEARTBEAT_MSG, strlen(ClusterProtocol::HEARTBEAT_MSG)) == 0) {
|
||||
// Reply directly to heartbeat sender with our node info JSON
|
||||
// Format: CLUSTER_NODE_INFO:<hostname>:<json>
|
||||
JsonDocument doc;
|
||||
doc["freeHeap"] = ESP.getFreeHeap();
|
||||
doc["chipId"] = ESP.getChipId();
|
||||
doc["sdkVersion"] = ESP.getSdkVersion();
|
||||
doc["cpuFreqMHz"] = ESP.getCpuFreqMHz();
|
||||
doc["flashChipSize"] = ESP.getFlashChipSize();
|
||||
|
||||
// Include labels if available
|
||||
if (ctx.memberList) {
|
||||
auto it = ctx.memberList->find(ctx.hostname);
|
||||
if (it != ctx.memberList->end()) {
|
||||
JsonObject labelsObj = doc["labels"].to<JsonObject>();
|
||||
for (const auto& kv : it->second.labels) {
|
||||
labelsObj[kv.first.c_str()] = kv.second;
|
||||
}
|
||||
} else if (!ctx.self.labels.empty()) {
|
||||
JsonObject labelsObj = doc["labels"].to<JsonObject>();
|
||||
for (const auto& kv : ctx.self.labels) {
|
||||
labelsObj[kv.first.c_str()] = kv.second;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
String json;
|
||||
serializeJson(doc, json);
|
||||
|
||||
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
|
||||
String msg = String(ClusterProtocol::NODE_INFO_MSG) + ":" + ctx.hostname + ":" + json;
|
||||
ctx.udp->write(msg.c_str());
|
||||
ctx.udp->endPacket();
|
||||
} else if (strncmp(incoming, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0) {
|
||||
char* hostPtr = incoming + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
|
||||
String nodeHost = String(hostPtr);
|
||||
addOrUpdateNode(nodeHost, ctx.udp->remoteIP());
|
||||
} else if (strncmp(incoming, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0) {
|
||||
// Parse: CLUSTER_NODE_INFO:<hostname>:<json>
|
||||
char* p = incoming + strlen(ClusterProtocol::NODE_INFO_MSG) + 1;
|
||||
char* hostEnd = strchr(p, ':');
|
||||
if (hostEnd) {
|
||||
*hostEnd = '\0';
|
||||
const char* hostCStr = p;
|
||||
const char* jsonCStr = hostEnd + 1;
|
||||
|
||||
String nodeHost = String(hostCStr);
|
||||
IPAddress senderIP = ctx.udp->remoteIP();
|
||||
|
||||
// Ensure node exists/updated basic info
|
||||
addOrUpdateNode(nodeHost, senderIP);
|
||||
|
||||
// Parse JSON
|
||||
JsonDocument doc;
|
||||
DeserializationError err = deserializeJson(doc, jsonCStr);
|
||||
if (!err) {
|
||||
auto& memberList = *ctx.memberList;
|
||||
auto it = memberList.find(nodeHost);
|
||||
if (it != memberList.end()) {
|
||||
NodeInfo& node = it->second;
|
||||
node.resources.freeHeap = doc["freeHeap"] | node.resources.freeHeap;
|
||||
node.resources.chipId = doc["chipId"] | node.resources.chipId;
|
||||
{
|
||||
const char* sdk = doc["sdkVersion"] | node.resources.sdkVersion.c_str();
|
||||
node.resources.sdkVersion = sdk ? String(sdk) : node.resources.sdkVersion;
|
||||
}
|
||||
node.resources.cpuFreqMHz = doc["cpuFreqMHz"] | node.resources.cpuFreqMHz;
|
||||
node.resources.flashChipSize = doc["flashChipSize"] | node.resources.flashChipSize;
|
||||
node.status = NodeInfo::ACTIVE;
|
||||
node.lastSeen = millis();
|
||||
|
||||
// Labels
|
||||
node.labels.clear();
|
||||
if (doc["labels"].is<JsonObject>()) {
|
||||
JsonObject labelsObj = doc["labels"].as<JsonObject>();
|
||||
for (JsonPair kvp : labelsObj) {
|
||||
const char* key = kvp.key().c_str();
|
||||
const char* value = labelsObj[kvp.key()];
|
||||
node.labels[key] = value;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOG_DEBUG("Cluster", String("Failed to parse NODE_INFO JSON from ") + senderIP.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -194,31 +276,17 @@ void ClusterManager::heartbeatTaskCallback() {
|
||||
updateLocalNodeResources();
|
||||
ctx.fire("node_discovered", &node);
|
||||
}
|
||||
|
||||
// Broadcast heartbeat so peers can respond with their node info
|
||||
ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
|
||||
String hb = String(ClusterProtocol::HEARTBEAT_MSG) + ":" + ctx.hostname;
|
||||
ctx.udp->write(hb.c_str());
|
||||
ctx.udp->endPacket();
|
||||
}
|
||||
|
||||
/**
 * Callback for the member-info update task; intentionally does nothing.
 *
 * It previously walked the member list and issued one HTTP fetchNodeInfo()
 * request per remote node, with a blocking delay(100) between requests.
 * That loop is removed: peers now send their node info over UDP in reply to
 * the heartbeat broadcast, so polling here would only duplicate that data.
 * The function itself is kept so existing references to it still resolve.
 */
void ClusterManager::updateAllMembersInfoTaskCallback() {
    // HTTP-based member info fetching disabled; node info is provided via UDP responses to heartbeats
    // No-op to reduce network and memory usage
}
|
||||
|
||||
void ClusterManager::updateAllNodeStatuses() {
|
||||
|
||||
@@ -11,7 +11,7 @@ Config::Config() {
|
||||
|
||||
// Cluster Configuration
|
||||
discovery_interval_ms = 1000;
|
||||
heartbeat_interval_ms = 2000;
|
||||
heartbeat_interval_ms = 5000;
|
||||
status_update_interval_ms = 1000;
|
||||
member_info_update_interval_ms = 10000;
|
||||
print_interval_ms = 5000;
|
||||
|
||||
Reference in New Issue
Block a user