feat: improve cluster forming; just use heartbeat to form the cluster
This commit is contained in:
@@ -25,27 +25,27 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx
|
||||
this->ctx.udp->write(msg.c_str());
|
||||
this->ctx.udp->endPacket();
|
||||
});
|
||||
|
||||
// Handler for node update broadcasts: services fire 'cluster/node_update' when their node info changes
|
||||
ctx.on("cluster/node_update", [this](void* data) {
|
||||
// Trigger immediate NODE_UPDATE broadcast when node info changes
|
||||
broadcastNodeUpdate();
|
||||
});
|
||||
// Register tasks
|
||||
registerTasks();
|
||||
initMessageHandlers();
|
||||
}
|
||||
|
||||
void ClusterManager::registerTasks() {
|
||||
taskManager.registerTask("cluster_discovery", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); });
|
||||
taskManager.registerTask("cluster_listen", ctx.config.cluster_listen_interval_ms, [this]() { listen(); });
|
||||
taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); });
|
||||
taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); });
|
||||
taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); });
|
||||
taskManager.registerTask("cluster_update_members_info", ctx.config.member_info_update_interval_ms, [this]() { updateAllMembersInfoTaskCallback(); });
|
||||
taskManager.registerTask("node_update_broadcast", ctx.config.node_update_broadcast_interval_ms, [this]() { broadcastNodeUpdate(); });
|
||||
LOG_INFO("ClusterManager", "Registered all cluster tasks");
|
||||
}
|
||||
|
||||
void ClusterManager::sendDiscovery() {
|
||||
//LOG_DEBUG(ctx, "Cluster", "Sending discovery packet...");
|
||||
ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
|
||||
ctx.udp->write(ClusterProtocol::DISCOVERY_MSG);
|
||||
ctx.udp->endPacket();
|
||||
}
|
||||
// Discovery functionality removed - using heartbeat-only approach
|
||||
|
||||
void ClusterManager::listen() {
|
||||
int packetSize = ctx.udp->parsePacket();
|
||||
@@ -69,10 +69,8 @@ void ClusterManager::listen() {
|
||||
void ClusterManager::initMessageHandlers() {
|
||||
messageHandlers.clear();
|
||||
messageHandlers.push_back({ &ClusterManager::isRawMsg, [this](const char* msg){ this->onRawMessage(msg); }, "RAW" });
|
||||
messageHandlers.push_back({ &ClusterManager::isDiscoveryMsg, [this](const char* msg){ this->onDiscovery(msg); }, "DISCOVERY" });
|
||||
messageHandlers.push_back({ &ClusterManager::isHeartbeatMsg, [this](const char* msg){ this->onHeartbeat(msg); }, "HEARTBEAT" });
|
||||
messageHandlers.push_back({ &ClusterManager::isResponseMsg, [this](const char* msg){ this->onResponse(msg); }, "RESPONSE" });
|
||||
messageHandlers.push_back({ &ClusterManager::isNodeInfoMsg, [this](const char* msg){ this->onNodeInfo(msg); }, "NODE_INFO" });
|
||||
messageHandlers.push_back({ &ClusterManager::isNodeUpdateMsg, [this](const char* msg){ this->onNodeUpdate(msg); }, "NODE_UPDATE" });
|
||||
messageHandlers.push_back({ &ClusterManager::isClusterEventMsg, [this](const char* msg){ this->onClusterEvent(msg); }, "CLUSTER_EVENT" });
|
||||
}
|
||||
|
||||
@@ -94,20 +92,12 @@ void ClusterManager::handleIncomingMessage(const char* incoming) {
|
||||
LOG_DEBUG("Cluster", String("Unknown cluster message: ") + head);
|
||||
}
|
||||
|
||||
bool ClusterManager::isDiscoveryMsg(const char* msg) {
|
||||
return strcmp(msg, ClusterProtocol::DISCOVERY_MSG) == 0;
|
||||
}
|
||||
|
||||
bool ClusterManager::isHeartbeatMsg(const char* msg) {
|
||||
return strncmp(msg, ClusterProtocol::HEARTBEAT_MSG, strlen(ClusterProtocol::HEARTBEAT_MSG)) == 0;
|
||||
}
|
||||
|
||||
bool ClusterManager::isResponseMsg(const char* msg) {
|
||||
return strncmp(msg, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0;
|
||||
}
|
||||
|
||||
bool ClusterManager::isNodeInfoMsg(const char* msg) {
|
||||
return strncmp(msg, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0;
|
||||
bool ClusterManager::isNodeUpdateMsg(const char* msg) {
|
||||
return strncmp(msg, ClusterProtocol::NODE_UPDATE_MSG, strlen(ClusterProtocol::NODE_UPDATE_MSG)) == 0;
|
||||
}
|
||||
|
||||
bool ClusterManager::isClusterEventMsg(const char* msg) {
|
||||
@@ -123,87 +113,122 @@ bool ClusterManager::isRawMsg(const char* msg) {
|
||||
return msg[prefixLen] == ':';
|
||||
}
|
||||
|
||||
void ClusterManager::onDiscovery(const char* /*msg*/) {
|
||||
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
|
||||
String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname;
|
||||
ctx.udp->write(response.c_str());
|
||||
ctx.udp->endPacket();
|
||||
// Discovery functionality removed - using heartbeat-only approach
|
||||
|
||||
void ClusterManager::onHeartbeat(const char* msg) {
|
||||
// Extract hostname from heartbeat message: "CLUSTER_HEARTBEAT:hostname"
|
||||
const char* colon = strchr(msg, ':');
|
||||
if (!colon) {
|
||||
LOG_WARN("Cluster", "Invalid heartbeat message format");
|
||||
return;
|
||||
}
|
||||
|
||||
String hostname = String(colon + 1);
|
||||
IPAddress senderIP = ctx.udp->remoteIP();
|
||||
|
||||
// Update memberlist with the heartbeat
|
||||
addOrUpdateNode(hostname, senderIP);
|
||||
|
||||
// Respond with minimal node info (hostname, ip, uptime, labels)
|
||||
sendNodeInfo(hostname, senderIP);
|
||||
}
|
||||
|
||||
void ClusterManager::onHeartbeat(const char* /*msg*/) {
|
||||
void ClusterManager::onNodeUpdate(const char* msg) {
|
||||
// Message format: "NODE_UPDATE:hostname:{json}"
|
||||
const char* firstColon = strchr(msg, ':');
|
||||
if (!firstColon) {
|
||||
LOG_WARN("Cluster", "Invalid NODE_UPDATE message format");
|
||||
return;
|
||||
}
|
||||
|
||||
const char* secondColon = strchr(firstColon + 1, ':');
|
||||
if (!secondColon) {
|
||||
LOG_WARN("Cluster", "Invalid NODE_UPDATE message format");
|
||||
return;
|
||||
}
|
||||
|
||||
String hostnamePart = String(firstColon + 1);
|
||||
String hostname = hostnamePart.substring(0, secondColon - firstColon - 1);
|
||||
const char* jsonCStr = secondColon + 1;
|
||||
|
||||
JsonDocument doc;
|
||||
DeserializationError err = deserializeJson(doc, jsonCStr);
|
||||
if (err) {
|
||||
LOG_WARN("Cluster", String("Failed to parse NODE_UPDATE JSON from ") + ctx.udp->remoteIP().toString());
|
||||
return;
|
||||
}
|
||||
|
||||
// Update the specific node in memberlist
|
||||
auto& memberList = *ctx.memberList;
|
||||
auto it = memberList.find(hostname);
|
||||
if (it != memberList.end()) {
|
||||
NodeInfo& node = it->second;
|
||||
|
||||
// Update basic info if provided
|
||||
if (doc["hostname"].is<const char*>()) {
|
||||
node.hostname = doc["hostname"].as<const char*>();
|
||||
}
|
||||
if (doc["uptime"].is<unsigned long>()) {
|
||||
node.uptime = doc["uptime"];
|
||||
}
|
||||
|
||||
// Update labels if provided
|
||||
if (doc["labels"].is<JsonObject>()) {
|
||||
node.labels.clear();
|
||||
JsonObject labelsObj = doc["labels"].as<JsonObject>();
|
||||
for (JsonPair kvp : labelsObj) {
|
||||
const char* key = kvp.key().c_str();
|
||||
const char* value = labelsObj[kvp.key()];
|
||||
node.labels[key] = String(value);
|
||||
}
|
||||
}
|
||||
|
||||
node.lastSeen = millis();
|
||||
node.status = NodeInfo::ACTIVE;
|
||||
|
||||
LOG_DEBUG("Cluster", String("Updated node ") + hostname + " from NODE_UPDATE");
|
||||
} else {
|
||||
LOG_WARN("Cluster", String("Received NODE_UPDATE for unknown node: ") + hostname);
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterManager::sendNodeInfo(const String& hostname, const IPAddress& targetIP) {
|
||||
JsonDocument doc;
|
||||
|
||||
if (ctx.memberList) {
|
||||
auto it = ctx.memberList->find(ctx.hostname);
|
||||
if (it != ctx.memberList->end()) {
|
||||
// Get our node info for the response
|
||||
auto& memberList = *ctx.memberList;
|
||||
auto it = memberList.find(ctx.hostname);
|
||||
if (it != memberList.end()) {
|
||||
const NodeInfo& node = it->second;
|
||||
|
||||
// Minimal response: hostname, ip, uptime, labels
|
||||
doc["hostname"] = node.hostname;
|
||||
doc["ip"] = node.ip.toString();
|
||||
doc["uptime"] = millis() - node.lastSeen; // Approximate uptime
|
||||
|
||||
// Add labels if present
|
||||
if (!node.labels.empty()) {
|
||||
JsonObject labelsObj = doc["labels"].to<JsonObject>();
|
||||
for (const auto& kv : it->second.labels) {
|
||||
labelsObj[kv.first.c_str()] = kv.second;
|
||||
}
|
||||
} else if (!ctx.self.labels.empty()) {
|
||||
JsonObject labelsObj = doc["labels"].to<JsonObject>();
|
||||
for (const auto& kv : ctx.self.labels) {
|
||||
for (const auto& kv : node.labels) {
|
||||
labelsObj[kv.first.c_str()] = kv.second;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Fallback to basic info
|
||||
doc["hostname"] = ctx.hostname;
|
||||
doc["ip"] = ctx.localIP.toString();
|
||||
doc["uptime"] = millis();
|
||||
}
|
||||
|
||||
String json;
|
||||
serializeJson(doc, json);
|
||||
|
||||
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
|
||||
String msg = String(ClusterProtocol::NODE_INFO_MSG) + ":" + ctx.hostname + ":" + json;
|
||||
ctx.udp->beginPacket(targetIP, ctx.config.udp_port);
|
||||
String msg = String(ClusterProtocol::NODE_UPDATE_MSG) + ":" + hostname + ":" + json;
|
||||
ctx.udp->write(msg.c_str());
|
||||
ctx.udp->endPacket();
|
||||
}
|
||||
|
||||
void ClusterManager::onResponse(const char* msg) {
|
||||
char* hostPtr = const_cast<char*>(msg) + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
|
||||
String nodeHost = String(hostPtr);
|
||||
addOrUpdateNode(nodeHost, ctx.udp->remoteIP());
|
||||
}
|
||||
|
||||
void ClusterManager::onNodeInfo(const char* msg) {
|
||||
char* p = const_cast<char*>(msg) + strlen(ClusterProtocol::NODE_INFO_MSG) + 1;
|
||||
char* hostEnd = strchr(p, ':');
|
||||
if (hostEnd) {
|
||||
*hostEnd = '\0';
|
||||
const char* hostCStr = p;
|
||||
const char* jsonCStr = hostEnd + 1;
|
||||
|
||||
String nodeHost = String(hostCStr);
|
||||
IPAddress senderIP = ctx.udp->remoteIP();
|
||||
|
||||
addOrUpdateNode(nodeHost, senderIP);
|
||||
|
||||
JsonDocument doc;
|
||||
DeserializationError err = deserializeJson(doc, jsonCStr);
|
||||
if (!err) {
|
||||
auto& memberList = *ctx.memberList;
|
||||
auto it = memberList.find(nodeHost);
|
||||
if (it != memberList.end()) {
|
||||
NodeInfo& node = it->second;
|
||||
node.status = NodeInfo::ACTIVE;
|
||||
unsigned long now = millis();
|
||||
node.lastSeen = now;
|
||||
if (lastHeartbeatSentAt != 0) {
|
||||
node.latency = now - lastHeartbeatSentAt;
|
||||
}
|
||||
|
||||
node.labels.clear();
|
||||
if (doc["labels"].is<JsonObject>()) {
|
||||
JsonObject labelsObj = doc["labels"].as<JsonObject>();
|
||||
for (JsonPair kvp : labelsObj) {
|
||||
const char* key = kvp.key().c_str();
|
||||
const char* value = labelsObj[kvp.key()];
|
||||
node.labels[key] = value;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOG_WARN("Cluster", String("Failed to parse NODE_INFO JSON from ") + senderIP.toString());
|
||||
}
|
||||
}
|
||||
LOG_DEBUG("Cluster", String("Sent NODE_UPDATE response to ") + hostname + " @ " + targetIP.toString());
|
||||
}
|
||||
|
||||
void ClusterManager::onClusterEvent(const char* msg) {
|
||||
@@ -406,16 +431,19 @@ void ClusterManager::heartbeatTaskCallback() {
|
||||
NodeInfo& node = it->second;
|
||||
node.lastSeen = millis();
|
||||
node.status = NodeInfo::ACTIVE;
|
||||
node.uptime = millis(); // Update uptime
|
||||
updateLocalNodeResources();
|
||||
addOrUpdateNode(ctx.hostname, ctx.localIP);
|
||||
}
|
||||
|
||||
// Broadcast heartbeat so peers can respond with their node info
|
||||
// Broadcast heartbeat - peers will respond with NODE_UPDATE
|
||||
lastHeartbeatSentAt = millis();
|
||||
ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
|
||||
String hb = String(ClusterProtocol::HEARTBEAT_MSG) + ":" + ctx.hostname;
|
||||
ctx.udp->write(hb.c_str());
|
||||
ctx.udp->endPacket();
|
||||
|
||||
LOG_DEBUG("Cluster", String("Sent heartbeat: ") + ctx.hostname);
|
||||
}
|
||||
|
||||
void ClusterManager::updateAllMembersInfoTaskCallback() {
|
||||
@@ -423,6 +451,40 @@ void ClusterManager::updateAllMembersInfoTaskCallback() {
|
||||
// No-op to reduce network and memory usage
|
||||
}
|
||||
|
||||
void ClusterManager::broadcastNodeUpdate() {
|
||||
// Broadcast our current node info as NODE_UPDATE to all cluster members
|
||||
auto& memberList = *ctx.memberList;
|
||||
auto it = memberList.find(ctx.hostname);
|
||||
if (it == memberList.end()) {
|
||||
return;
|
||||
}
|
||||
|
||||
const NodeInfo& node = it->second;
|
||||
|
||||
JsonDocument doc;
|
||||
doc["hostname"] = node.hostname;
|
||||
doc["uptime"] = node.uptime;
|
||||
|
||||
// Add labels if present
|
||||
if (!node.labels.empty()) {
|
||||
JsonObject labelsObj = doc["labels"].to<JsonObject>();
|
||||
for (const auto& kv : node.labels) {
|
||||
labelsObj[kv.first.c_str()] = kv.second;
|
||||
}
|
||||
}
|
||||
|
||||
String json;
|
||||
serializeJson(doc, json);
|
||||
|
||||
// Broadcast to all cluster members
|
||||
ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
|
||||
String msg = String(ClusterProtocol::NODE_UPDATE_MSG) + ":" + ctx.hostname + ":" + json;
|
||||
ctx.udp->write(msg.c_str());
|
||||
ctx.udp->endPacket();
|
||||
|
||||
LOG_DEBUG("Cluster", String("Broadcasted NODE_UPDATE for ") + ctx.hostname);
|
||||
}
|
||||
|
||||
void ClusterManager::updateAllNodeStatuses() {
|
||||
auto& memberList = *ctx.memberList;
|
||||
unsigned long now = millis();
|
||||
|
||||
@@ -255,11 +255,10 @@ void NodeService::handleGetConfigRequest(AsyncWebServerRequest* request) {
|
||||
|
||||
// Cluster Configuration
|
||||
JsonObject clusterObj = doc["cluster"].to<JsonObject>();
|
||||
clusterObj["discovery_interval_ms"] = ctx.config.discovery_interval_ms;
|
||||
clusterObj["heartbeat_interval_ms"] = ctx.config.heartbeat_interval_ms;
|
||||
clusterObj["cluster_listen_interval_ms"] = ctx.config.cluster_listen_interval_ms;
|
||||
clusterObj["status_update_interval_ms"] = ctx.config.status_update_interval_ms;
|
||||
clusterObj["member_info_update_interval_ms"] = ctx.config.member_info_update_interval_ms;
|
||||
clusterObj["node_update_broadcast_interval_ms"] = ctx.config.node_update_broadcast_interval_ms;
|
||||
clusterObj["print_interval_ms"] = ctx.config.print_interval_ms;
|
||||
|
||||
// Node Status Thresholds
|
||||
|
||||
@@ -32,11 +32,10 @@ void Config::setDefaults() {
|
||||
api_server_port = DEFAULT_API_SERVER_PORT;
|
||||
|
||||
// Cluster Configuration
|
||||
discovery_interval_ms = DEFAULT_DISCOVERY_INTERVAL_MS; // TODO retire this in favor of heartbeat_interval_ms
|
||||
cluster_listen_interval_ms = DEFAULT_CLUSTER_LISTEN_INTERVAL_MS;
|
||||
heartbeat_interval_ms = DEFAULT_HEARTBEAT_INTERVAL_MS;
|
||||
status_update_interval_ms = DEFAULT_STATUS_UPDATE_INTERVAL_MS;
|
||||
member_info_update_interval_ms = DEFAULT_MEMBER_INFO_UPDATE_INTERVAL_MS; // TODO retire this in favor of heartbeat_interval_ms
|
||||
node_update_broadcast_interval_ms = DEFAULT_NODE_UPDATE_BROADCAST_INTERVAL_MS;
|
||||
print_interval_ms = DEFAULT_PRINT_INTERVAL_MS;
|
||||
|
||||
// Node Status Thresholds
|
||||
@@ -86,11 +85,10 @@ bool Config::saveToFile(const String& filename) {
|
||||
doc["network"]["api_server_port"] = api_server_port;
|
||||
|
||||
// Cluster Configuration
|
||||
doc["cluster"]["discovery_interval_ms"] = discovery_interval_ms;
|
||||
doc["cluster"]["heartbeat_interval_ms"] = heartbeat_interval_ms;
|
||||
doc["cluster"]["cluster_listen_interval_ms"] = cluster_listen_interval_ms;
|
||||
doc["cluster"]["status_update_interval_ms"] = status_update_interval_ms;
|
||||
doc["cluster"]["member_info_update_interval_ms"] = member_info_update_interval_ms;
|
||||
doc["cluster"]["node_update_broadcast_interval_ms"] = node_update_broadcast_interval_ms;
|
||||
doc["cluster"]["print_interval_ms"] = print_interval_ms;
|
||||
|
||||
// Node Status Thresholds
|
||||
@@ -166,11 +164,10 @@ bool Config::loadFromFile(const String& filename) {
|
||||
api_server_port = doc["network"]["api_server_port"] | DEFAULT_API_SERVER_PORT;
|
||||
|
||||
// Load Cluster Configuration with defaults
|
||||
discovery_interval_ms = doc["cluster"]["discovery_interval_ms"] | DEFAULT_DISCOVERY_INTERVAL_MS;
|
||||
heartbeat_interval_ms = doc["cluster"]["heartbeat_interval_ms"] | DEFAULT_HEARTBEAT_INTERVAL_MS;
|
||||
cluster_listen_interval_ms = doc["cluster"]["cluster_listen_interval_ms"] | DEFAULT_CLUSTER_LISTEN_INTERVAL_MS;
|
||||
status_update_interval_ms = doc["cluster"]["status_update_interval_ms"] | DEFAULT_STATUS_UPDATE_INTERVAL_MS;
|
||||
member_info_update_interval_ms = doc["cluster"]["member_info_update_interval_ms"] | DEFAULT_MEMBER_INFO_UPDATE_INTERVAL_MS;
|
||||
node_update_broadcast_interval_ms = doc["cluster"]["node_update_broadcast_interval_ms"] | DEFAULT_NODE_UPDATE_BROADCAST_INTERVAL_MS;
|
||||
print_interval_ms = doc["cluster"]["print_interval_ms"] | DEFAULT_PRINT_INTERVAL_MS;
|
||||
|
||||
// Load Node Status Thresholds with defaults
|
||||
|
||||
Reference in New Issue
Block a user