// Listing metadata: 214 lines, 8.9 KiB, C++
#include "ClusterManager.h"
|
|
|
|
// Wires the cluster manager into the node context. Every "node_discovered"
// event is folded into the member list via addOrUpdateNode(). The periodic
// cluster jobs are then registered with the task manager.
//
// NOTE(review): the event callback captures `this`. It assumes this
// ClusterManager outlives `ctx` (or that ctx never fires the event after
// destruction) — confirm, since no unregistration is visible here.
ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx), taskManager(taskMgr) {
    // The payload is an untyped pointer that is expected to point at a NodeInfo.
    // heartbeatTaskCallback() fires it with the local node's member entry.
    ctx.on("node_discovered", [this](void* data) {
        NodeInfo* node = static_cast<NodeInfo*>(data);
        if (node == nullptr) {
            // Ignore a malformed event instead of dereferencing a null payload.
            return;
        }
        this->addOrUpdateNode(node->hostname, node->ip);
    });

    registerTasks();
}
|
|
|
|
// Schedules every periodic cluster job on the TaskManager, in a fixed order.
// Periods are read from ctx.config; they are presumably milliseconds, per the
// *_ms field names. The discovery listener runs at one tenth of the discovery
// interval.
// NOTE(review): if discovery_interval_ms is integral and below 10, the
// listener period becomes 0 — confirm how TaskManager treats a 0 interval.
void ClusterManager::registerTasks() {
    // Discovery: broadcast requests, and poll for incoming requests/responses.
    taskManager.registerTask("discovery_send", ctx.config.discovery_interval_ms,
                             [this] { sendDiscovery(); });
    taskManager.registerTask("discovery_listen", ctx.config.discovery_interval_ms / 10,
                             [this] { listenForDiscovery(); });

    // Membership upkeep: recompute statuses first, then drop expired members.
    taskManager.registerTask("status_update", ctx.config.status_update_interval_ms,
                             [this] {
                                 updateAllNodeStatuses();
                                 removeDeadNodes();
                             });

    // Diagnostics.
    taskManager.registerTask("print_members", ctx.config.print_interval_ms,
                             [this] { printMemberList(); });

    // Local liveness and remote info refresh.
    taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms,
                             [this] { heartbeatTaskCallback(); });
    taskManager.registerTask("update_members_info", ctx.config.member_info_update_interval_ms,
                             [this] { updateAllMembersInfoTaskCallback(); });

    Serial.println("[ClusterManager] Registered all cluster tasks");
}
|
|
|
|
// Sends a single discovery request to the limited-broadcast address on the
// configured cluster UDP port. Peers answer it from listenForDiscovery().
void ClusterManager::sendDiscovery() {
    auto& udp = *ctx.udp;
    udp.beginPacket("255.255.255.255", ctx.config.udp_port);
    udp.write(ClusterProtocol::DISCOVERY_MSG);
    udp.endPacket();
}
|
|
|
|
void ClusterManager::listenForDiscovery() {
|
|
int packetSize = ctx.udp->parsePacket();
|
|
if (packetSize) {
|
|
char incoming[ClusterProtocol::UDP_BUF_SIZE];
|
|
int len = ctx.udp->read(incoming, ClusterProtocol::UDP_BUF_SIZE);
|
|
if (len > 0) {
|
|
incoming[len] = 0;
|
|
}
|
|
//Serial.printf("[UDP] Packet received: %s\n", incoming);
|
|
if (strcmp(incoming, ClusterProtocol::DISCOVERY_MSG) == 0) {
|
|
//Serial.printf("[UDP] Discovery request from: %s\n", ctx.udp->remoteIP().toString().c_str());
|
|
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
|
|
String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname;
|
|
ctx.udp->write(response.c_str());
|
|
ctx.udp->endPacket();
|
|
//Serial.printf("[UDP] Sent response with hostname: %s\n", ctx.hostname.c_str());
|
|
} else if (strncmp(incoming, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0) {
|
|
char* hostPtr = incoming + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
|
|
String nodeHost = String(hostPtr);
|
|
addOrUpdateNode(nodeHost, ctx.udp->remoteIP());
|
|
}
|
|
}
|
|
}
|
|
|
|
void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
|
|
auto& memberList = *ctx.memberList;
|
|
|
|
// O(1) lookup instead of O(n) search
|
|
auto it = memberList.find(nodeHost);
|
|
if (it != memberList.end()) {
|
|
// Update existing node
|
|
it->second.ip = nodeIP;
|
|
it->second.lastSeen = millis();
|
|
//fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
|
|
return;
|
|
}
|
|
|
|
// Add new node
|
|
NodeInfo newNode;
|
|
newNode.hostname = nodeHost;
|
|
newNode.ip = nodeIP;
|
|
newNode.lastSeen = millis();
|
|
updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
|
|
memberList[nodeHost] = newNode;
|
|
Serial.printf("[Cluster] Added node: %s @ %s | Status: %s | last update: 0\n",
|
|
nodeHost.c_str(),
|
|
newNode.ip.toString().c_str(),
|
|
statusToStr(newNode.status));
|
|
//fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
|
|
}
|
|
|
|
void ClusterManager::fetchNodeInfo(const IPAddress& ip) {
|
|
if(ip == ctx.localIP) {
|
|
Serial.println("[Cluster] Skipping fetch for local node");
|
|
return;
|
|
}
|
|
unsigned long requestStart = millis();
|
|
HTTPClient http;
|
|
WiFiClient client;
|
|
String url = "http://" + ip.toString() + ClusterProtocol::API_NODE_STATUS;
|
|
http.begin(client, url);
|
|
int httpCode = http.GET();
|
|
unsigned long requestEnd = millis();
|
|
unsigned long requestDuration = requestEnd - requestStart;
|
|
if (httpCode == 200) {
|
|
String payload = http.getString();
|
|
JsonDocument doc;
|
|
DeserializationError err = deserializeJson(doc, payload);
|
|
if (!err) {
|
|
auto& memberList = *ctx.memberList;
|
|
// Still need to iterate since we're searching by IP, not hostname
|
|
for (auto& pair : memberList) {
|
|
NodeInfo& node = pair.second;
|
|
if (node.ip == ip) {
|
|
node.resources.freeHeap = doc["freeHeap"];
|
|
node.resources.chipId = doc["chipId"];
|
|
node.resources.sdkVersion = (const char*)doc["sdkVersion"];
|
|
node.resources.cpuFreqMHz = doc["cpuFreqMHz"];
|
|
node.resources.flashChipSize = doc["flashChipSize"];
|
|
node.status = NodeInfo::ACTIVE;
|
|
node.latency = requestDuration;
|
|
node.lastSeen = millis();
|
|
node.apiEndpoints.clear();
|
|
if (doc["api"].is<JsonArray>()) {
|
|
JsonArray apiArr = doc["api"].as<JsonArray>();
|
|
for (JsonObject apiObj : apiArr) {
|
|
String uri = (const char*)apiObj["uri"];
|
|
int method = apiObj["method"];
|
|
node.apiEndpoints.push_back(std::make_tuple(uri, method));
|
|
}
|
|
}
|
|
// Parse labels if present
|
|
node.labels.clear();
|
|
if (doc["labels"].is<JsonObject>()) {
|
|
JsonObject labelsObj = doc["labels"].as<JsonObject>();
|
|
for (JsonPair kvp : labelsObj) {
|
|
String k = String(kvp.key().c_str());
|
|
String v = String(labelsObj[kvp.key()]);
|
|
node.labels[k] = v;
|
|
}
|
|
}
|
|
Serial.printf("[Cluster] Fetched info for node: %s @ %s\n", node.hostname.c_str(), ip.toString().c_str());
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
Serial.printf("[Cluster] Failed to fetch info for node @ %s, HTTP code: %d\n", ip.toString().c_str(), httpCode);
|
|
}
|
|
http.end();
|
|
}
|
|
|
|
void ClusterManager::heartbeatTaskCallback() {
|
|
auto& memberList = *ctx.memberList;
|
|
auto it = memberList.find(ctx.hostname);
|
|
if (it != memberList.end()) {
|
|
NodeInfo& node = it->second;
|
|
node.lastSeen = millis();
|
|
node.status = NodeInfo::ACTIVE;
|
|
updateLocalNodeResources();
|
|
ctx.fire("node_discovered", &node);
|
|
}
|
|
}
|
|
|
|
void ClusterManager::updateAllMembersInfoTaskCallback() {
|
|
auto& memberList = *ctx.memberList;
|
|
for (auto& pair : memberList) {
|
|
const NodeInfo& node = pair.second;
|
|
if (node.ip != ctx.localIP) {
|
|
fetchNodeInfo(node.ip);
|
|
}
|
|
}
|
|
}
|
|
|
|
void ClusterManager::updateAllNodeStatuses() {
|
|
auto& memberList = *ctx.memberList;
|
|
unsigned long now = millis();
|
|
for (auto& pair : memberList) {
|
|
NodeInfo& node = pair.second;
|
|
updateNodeStatus(node, now, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
|
|
}
|
|
}
|
|
|
|
void ClusterManager::removeDeadNodes() {
|
|
auto& memberList = *ctx.memberList;
|
|
unsigned long now = millis();
|
|
|
|
// Use iterator to safely remove elements from map
|
|
for (auto it = memberList.begin(); it != memberList.end(); ) {
|
|
unsigned long diff = now - it->second.lastSeen;
|
|
if (it->second.status == NodeInfo::DEAD && diff > ctx.config.node_dead_threshold_ms) {
|
|
Serial.printf("[Cluster] Removing node: %s\n", it->second.hostname.c_str());
|
|
it = memberList.erase(it);
|
|
} else {
|
|
++it;
|
|
}
|
|
}
|
|
}
|
|
|
|
void ClusterManager::printMemberList() {
|
|
auto& memberList = *ctx.memberList;
|
|
if (memberList.empty()) {
|
|
Serial.println("[Cluster] Member List: empty");
|
|
return;
|
|
}
|
|
Serial.println("[Cluster] Member List:");
|
|
for (const auto& pair : memberList) {
|
|
const NodeInfo& node = pair.second;
|
|
Serial.printf(" %s @ %s | Status: %s | last seen: %lu\n", node.hostname.c_str(), node.ip.toString().c_str(), statusToStr(node.status), millis() - node.lastSeen);
|
|
}
|
|
}
|
|
|
|
void ClusterManager::updateLocalNodeResources() {
|
|
auto& memberList = *ctx.memberList;
|
|
auto it = memberList.find(ctx.hostname);
|
|
if (it != memberList.end()) {
|
|
NodeInfo& node = it->second;
|
|
node.resources.freeHeap = ESP.getFreeHeap();
|
|
node.resources.chipId = ESP.getChipId();
|
|
node.resources.sdkVersion = String(ESP.getSdkVersion());
|
|
node.resources.cpuFreqMHz = ESP.getCpuFreqMHz();
|
|
node.resources.flashChipSize = ESP.getFlashChipSize();
|
|
}
|
|
}
|