176 lines
6.6 KiB
C++
176 lines
6.6 KiB
C++
#include "ClusterManager.h"
|
|
|
|
ClusterManager::ClusterManager(NodeContext& ctx) : ctx(ctx) {
|
|
// Register callback for node_discovered event
|
|
ctx.on("node_discovered", [this](void* data) {
|
|
NodeInfo* node = static_cast<NodeInfo*>(data);
|
|
this->addOrUpdateNode(node->hostname, node->ip);
|
|
});
|
|
}
|
|
|
|
void ClusterManager::sendDiscovery() {
|
|
//Serial.println("[Cluster] Sending discovery packet...");
|
|
ctx.udp->beginPacket("255.255.255.255", ClusterProtocol::UDP_PORT);
|
|
ctx.udp->write(ClusterProtocol::DISCOVERY_MSG);
|
|
ctx.udp->endPacket();
|
|
}
|
|
|
|
void ClusterManager::listenForDiscovery() {
|
|
int packetSize = ctx.udp->parsePacket();
|
|
if (packetSize) {
|
|
char incoming[ClusterProtocol::UDP_BUF_SIZE];
|
|
int len = ctx.udp->read(incoming, ClusterProtocol::UDP_BUF_SIZE);
|
|
if (len > 0) {
|
|
incoming[len] = 0;
|
|
}
|
|
//Serial.printf("[UDP] Packet received: %s\n", incoming);
|
|
if (strcmp(incoming, ClusterProtocol::DISCOVERY_MSG) == 0) {
|
|
//Serial.printf("[UDP] Discovery request from: %s\n", ctx.udp->remoteIP().toString().c_str());
|
|
ctx.udp->beginPacket(ctx.udp->remoteIP(), ClusterProtocol::UDP_PORT);
|
|
String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname;
|
|
ctx.udp->write(response.c_str());
|
|
ctx.udp->endPacket();
|
|
//Serial.printf("[UDP] Sent response with hostname: %s\n", ctx.hostname.c_str());
|
|
} else if (strncmp(incoming, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0) {
|
|
char* hostPtr = incoming + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
|
|
String nodeHost = String(hostPtr);
|
|
addOrUpdateNode(nodeHost, ctx.udp->remoteIP());
|
|
}
|
|
}
|
|
}
|
|
|
|
void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
|
|
auto& memberList = *ctx.memberList;
|
|
for (auto& node : memberList) {
|
|
if (node.hostname == nodeHost) {
|
|
node.ip = nodeIP;
|
|
//fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
|
|
return;
|
|
}
|
|
}
|
|
NodeInfo newNode;
|
|
newNode.hostname = nodeHost;
|
|
newNode.ip = nodeIP;
|
|
newNode.lastSeen = millis();
|
|
updateNodeStatus(newNode, newNode.lastSeen);
|
|
memberList.push_back(newNode);
|
|
Serial.printf("[Cluster] Added node: %s @ %s | Status: %s | last update: 0\n",
|
|
nodeHost.c_str(),
|
|
newNode.ip.toString().c_str(),
|
|
statusToStr(newNode.status));
|
|
//fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
|
|
}
|
|
|
|
void ClusterManager::fetchNodeInfo(const IPAddress& ip) {
|
|
if(ip == ctx.localIP) {
|
|
Serial.println("[Cluster] Skipping fetch for local node");
|
|
return;
|
|
}
|
|
HTTPClient http;
|
|
WiFiClient client;
|
|
String url = "http://" + ip.toString() + ClusterProtocol::API_NODE_STATUS;
|
|
http.begin(client, url);
|
|
int httpCode = http.GET();
|
|
if (httpCode == 200) {
|
|
String payload = http.getString();
|
|
JsonDocument doc;
|
|
DeserializationError err = deserializeJson(doc, payload);
|
|
if (!err) {
|
|
auto& memberList = *ctx.memberList;
|
|
for (auto& node : memberList) {
|
|
if (node.ip == ip) {
|
|
node.resources.freeHeap = doc["freeHeap"];
|
|
node.resources.chipId = doc["chipId"];
|
|
node.resources.sdkVersion = (const char*)doc["sdkVersion"];
|
|
node.resources.cpuFreqMHz = doc["cpuFreqMHz"];
|
|
node.resources.flashChipSize = doc["flashChipSize"];
|
|
node.status = NodeInfo::ACTIVE;
|
|
node.lastSeen = millis();
|
|
node.apiEndpoints.clear();
|
|
if (doc["api"].is<JsonArray>()) {
|
|
JsonArray apiArr = doc["api"].as<JsonArray>();
|
|
for (JsonObject apiObj : apiArr) {
|
|
String uri = (const char*)apiObj["uri"];
|
|
int method = apiObj["method"];
|
|
node.apiEndpoints.push_back(std::make_tuple(uri, method));
|
|
}
|
|
}
|
|
Serial.printf("[Cluster] Fetched info for node: %s @ %s\n", node.hostname.c_str(), ip.toString().c_str());
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
Serial.printf("[Cluster] Failed to fetch info for node @ %s, HTTP code: %d\n", ip.toString().c_str(), httpCode);
|
|
}
|
|
http.end();
|
|
}
|
|
|
|
void ClusterManager::heartbeatTaskCallback() {
|
|
for (auto& node : *ctx.memberList) {
|
|
if (node.hostname == ctx.hostname) {
|
|
node.lastSeen = millis();
|
|
node.status = NodeInfo::ACTIVE;
|
|
updateLocalNodeResources();
|
|
ctx.fire("node_discovered", &node);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
void ClusterManager::updateAllMembersInfoTaskCallback() {
|
|
auto& memberList = *ctx.memberList;
|
|
for (const auto& node : memberList) {
|
|
if (node.ip != ctx.localIP) {
|
|
fetchNodeInfo(node.ip);
|
|
}
|
|
}
|
|
}
|
|
|
|
void ClusterManager::updateAllNodeStatuses() {
|
|
auto& memberList = *ctx.memberList;
|
|
unsigned long now = millis();
|
|
for (auto& node : memberList) {
|
|
updateNodeStatus(node, now);
|
|
node.latency = now - node.lastSeen;
|
|
}
|
|
}
|
|
|
|
void ClusterManager::removeDeadNodes() {
|
|
auto& memberList = *ctx.memberList;
|
|
unsigned long now = millis();
|
|
for (size_t i = 0; i < memberList.size(); ) {
|
|
unsigned long diff = now - memberList[i].lastSeen;
|
|
if (memberList[i].status == NodeInfo::DEAD && diff > NODE_DEAD_THRESHOLD) {
|
|
Serial.printf("[Cluster] Removing node: %s\n", memberList[i].hostname.c_str());
|
|
memberList.erase(memberList.begin() + i);
|
|
} else {
|
|
++i;
|
|
}
|
|
}
|
|
}
|
|
|
|
void ClusterManager::printMemberList() {
|
|
auto& memberList = *ctx.memberList;
|
|
if (memberList.empty()) {
|
|
Serial.println("[Cluster] Member List: empty");
|
|
return;
|
|
}
|
|
Serial.println("[Cluster] Member List:");
|
|
for (const auto& node : memberList) {
|
|
Serial.printf(" %s @ %s | Status: %s | last seen: %lu\n", node.hostname.c_str(), node.ip.toString().c_str(), statusToStr(node.status), millis() - node.lastSeen);
|
|
}
|
|
}
|
|
|
|
void ClusterManager::updateLocalNodeResources() {
|
|
for (auto& node : *ctx.memberList) {
|
|
if (node.hostname == ctx.hostname) {
|
|
node.resources.freeHeap = ESP.getFreeHeap();
|
|
node.resources.chipId = ESP.getChipId();
|
|
node.resources.sdkVersion = String(ESP.getSdkVersion());
|
|
node.resources.cpuFreqMHz = ESP.getCpuFreqMHz();
|
|
node.resources.flashChipSize = ESP.getFlashChipSize();
|
|
break;
|
|
}
|
|
}
|
|
}
|