|
|
|
|
@@ -10,15 +10,16 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx
|
|
|
|
|
});
|
|
|
|
|
// Register tasks
|
|
|
|
|
registerTasks();
|
|
|
|
|
initMessageHandlers();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ClusterManager::registerTasks() {
|
|
|
|
|
taskManager.registerTask("discovery_send", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); });
|
|
|
|
|
taskManager.registerTask("discovery_listen", ctx.config.discovery_interval_ms / 10, [this]() { listenForDiscovery(); });
|
|
|
|
|
taskManager.registerTask("cluster_discovery", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); });
|
|
|
|
|
taskManager.registerTask("cluster_listen", ctx.config.cluster_listen_interval_ms, [this]() { listen(); });
|
|
|
|
|
taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); });
|
|
|
|
|
taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); });
|
|
|
|
|
taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); });
|
|
|
|
|
taskManager.registerTask("update_members_info", ctx.config.member_info_update_interval_ms, [this]() { updateAllMembersInfoTaskCallback(); });
|
|
|
|
|
taskManager.registerTask("cluster_update_members_info", ctx.config.member_info_update_interval_ms, [this]() { updateAllMembersInfoTaskCallback(); });
|
|
|
|
|
LOG_INFO("ClusterManager", "Registered all cluster tasks");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -29,111 +30,146 @@ void ClusterManager::sendDiscovery() {
|
|
|
|
|
ctx.udp->endPacket();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TODO the various if statements here are a mess, we need to clean them up
|
|
|
|
|
// TODO we should use a state machine to handle the different types of messages
|
|
|
|
|
// TODO we should use a class to handle the different types of messages using predicate functions
|
|
|
|
|
void ClusterManager::listenForDiscovery() {
|
|
|
|
|
void ClusterManager::listen() {
|
|
|
|
|
int packetSize = ctx.udp->parsePacket();
|
|
|
|
|
if (packetSize) {
|
|
|
|
|
char incoming[ClusterProtocol::UDP_BUF_SIZE];
|
|
|
|
|
int len = ctx.udp->read(incoming, ClusterProtocol::UDP_BUF_SIZE);
|
|
|
|
|
if (len > 0) {
|
|
|
|
|
incoming[len] = 0;
|
|
|
|
|
if (!packetSize) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
char incoming[ClusterProtocol::UDP_BUF_SIZE];
|
|
|
|
|
int len = ctx.udp->read(incoming, ClusterProtocol::UDP_BUF_SIZE);
|
|
|
|
|
if (len <= 0) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
incoming[len] = 0;
|
|
|
|
|
handleIncomingMessage(incoming);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ClusterManager::initMessageHandlers() {
|
|
|
|
|
messageHandlers.clear();
|
|
|
|
|
messageHandlers.push_back({ &ClusterManager::isDiscoveryMsg, [this](const char* msg){ this->onDiscovery(msg); }, "DISCOVERY" });
|
|
|
|
|
messageHandlers.push_back({ &ClusterManager::isHeartbeatMsg, [this](const char* msg){ this->onHeartbeat(msg); }, "HEARTBEAT" });
|
|
|
|
|
messageHandlers.push_back({ &ClusterManager::isResponseMsg, [this](const char* msg){ this->onResponse(msg); }, "RESPONSE" });
|
|
|
|
|
messageHandlers.push_back({ &ClusterManager::isNodeInfoMsg, [this](const char* msg){ this->onNodeInfo(msg); }, "NODE_INFO" });
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ClusterManager::handleIncomingMessage(const char* incoming) {
|
|
|
|
|
for (const auto& h : messageHandlers) {
|
|
|
|
|
if (h.predicate(incoming)) {
|
|
|
|
|
h.handle(incoming);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
//LOG_DEBUG(ctx, "UDP", "Packet received: " + String(incoming));
|
|
|
|
|
if (strcmp(incoming, ClusterProtocol::DISCOVERY_MSG) == 0) {
|
|
|
|
|
//LOG_DEBUG(ctx, "UDP", "Discovery request from: " + ctx.udp->remoteIP().toString());
|
|
|
|
|
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
|
|
|
|
|
String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname;
|
|
|
|
|
ctx.udp->write(response.c_str());
|
|
|
|
|
ctx.udp->endPacket();
|
|
|
|
|
//LOG_DEBUG(ctx, "UDP", "Sent response with hostname: " + ctx.hostname);
|
|
|
|
|
} else if (strncmp(incoming, ClusterProtocol::HEARTBEAT_MSG, strlen(ClusterProtocol::HEARTBEAT_MSG)) == 0) {
|
|
|
|
|
// Reply directly to heartbeat sender with our node info JSON
|
|
|
|
|
// Format: CLUSTER_NODE_INFO:<hostname>:<json>
|
|
|
|
|
JsonDocument doc;
|
|
|
|
|
doc["freeHeap"] = ESP.getFreeHeap();
|
|
|
|
|
doc["chipId"] = ESP.getChipId();
|
|
|
|
|
doc["sdkVersion"] = ESP.getSdkVersion();
|
|
|
|
|
doc["cpuFreqMHz"] = ESP.getCpuFreqMHz();
|
|
|
|
|
doc["flashChipSize"] = ESP.getFlashChipSize();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Include labels if available
|
|
|
|
|
if (ctx.memberList) {
|
|
|
|
|
auto it = ctx.memberList->find(ctx.hostname);
|
|
|
|
|
if (it != ctx.memberList->end()) {
|
|
|
|
|
JsonObject labelsObj = doc["labels"].to<JsonObject>();
|
|
|
|
|
for (const auto& kv : it->second.labels) {
|
|
|
|
|
labelsObj[kv.first.c_str()] = kv.second;
|
|
|
|
|
}
|
|
|
|
|
} else if (!ctx.self.labels.empty()) {
|
|
|
|
|
JsonObject labelsObj = doc["labels"].to<JsonObject>();
|
|
|
|
|
for (const auto& kv : ctx.self.labels) {
|
|
|
|
|
labelsObj[kv.first.c_str()] = kv.second;
|
|
|
|
|
bool ClusterManager::isDiscoveryMsg(const char* msg) {
|
|
|
|
|
return strcmp(msg, ClusterProtocol::DISCOVERY_MSG) == 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool ClusterManager::isHeartbeatMsg(const char* msg) {
|
|
|
|
|
return strncmp(msg, ClusterProtocol::HEARTBEAT_MSG, strlen(ClusterProtocol::HEARTBEAT_MSG)) == 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool ClusterManager::isResponseMsg(const char* msg) {
|
|
|
|
|
return strncmp(msg, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool ClusterManager::isNodeInfoMsg(const char* msg) {
|
|
|
|
|
return strncmp(msg, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ClusterManager::onDiscovery(const char* /*msg*/) {
|
|
|
|
|
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
|
|
|
|
|
String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname;
|
|
|
|
|
ctx.udp->write(response.c_str());
|
|
|
|
|
ctx.udp->endPacket();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ClusterManager::onHeartbeat(const char* /*msg*/) {
|
|
|
|
|
JsonDocument doc;
|
|
|
|
|
doc["freeHeap"] = ESP.getFreeHeap();
|
|
|
|
|
doc["chipId"] = ESP.getChipId();
|
|
|
|
|
doc["sdkVersion"] = ESP.getSdkVersion();
|
|
|
|
|
doc["cpuFreqMHz"] = ESP.getCpuFreqMHz();
|
|
|
|
|
doc["flashChipSize"] = ESP.getFlashChipSize();
|
|
|
|
|
|
|
|
|
|
if (ctx.memberList) {
|
|
|
|
|
auto it = ctx.memberList->find(ctx.hostname);
|
|
|
|
|
if (it != ctx.memberList->end()) {
|
|
|
|
|
JsonObject labelsObj = doc["labels"].to<JsonObject>();
|
|
|
|
|
for (const auto& kv : it->second.labels) {
|
|
|
|
|
labelsObj[kv.first.c_str()] = kv.second;
|
|
|
|
|
}
|
|
|
|
|
} else if (!ctx.self.labels.empty()) {
|
|
|
|
|
JsonObject labelsObj = doc["labels"].to<JsonObject>();
|
|
|
|
|
for (const auto& kv : ctx.self.labels) {
|
|
|
|
|
labelsObj[kv.first.c_str()] = kv.second;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String json;
|
|
|
|
|
serializeJson(doc, json);
|
|
|
|
|
|
|
|
|
|
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
|
|
|
|
|
String msg = String(ClusterProtocol::NODE_INFO_MSG) + ":" + ctx.hostname + ":" + json;
|
|
|
|
|
ctx.udp->write(msg.c_str());
|
|
|
|
|
ctx.udp->endPacket();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ClusterManager::onResponse(const char* msg) {
|
|
|
|
|
char* hostPtr = const_cast<char*>(msg) + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
|
|
|
|
|
String nodeHost = String(hostPtr);
|
|
|
|
|
addOrUpdateNode(nodeHost, ctx.udp->remoteIP());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ClusterManager::onNodeInfo(const char* msg) {
|
|
|
|
|
char* p = const_cast<char*>(msg) + strlen(ClusterProtocol::NODE_INFO_MSG) + 1;
|
|
|
|
|
char* hostEnd = strchr(p, ':');
|
|
|
|
|
if (hostEnd) {
|
|
|
|
|
*hostEnd = '\0';
|
|
|
|
|
const char* hostCStr = p;
|
|
|
|
|
const char* jsonCStr = hostEnd + 1;
|
|
|
|
|
|
|
|
|
|
String nodeHost = String(hostCStr);
|
|
|
|
|
IPAddress senderIP = ctx.udp->remoteIP();
|
|
|
|
|
|
|
|
|
|
addOrUpdateNode(nodeHost, senderIP);
|
|
|
|
|
|
|
|
|
|
JsonDocument doc;
|
|
|
|
|
DeserializationError err = deserializeJson(doc, jsonCStr);
|
|
|
|
|
if (!err) {
|
|
|
|
|
auto& memberList = *ctx.memberList;
|
|
|
|
|
auto it = memberList.find(nodeHost);
|
|
|
|
|
if (it != memberList.end()) {
|
|
|
|
|
NodeInfo& node = it->second;
|
|
|
|
|
node.resources.freeHeap = doc["freeHeap"] | node.resources.freeHeap;
|
|
|
|
|
node.resources.chipId = doc["chipId"] | node.resources.chipId;
|
|
|
|
|
{
|
|
|
|
|
const char* sdk = doc["sdkVersion"] | node.resources.sdkVersion.c_str();
|
|
|
|
|
node.resources.sdkVersion = sdk ? String(sdk) : node.resources.sdkVersion;
|
|
|
|
|
}
|
|
|
|
|
node.resources.cpuFreqMHz = doc["cpuFreqMHz"] | node.resources.cpuFreqMHz;
|
|
|
|
|
node.resources.flashChipSize = doc["flashChipSize"] | node.resources.flashChipSize;
|
|
|
|
|
node.status = NodeInfo::ACTIVE;
|
|
|
|
|
unsigned long now = millis();
|
|
|
|
|
node.lastSeen = now;
|
|
|
|
|
if (lastHeartbeatSentAt != 0) {
|
|
|
|
|
node.latency = now - lastHeartbeatSentAt;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
node.labels.clear();
|
|
|
|
|
if (doc["labels"].is<JsonObject>()) {
|
|
|
|
|
JsonObject labelsObj = doc["labels"].as<JsonObject>();
|
|
|
|
|
for (JsonPair kvp : labelsObj) {
|
|
|
|
|
const char* key = kvp.key().c_str();
|
|
|
|
|
const char* value = labelsObj[kvp.key()];
|
|
|
|
|
node.labels[key] = value;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String json;
|
|
|
|
|
serializeJson(doc, json);
|
|
|
|
|
|
|
|
|
|
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
|
|
|
|
|
String msg = String(ClusterProtocol::NODE_INFO_MSG) + ":" + ctx.hostname + ":" + json;
|
|
|
|
|
ctx.udp->write(msg.c_str());
|
|
|
|
|
ctx.udp->endPacket();
|
|
|
|
|
} else if (strncmp(incoming, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0) {
|
|
|
|
|
char* hostPtr = incoming + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
|
|
|
|
|
String nodeHost = String(hostPtr);
|
|
|
|
|
addOrUpdateNode(nodeHost, ctx.udp->remoteIP());
|
|
|
|
|
} else if (strncmp(incoming, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0) {
|
|
|
|
|
// Parse: CLUSTER_NODE_INFO:<hostname>:<json>
|
|
|
|
|
char* p = incoming + strlen(ClusterProtocol::NODE_INFO_MSG) + 1;
|
|
|
|
|
char* hostEnd = strchr(p, ':');
|
|
|
|
|
if (hostEnd) {
|
|
|
|
|
*hostEnd = '\0';
|
|
|
|
|
const char* hostCStr = p;
|
|
|
|
|
const char* jsonCStr = hostEnd + 1;
|
|
|
|
|
|
|
|
|
|
String nodeHost = String(hostCStr);
|
|
|
|
|
IPAddress senderIP = ctx.udp->remoteIP();
|
|
|
|
|
|
|
|
|
|
// Ensure node exists/updated basic info
|
|
|
|
|
addOrUpdateNode(nodeHost, senderIP);
|
|
|
|
|
|
|
|
|
|
// Parse JSON
|
|
|
|
|
JsonDocument doc;
|
|
|
|
|
DeserializationError err = deserializeJson(doc, jsonCStr);
|
|
|
|
|
if (!err) {
|
|
|
|
|
auto& memberList = *ctx.memberList;
|
|
|
|
|
auto it = memberList.find(nodeHost);
|
|
|
|
|
if (it != memberList.end()) {
|
|
|
|
|
NodeInfo& node = it->second;
|
|
|
|
|
node.resources.freeHeap = doc["freeHeap"] | node.resources.freeHeap;
|
|
|
|
|
node.resources.chipId = doc["chipId"] | node.resources.chipId;
|
|
|
|
|
{
|
|
|
|
|
const char* sdk = doc["sdkVersion"] | node.resources.sdkVersion.c_str();
|
|
|
|
|
node.resources.sdkVersion = sdk ? String(sdk) : node.resources.sdkVersion;
|
|
|
|
|
}
|
|
|
|
|
node.resources.cpuFreqMHz = doc["cpuFreqMHz"] | node.resources.cpuFreqMHz;
|
|
|
|
|
node.resources.flashChipSize = doc["flashChipSize"] | node.resources.flashChipSize;
|
|
|
|
|
node.status = NodeInfo::ACTIVE;
|
|
|
|
|
node.lastSeen = millis();
|
|
|
|
|
|
|
|
|
|
// Labels
|
|
|
|
|
node.labels.clear();
|
|
|
|
|
if (doc["labels"].is<JsonObject>()) {
|
|
|
|
|
JsonObject labelsObj = doc["labels"].as<JsonObject>();
|
|
|
|
|
for (JsonPair kvp : labelsObj) {
|
|
|
|
|
const char* key = kvp.key().c_str();
|
|
|
|
|
const char* value = labelsObj[kvp.key()];
|
|
|
|
|
node.labels[key] = value;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
LOG_DEBUG("Cluster", String("Failed to parse NODE_INFO JSON from ") + senderIP.toString());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
LOG_DEBUG("Cluster", String("Failed to parse NODE_INFO JSON from ") + senderIP.toString());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -156,7 +192,7 @@ void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
|
|
|
|
|
newNode.hostname = nodeHost;
|
|
|
|
|
newNode.ip = nodeIP;
|
|
|
|
|
newNode.lastSeen = millis();
|
|
|
|
|
updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
|
|
|
|
|
updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
|
|
|
|
|
memberList[nodeHost] = newNode;
|
|
|
|
|
LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " | Status: " + statusToStr(newNode.status) + " | last update: 0");
|
|
|
|
|
//fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
|
|
|
|
|
@@ -281,6 +317,7 @@ void ClusterManager::heartbeatTaskCallback() {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Broadcast heartbeat so peers can respond with their node info
|
|
|
|
|
lastHeartbeatSentAt = millis();
|
|
|
|
|
ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
|
|
|
|
|
String hb = String(ClusterProtocol::HEARTBEAT_MSG) + ":" + ctx.hostname;
|
|
|
|
|
ctx.udp->write(hb.c_str());
|
|
|
|
|
|