Compare commits

..

4 Commits

Author SHA1 Message Date
eaeb9bbea8 config: more frequent cluster_listen 2025-09-25 22:07:22 +02:00
096cf12704 feat: measure latency 2025-09-25 21:54:25 +02:00
356ec3d381 feat: simplify udp listen 2025-09-25 20:44:31 +02:00
51bd7bd909 feat: introduce udp state machine 2025-09-24 21:23:00 +02:00
7 changed files with 170 additions and 112 deletions

View File

@@ -74,7 +74,7 @@ The system runs several background tasks at different intervals:
| Task | Interval (default) | Purpose | | Task | Interval (default) | Purpose |
|------|--------------------|---------| |------|--------------------|---------|
| `discovery_send` | 1000 ms | Send UDP discovery packets | | `discovery_send` | 1000 ms | Send UDP discovery packets |
| `discovery_listen` | 100 ms | Listen for discovery/heartbeat/node-info | | `cluster_listen` | 100 ms | Listen for discovery/heartbeat/node-info |
| `status_update` | 1000 ms | Update node status categories, purge dead | | `status_update` | 1000 ms | Update node status categories, purge dead |
| `heartbeat` | 5000 ms | Broadcast heartbeat and update local resources | | `heartbeat` | 5000 ms | Broadcast heartbeat and update local resources |
| `update_members_info` | 10000 ms | Reserved; no-op (info via UDP) | | `update_members_info` | 10000 ms | Reserved; no-op (info via UDP) |

View File

@@ -319,18 +319,18 @@ curl -X POST http://192.168.1.100/api/tasks/control \
### Before (with wrapper functions): ### Before (with wrapper functions):
```cpp ```cpp
void discoverySendTask() { cluster.sendDiscovery(); } void discoverySendTask() { cluster.sendDiscovery(); }
void discoveryListenTask() { cluster.listenForDiscovery(); } void clusterListenTask() { cluster.listen(); }
taskManager.registerTask("discovery_send", interval, discoverySendTask); taskManager.registerTask("discovery_send", interval, discoverySendTask);
taskManager.registerTask("discovery_listen", interval, discoveryListenTask); taskManager.registerTask("cluster_listen", interval, clusterListenTask);
``` ```
### After (with std::bind): ### After (with std::bind):
```cpp ```cpp
taskManager.registerTask("discovery_send", interval, taskManager.registerTask("discovery_send", interval,
std::bind(&ClusterManager::sendDiscovery, &cluster)); std::bind(&ClusterManager::sendDiscovery, &cluster));
taskManager.registerTask("discovery_listen", interval, taskManager.registerTask("cluster_listen", interval,
std::bind(&ClusterManager::listenForDiscovery, &cluster)); std::bind(&ClusterManager::listen, &cluster));
``` ```
## Compatibility ## Compatibility

View File

@@ -7,13 +7,15 @@
#include <ArduinoJson.h> #include <ArduinoJson.h>
#include <ESP8266HTTPClient.h> #include <ESP8266HTTPClient.h>
#include <map> #include <map>
#include <vector>
#include <functional>
class ClusterManager { class ClusterManager {
public: public:
ClusterManager(NodeContext& ctx, TaskManager& taskMgr); ClusterManager(NodeContext& ctx, TaskManager& taskMgr);
void registerTasks(); void registerTasks();
void sendDiscovery(); void sendDiscovery();
void listenForDiscovery(); void listen();
void addOrUpdateNode(const String& nodeHost, IPAddress nodeIP); void addOrUpdateNode(const String& nodeHost, IPAddress nodeIP);
void updateAllNodeStatuses(); void updateAllNodeStatuses();
void removeDeadNodes(); void removeDeadNodes();
@@ -26,4 +28,21 @@ public:
private: private:
NodeContext& ctx; NodeContext& ctx;
TaskManager& taskManager; TaskManager& taskManager;
struct MessageHandler {
bool (*predicate)(const char*);
std::function<void(const char*)> handle;
const char* name;
};
void initMessageHandlers();
void handleIncomingMessage(const char* incoming);
static bool isDiscoveryMsg(const char* msg);
static bool isHeartbeatMsg(const char* msg);
static bool isResponseMsg(const char* msg);
static bool isNodeInfoMsg(const char* msg);
void onDiscovery(const char* msg);
void onHeartbeat(const char* msg);
void onResponse(const char* msg);
void onNodeInfo(const char* msg);
unsigned long lastHeartbeatSentAt = 0;
std::vector<MessageHandler> messageHandlers;
}; };

View File

@@ -15,6 +15,7 @@ public:
// Cluster Configuration // Cluster Configuration
unsigned long discovery_interval_ms; unsigned long discovery_interval_ms;
unsigned long heartbeat_interval_ms; unsigned long heartbeat_interval_ms;
unsigned long cluster_listen_interval_ms;
unsigned long status_update_interval_ms; unsigned long status_update_interval_ms;
unsigned long member_info_update_interval_ms; unsigned long member_info_update_interval_ms;
unsigned long print_interval_ms; unsigned long print_interval_ms;

View File

@@ -17,7 +17,7 @@ struct NodeInfo {
uint32_t cpuFreqMHz = 0; uint32_t cpuFreqMHz = 0;
uint32_t flashChipSize = 0; uint32_t flashChipSize = 0;
} resources; } resources;
unsigned long latency = 0; // ms since lastSeen unsigned long latency = 0; // ms from heartbeat broadcast to NODE_INFO receipt
std::vector<EndpointInfo> endpoints; // List of registered endpoints std::vector<EndpointInfo> endpoints; // List of registered endpoints
std::map<String, String> labels; // Arbitrary node labels (key -> value) std::map<String, String> labels; // Arbitrary node labels (key -> value)
}; };

View File

@@ -10,15 +10,16 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx
}); });
// Register tasks // Register tasks
registerTasks(); registerTasks();
initMessageHandlers();
} }
void ClusterManager::registerTasks() { void ClusterManager::registerTasks() {
taskManager.registerTask("discovery_send", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); }); taskManager.registerTask("cluster_discovery", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); });
taskManager.registerTask("discovery_listen", ctx.config.discovery_interval_ms / 10, [this]() { listenForDiscovery(); }); taskManager.registerTask("cluster_listen", ctx.config.cluster_listen_interval_ms, [this]() { listen(); });
taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); }); taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); });
taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); }); taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); });
taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); }); taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); });
taskManager.registerTask("update_members_info", ctx.config.member_info_update_interval_ms, [this]() { updateAllMembersInfoTaskCallback(); }); taskManager.registerTask("cluster_update_members_info", ctx.config.member_info_update_interval_ms, [this]() { updateAllMembersInfoTaskCallback(); });
LOG_INFO("ClusterManager", "Registered all cluster tasks"); LOG_INFO("ClusterManager", "Registered all cluster tasks");
} }
@@ -29,111 +30,146 @@ void ClusterManager::sendDiscovery() {
ctx.udp->endPacket(); ctx.udp->endPacket();
} }
// TODO the various if statements here are a mess, we need to clean them up void ClusterManager::listen() {
// TODO we should use a state machine to handle the different types of messages
// TODO we should use a class to handle the different types of messages using predicate functions
void ClusterManager::listenForDiscovery() {
int packetSize = ctx.udp->parsePacket(); int packetSize = ctx.udp->parsePacket();
if (packetSize) { if (!packetSize) {
char incoming[ClusterProtocol::UDP_BUF_SIZE]; return;
int len = ctx.udp->read(incoming, ClusterProtocol::UDP_BUF_SIZE); }
if (len > 0) {
incoming[len] = 0; char incoming[ClusterProtocol::UDP_BUF_SIZE];
int len = ctx.udp->read(incoming, ClusterProtocol::UDP_BUF_SIZE);
if (len <= 0) {
return;
}
incoming[len] = 0;
handleIncomingMessage(incoming);
}
void ClusterManager::initMessageHandlers() {
messageHandlers.clear();
messageHandlers.push_back({ &ClusterManager::isDiscoveryMsg, [this](const char* msg){ this->onDiscovery(msg); }, "DISCOVERY" });
messageHandlers.push_back({ &ClusterManager::isHeartbeatMsg, [this](const char* msg){ this->onHeartbeat(msg); }, "HEARTBEAT" });
messageHandlers.push_back({ &ClusterManager::isResponseMsg, [this](const char* msg){ this->onResponse(msg); }, "RESPONSE" });
messageHandlers.push_back({ &ClusterManager::isNodeInfoMsg, [this](const char* msg){ this->onNodeInfo(msg); }, "NODE_INFO" });
}
void ClusterManager::handleIncomingMessage(const char* incoming) {
for (const auto& h : messageHandlers) {
if (h.predicate(incoming)) {
h.handle(incoming);
return;
} }
//LOG_DEBUG(ctx, "UDP", "Packet received: " + String(incoming)); }
if (strcmp(incoming, ClusterProtocol::DISCOVERY_MSG) == 0) { }
//LOG_DEBUG(ctx, "UDP", "Discovery request from: " + ctx.udp->remoteIP().toString());
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname;
ctx.udp->write(response.c_str());
ctx.udp->endPacket();
//LOG_DEBUG(ctx, "UDP", "Sent response with hostname: " + ctx.hostname);
} else if (strncmp(incoming, ClusterProtocol::HEARTBEAT_MSG, strlen(ClusterProtocol::HEARTBEAT_MSG)) == 0) {
// Reply directly to heartbeat sender with our node info JSON
// Format: CLUSTER_NODE_INFO:<hostname>:<json>
JsonDocument doc;
doc["freeHeap"] = ESP.getFreeHeap();
doc["chipId"] = ESP.getChipId();
doc["sdkVersion"] = ESP.getSdkVersion();
doc["cpuFreqMHz"] = ESP.getCpuFreqMHz();
doc["flashChipSize"] = ESP.getFlashChipSize();
// Include labels if available bool ClusterManager::isDiscoveryMsg(const char* msg) {
if (ctx.memberList) { return strcmp(msg, ClusterProtocol::DISCOVERY_MSG) == 0;
auto it = ctx.memberList->find(ctx.hostname); }
if (it != ctx.memberList->end()) {
JsonObject labelsObj = doc["labels"].to<JsonObject>(); bool ClusterManager::isHeartbeatMsg(const char* msg) {
for (const auto& kv : it->second.labels) { return strncmp(msg, ClusterProtocol::HEARTBEAT_MSG, strlen(ClusterProtocol::HEARTBEAT_MSG)) == 0;
labelsObj[kv.first.c_str()] = kv.second; }
}
} else if (!ctx.self.labels.empty()) { bool ClusterManager::isResponseMsg(const char* msg) {
JsonObject labelsObj = doc["labels"].to<JsonObject>(); return strncmp(msg, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0;
for (const auto& kv : ctx.self.labels) { }
labelsObj[kv.first.c_str()] = kv.second;
bool ClusterManager::isNodeInfoMsg(const char* msg) {
return strncmp(msg, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0;
}
void ClusterManager::onDiscovery(const char* /*msg*/) {
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname;
ctx.udp->write(response.c_str());
ctx.udp->endPacket();
}
void ClusterManager::onHeartbeat(const char* /*msg*/) {
JsonDocument doc;
doc["freeHeap"] = ESP.getFreeHeap();
doc["chipId"] = ESP.getChipId();
doc["sdkVersion"] = ESP.getSdkVersion();
doc["cpuFreqMHz"] = ESP.getCpuFreqMHz();
doc["flashChipSize"] = ESP.getFlashChipSize();
if (ctx.memberList) {
auto it = ctx.memberList->find(ctx.hostname);
if (it != ctx.memberList->end()) {
JsonObject labelsObj = doc["labels"].to<JsonObject>();
for (const auto& kv : it->second.labels) {
labelsObj[kv.first.c_str()] = kv.second;
}
} else if (!ctx.self.labels.empty()) {
JsonObject labelsObj = doc["labels"].to<JsonObject>();
for (const auto& kv : ctx.self.labels) {
labelsObj[kv.first.c_str()] = kv.second;
}
}
}
String json;
serializeJson(doc, json);
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
String msg = String(ClusterProtocol::NODE_INFO_MSG) + ":" + ctx.hostname + ":" + json;
ctx.udp->write(msg.c_str());
ctx.udp->endPacket();
}
void ClusterManager::onResponse(const char* msg) {
char* hostPtr = const_cast<char*>(msg) + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
String nodeHost = String(hostPtr);
addOrUpdateNode(nodeHost, ctx.udp->remoteIP());
}
void ClusterManager::onNodeInfo(const char* msg) {
char* p = const_cast<char*>(msg) + strlen(ClusterProtocol::NODE_INFO_MSG) + 1;
char* hostEnd = strchr(p, ':');
if (hostEnd) {
*hostEnd = '\0';
const char* hostCStr = p;
const char* jsonCStr = hostEnd + 1;
String nodeHost = String(hostCStr);
IPAddress senderIP = ctx.udp->remoteIP();
addOrUpdateNode(nodeHost, senderIP);
JsonDocument doc;
DeserializationError err = deserializeJson(doc, jsonCStr);
if (!err) {
auto& memberList = *ctx.memberList;
auto it = memberList.find(nodeHost);
if (it != memberList.end()) {
NodeInfo& node = it->second;
node.resources.freeHeap = doc["freeHeap"] | node.resources.freeHeap;
node.resources.chipId = doc["chipId"] | node.resources.chipId;
{
const char* sdk = doc["sdkVersion"] | node.resources.sdkVersion.c_str();
node.resources.sdkVersion = sdk ? String(sdk) : node.resources.sdkVersion;
}
node.resources.cpuFreqMHz = doc["cpuFreqMHz"] | node.resources.cpuFreqMHz;
node.resources.flashChipSize = doc["flashChipSize"] | node.resources.flashChipSize;
node.status = NodeInfo::ACTIVE;
unsigned long now = millis();
node.lastSeen = now;
if (lastHeartbeatSentAt != 0) {
node.latency = now - lastHeartbeatSentAt;
}
node.labels.clear();
if (doc["labels"].is<JsonObject>()) {
JsonObject labelsObj = doc["labels"].as<JsonObject>();
for (JsonPair kvp : labelsObj) {
const char* key = kvp.key().c_str();
const char* value = labelsObj[kvp.key()];
node.labels[key] = value;
} }
} }
} }
} else {
String json; LOG_DEBUG("Cluster", String("Failed to parse NODE_INFO JSON from ") + senderIP.toString());
serializeJson(doc, json);
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
String msg = String(ClusterProtocol::NODE_INFO_MSG) + ":" + ctx.hostname + ":" + json;
ctx.udp->write(msg.c_str());
ctx.udp->endPacket();
} else if (strncmp(incoming, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0) {
char* hostPtr = incoming + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
String nodeHost = String(hostPtr);
addOrUpdateNode(nodeHost, ctx.udp->remoteIP());
} else if (strncmp(incoming, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0) {
// Parse: CLUSTER_NODE_INFO:<hostname>:<json>
char* p = incoming + strlen(ClusterProtocol::NODE_INFO_MSG) + 1;
char* hostEnd = strchr(p, ':');
if (hostEnd) {
*hostEnd = '\0';
const char* hostCStr = p;
const char* jsonCStr = hostEnd + 1;
String nodeHost = String(hostCStr);
IPAddress senderIP = ctx.udp->remoteIP();
// Ensure node exists/updated basic info
addOrUpdateNode(nodeHost, senderIP);
// Parse JSON
JsonDocument doc;
DeserializationError err = deserializeJson(doc, jsonCStr);
if (!err) {
auto& memberList = *ctx.memberList;
auto it = memberList.find(nodeHost);
if (it != memberList.end()) {
NodeInfo& node = it->second;
node.resources.freeHeap = doc["freeHeap"] | node.resources.freeHeap;
node.resources.chipId = doc["chipId"] | node.resources.chipId;
{
const char* sdk = doc["sdkVersion"] | node.resources.sdkVersion.c_str();
node.resources.sdkVersion = sdk ? String(sdk) : node.resources.sdkVersion;
}
node.resources.cpuFreqMHz = doc["cpuFreqMHz"] | node.resources.cpuFreqMHz;
node.resources.flashChipSize = doc["flashChipSize"] | node.resources.flashChipSize;
node.status = NodeInfo::ACTIVE;
node.lastSeen = millis();
// Labels
node.labels.clear();
if (doc["labels"].is<JsonObject>()) {
JsonObject labelsObj = doc["labels"].as<JsonObject>();
for (JsonPair kvp : labelsObj) {
const char* key = kvp.key().c_str();
const char* value = labelsObj[kvp.key()];
node.labels[key] = value;
}
}
}
} else {
LOG_DEBUG("Cluster", String("Failed to parse NODE_INFO JSON from ") + senderIP.toString());
}
}
} }
} }
} }
@@ -156,7 +192,7 @@ void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
newNode.hostname = nodeHost; newNode.hostname = nodeHost;
newNode.ip = nodeIP; newNode.ip = nodeIP;
newNode.lastSeen = millis(); newNode.lastSeen = millis();
updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms); updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
memberList[nodeHost] = newNode; memberList[nodeHost] = newNode;
LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " | Status: " + statusToStr(newNode.status) + " | last update: 0"); LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " | Status: " + statusToStr(newNode.status) + " | last update: 0");
//fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task //fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
@@ -281,6 +317,7 @@ void ClusterManager::heartbeatTaskCallback() {
} }
// Broadcast heartbeat so peers can respond with their node info // Broadcast heartbeat so peers can respond with their node info
lastHeartbeatSentAt = millis();
ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port); ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
String hb = String(ClusterProtocol::HEARTBEAT_MSG) + ":" + ctx.hostname; String hb = String(ClusterProtocol::HEARTBEAT_MSG) + ":" + ctx.hostname;
ctx.udp->write(hb.c_str()); ctx.udp->write(hb.c_str());

View File

@@ -10,10 +10,11 @@ Config::Config() {
api_server_port = 80; api_server_port = 80;
// Cluster Configuration // Cluster Configuration
discovery_interval_ms = 1000; discovery_interval_ms = 1000; // TODO retire this in favor of heartbeat_interval_ms
cluster_listen_interval_ms = 10;
heartbeat_interval_ms = 5000; heartbeat_interval_ms = 5000;
status_update_interval_ms = 1000; status_update_interval_ms = 1000;
member_info_update_interval_ms = 10000; member_info_update_interval_ms = 10000; // TODO retire this in favor of heartbeat_interval_ms
print_interval_ms = 5000; print_interval_ms = 5000;
// Node Status Thresholds // Node Status Thresholds