basic functionality
This commit is contained in:
175
src/ClusterManager.cpp
Normal file
175
src/ClusterManager.cpp
Normal file
@@ -0,0 +1,175 @@
|
||||
#include "ClusterManager.h"
|
||||
|
||||
ClusterManager::ClusterManager(ClusterContext& ctx) : ctx(ctx) {
|
||||
// Register callback for node_discovered event
|
||||
ctx.registerEvent("node_discovered", [this](void* data) {
|
||||
NodeInfo* node = static_cast<NodeInfo*>(data);
|
||||
this->addOrUpdateNode(node->hostname, node->ip);
|
||||
});
|
||||
}
|
||||
|
||||
void ClusterManager::sendDiscovery() {
|
||||
//Serial.println("[Cluster] Sending discovery packet...");
|
||||
ctx.udp->beginPacket("255.255.255.255", ClusterProtocol::UDP_PORT);
|
||||
ctx.udp->write(ClusterProtocol::DISCOVERY_MSG);
|
||||
ctx.udp->endPacket();
|
||||
}
|
||||
|
||||
void ClusterManager::listenForDiscovery() {
|
||||
int packetSize = ctx.udp->parsePacket();
|
||||
if (packetSize) {
|
||||
char incoming[ClusterProtocol::UDP_BUF_SIZE];
|
||||
int len = ctx.udp->read(incoming, ClusterProtocol::UDP_BUF_SIZE);
|
||||
if (len > 0) {
|
||||
incoming[len] = 0;
|
||||
}
|
||||
//Serial.printf("[UDP] Packet received: %s\n", incoming);
|
||||
if (strcmp(incoming, ClusterProtocol::DISCOVERY_MSG) == 0) {
|
||||
//Serial.printf("[UDP] Discovery request from: %s\n", ctx.udp->remoteIP().toString().c_str());
|
||||
ctx.udp->beginPacket(ctx.udp->remoteIP(), ClusterProtocol::UDP_PORT);
|
||||
String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname;
|
||||
ctx.udp->write(response.c_str());
|
||||
ctx.udp->endPacket();
|
||||
//Serial.printf("[UDP] Sent response with hostname: %s\n", ctx.hostname.c_str());
|
||||
} else if (strncmp(incoming, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0) {
|
||||
char* hostPtr = incoming + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
|
||||
String nodeHost = String(hostPtr);
|
||||
addOrUpdateNode(nodeHost, ctx.udp->remoteIP());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
|
||||
auto& memberList = *ctx.memberList;
|
||||
for (auto& node : memberList) {
|
||||
if (node.hostname == nodeHost) {
|
||||
node.ip = nodeIP;
|
||||
//fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
|
||||
return;
|
||||
}
|
||||
}
|
||||
NodeInfo newNode;
|
||||
newNode.hostname = nodeHost;
|
||||
newNode.ip = nodeIP;
|
||||
newNode.lastSeen = millis();
|
||||
updateNodeStatus(newNode, newNode.lastSeen);
|
||||
memberList.push_back(newNode);
|
||||
Serial.printf("[Cluster] Added node: %s @ %s | Status: %s | last update: 0\n",
|
||||
nodeHost.c_str(),
|
||||
newNode.ip.toString().c_str(),
|
||||
statusToStr(newNode.status));
|
||||
//fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
|
||||
}
|
||||
|
||||
void ClusterManager::fetchNodeInfo(const IPAddress& ip) {
|
||||
if(ip == ctx.localIP) {
|
||||
Serial.println("[Cluster] Skipping fetch for local node");
|
||||
return;
|
||||
}
|
||||
HTTPClient http;
|
||||
WiFiClient client;
|
||||
String url = "http://" + ip.toString() + ClusterProtocol::API_NODE_STATUS;
|
||||
http.begin(client, url);
|
||||
int httpCode = http.GET();
|
||||
if (httpCode == 200) {
|
||||
String payload = http.getString();
|
||||
JsonDocument doc;
|
||||
DeserializationError err = deserializeJson(doc, payload);
|
||||
if (!err) {
|
||||
auto& memberList = *ctx.memberList;
|
||||
for (auto& node : memberList) {
|
||||
if (node.ip == ip) {
|
||||
node.resources.freeHeap = doc["freeHeap"];
|
||||
node.resources.chipId = doc["chipId"];
|
||||
node.resources.sdkVersion = (const char*)doc["sdkVersion"];
|
||||
node.resources.cpuFreqMHz = doc["cpuFreqMHz"];
|
||||
node.resources.flashChipSize = doc["flashChipSize"];
|
||||
node.status = NodeInfo::ACTIVE;
|
||||
node.lastSeen = millis();
|
||||
node.apiEndpoints.clear();
|
||||
if (doc["api"].is<JsonArray>()) {
|
||||
JsonArray apiArr = doc["api"].as<JsonArray>();
|
||||
for (JsonObject apiObj : apiArr) {
|
||||
String uri = (const char*)apiObj["uri"];
|
||||
int method = apiObj["method"];
|
||||
node.apiEndpoints.push_back(std::make_tuple(uri, method));
|
||||
}
|
||||
}
|
||||
Serial.printf("[Cluster] Fetched info for node: %s @ %s\n", node.hostname.c_str(), ip.toString().c_str());
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Serial.printf("[Cluster] Failed to fetch info for node @ %s, HTTP code: %d\n", ip.toString().c_str(), httpCode);
|
||||
}
|
||||
http.end();
|
||||
}
|
||||
|
||||
void ClusterManager::heartbeatTaskCallback() {
|
||||
for (auto& node : *ctx.memberList) {
|
||||
if (node.hostname == ctx.hostname) {
|
||||
node.lastSeen = millis();
|
||||
node.status = NodeInfo::ACTIVE;
|
||||
updateLocalNodeResources();
|
||||
ctx.triggerEvent("node_discovered", &node);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterManager::updateAllMembersInfoTaskCallback() {
|
||||
auto& memberList = *ctx.memberList;
|
||||
for (const auto& node : memberList) {
|
||||
if (node.ip != ctx.localIP) {
|
||||
fetchNodeInfo(node.ip);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterManager::updateAllNodeStatuses() {
|
||||
auto& memberList = *ctx.memberList;
|
||||
unsigned long now = millis();
|
||||
for (auto& node : memberList) {
|
||||
updateNodeStatus(node, now);
|
||||
node.latency = now - node.lastSeen;
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterManager::removeDeadNodes() {
|
||||
auto& memberList = *ctx.memberList;
|
||||
unsigned long now = millis();
|
||||
for (size_t i = 0; i < memberList.size(); ) {
|
||||
unsigned long diff = now - memberList[i].lastSeen;
|
||||
if (memberList[i].status == NodeInfo::DEAD && diff > NODE_DEAD_THRESHOLD) {
|
||||
Serial.printf("[Cluster] Removing node: %s\n", memberList[i].hostname.c_str());
|
||||
memberList.erase(memberList.begin() + i);
|
||||
} else {
|
||||
++i;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterManager::printMemberList() {
|
||||
auto& memberList = *ctx.memberList;
|
||||
if (memberList.empty()) {
|
||||
Serial.println("[Cluster] Member List: empty");
|
||||
return;
|
||||
}
|
||||
Serial.println("[Cluster] Member List:");
|
||||
for (const auto& node : memberList) {
|
||||
Serial.printf(" %s @ %s | Status: %s | last seen: %lu\n", node.hostname.c_str(), node.ip.toString().c_str(), statusToStr(node.status), millis() - node.lastSeen);
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterManager::updateLocalNodeResources() {
|
||||
for (auto& node : *ctx.memberList) {
|
||||
if (node.hostname == ctx.hostname) {
|
||||
node.resources.freeHeap = ESP.getFreeHeap();
|
||||
node.resources.chipId = ESP.getChipId();
|
||||
node.resources.sdkVersion = String(ESP.getSdkVersion());
|
||||
node.resources.cpuFreqMHz = ESP.getCpuFreqMHz();
|
||||
node.resources.flashChipSize = ESP.getFlashChipSize();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user