Compare commits
4 Commits
921eec3848
...
eaeb9bbea8
| Author | SHA1 | Date | |
|---|---|---|---|
| eaeb9bbea8 | |||
| 096cf12704 | |||
| 356ec3d381 | |||
| 51bd7bd909 |
@@ -74,7 +74,7 @@ The system runs several background tasks at different intervals:
|
||||
| Task | Interval (default) | Purpose |
|
||||
|------|--------------------|---------|
|
||||
| `discovery_send` | 1000 ms | Send UDP discovery packets |
|
||||
| `discovery_listen` | 100 ms | Listen for discovery/heartbeat/node-info |
|
||||
| `cluster_listen` | 100 ms | Listen for discovery/heartbeat/node-info |
|
||||
| `status_update` | 1000 ms | Update node status categories, purge dead |
|
||||
| `heartbeat` | 5000 ms | Broadcast heartbeat and update local resources |
|
||||
| `update_members_info` | 10000 ms | Reserved; no-op (info via UDP) |
|
||||
|
||||
@@ -319,18 +319,18 @@ curl -X POST http://192.168.1.100/api/tasks/control \
|
||||
### Before (with wrapper functions):
|
||||
```cpp
|
||||
void discoverySendTask() { cluster.sendDiscovery(); }
|
||||
void discoveryListenTask() { cluster.listenForDiscovery(); }
|
||||
void clusterListenTask() { cluster.listen(); }
|
||||
|
||||
taskManager.registerTask("discovery_send", interval, discoverySendTask);
|
||||
taskManager.registerTask("discovery_listen", interval, discoveryListenTask);
|
||||
taskManager.registerTask("cluster_listen", interval, clusterListenTask);
|
||||
```
|
||||
|
||||
### After (with std::bind):
|
||||
```cpp
|
||||
taskManager.registerTask("discovery_send", interval,
|
||||
std::bind(&ClusterManager::sendDiscovery, &cluster));
|
||||
taskManager.registerTask("discovery_listen", interval,
|
||||
std::bind(&ClusterManager::listenForDiscovery, &cluster));
|
||||
taskManager.registerTask("cluster_listen", interval,
|
||||
std::bind(&ClusterManager::listen, &cluster));
|
||||
```
|
||||
|
||||
## Compatibility
|
||||
|
||||
@@ -7,13 +7,15 @@
|
||||
#include <ArduinoJson.h>
|
||||
#include <ESP8266HTTPClient.h>
|
||||
#include <map>
|
||||
#include <vector>
|
||||
#include <functional>
|
||||
|
||||
class ClusterManager {
|
||||
public:
|
||||
ClusterManager(NodeContext& ctx, TaskManager& taskMgr);
|
||||
void registerTasks();
|
||||
void sendDiscovery();
|
||||
void listenForDiscovery();
|
||||
void listen();
|
||||
void addOrUpdateNode(const String& nodeHost, IPAddress nodeIP);
|
||||
void updateAllNodeStatuses();
|
||||
void removeDeadNodes();
|
||||
@@ -26,4 +28,21 @@ public:
|
||||
private:
|
||||
NodeContext& ctx;
|
||||
TaskManager& taskManager;
|
||||
struct MessageHandler {
|
||||
bool (*predicate)(const char*);
|
||||
std::function<void(const char*)> handle;
|
||||
const char* name;
|
||||
};
|
||||
void initMessageHandlers();
|
||||
void handleIncomingMessage(const char* incoming);
|
||||
static bool isDiscoveryMsg(const char* msg);
|
||||
static bool isHeartbeatMsg(const char* msg);
|
||||
static bool isResponseMsg(const char* msg);
|
||||
static bool isNodeInfoMsg(const char* msg);
|
||||
void onDiscovery(const char* msg);
|
||||
void onHeartbeat(const char* msg);
|
||||
void onResponse(const char* msg);
|
||||
void onNodeInfo(const char* msg);
|
||||
unsigned long lastHeartbeatSentAt = 0;
|
||||
std::vector<MessageHandler> messageHandlers;
|
||||
};
|
||||
|
||||
@@ -15,6 +15,7 @@ public:
|
||||
// Cluster Configuration
|
||||
unsigned long discovery_interval_ms;
|
||||
unsigned long heartbeat_interval_ms;
|
||||
unsigned long cluster_listen_interval_ms;
|
||||
unsigned long status_update_interval_ms;
|
||||
unsigned long member_info_update_interval_ms;
|
||||
unsigned long print_interval_ms;
|
||||
|
||||
@@ -17,7 +17,7 @@ struct NodeInfo {
|
||||
uint32_t cpuFreqMHz = 0;
|
||||
uint32_t flashChipSize = 0;
|
||||
} resources;
|
||||
unsigned long latency = 0; // ms since lastSeen
|
||||
unsigned long latency = 0; // ms from heartbeat broadcast to NODE_INFO receipt
|
||||
std::vector<EndpointInfo> endpoints; // List of registered endpoints
|
||||
std::map<String, String> labels; // Arbitrary node labels (key -> value)
|
||||
};
|
||||
|
||||
@@ -10,15 +10,16 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx
|
||||
});
|
||||
// Register tasks
|
||||
registerTasks();
|
||||
initMessageHandlers();
|
||||
}
|
||||
|
||||
void ClusterManager::registerTasks() {
|
||||
taskManager.registerTask("discovery_send", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); });
|
||||
taskManager.registerTask("discovery_listen", ctx.config.discovery_interval_ms / 10, [this]() { listenForDiscovery(); });
|
||||
taskManager.registerTask("cluster_discovery", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); });
|
||||
taskManager.registerTask("cluster_listen", ctx.config.cluster_listen_interval_ms, [this]() { listen(); });
|
||||
taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); });
|
||||
taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); });
|
||||
taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); });
|
||||
taskManager.registerTask("update_members_info", ctx.config.member_info_update_interval_ms, [this]() { updateAllMembersInfoTaskCallback(); });
|
||||
taskManager.registerTask("cluster_update_members_info", ctx.config.member_info_update_interval_ms, [this]() { updateAllMembersInfoTaskCallback(); });
|
||||
LOG_INFO("ClusterManager", "Registered all cluster tasks");
|
||||
}
|
||||
|
||||
@@ -29,28 +30,62 @@ void ClusterManager::sendDiscovery() {
|
||||
ctx.udp->endPacket();
|
||||
}
|
||||
|
||||
// TODO the various if statements here are a mess, we need to clean them up
|
||||
// TODO we should use a state machine to handle the different types of messages
|
||||
// TODO we should use a class to handle the different types of messages using predicate functions
|
||||
void ClusterManager::listenForDiscovery() {
|
||||
void ClusterManager::listen() {
|
||||
int packetSize = ctx.udp->parsePacket();
|
||||
if (packetSize) {
|
||||
if (!packetSize) {
|
||||
return;
|
||||
}
|
||||
|
||||
char incoming[ClusterProtocol::UDP_BUF_SIZE];
|
||||
int len = ctx.udp->read(incoming, ClusterProtocol::UDP_BUF_SIZE);
|
||||
if (len > 0) {
|
||||
incoming[len] = 0;
|
||||
if (len <= 0) {
|
||||
return;
|
||||
}
|
||||
//LOG_DEBUG(ctx, "UDP", "Packet received: " + String(incoming));
|
||||
if (strcmp(incoming, ClusterProtocol::DISCOVERY_MSG) == 0) {
|
||||
//LOG_DEBUG(ctx, "UDP", "Discovery request from: " + ctx.udp->remoteIP().toString());
|
||||
incoming[len] = 0;
|
||||
handleIncomingMessage(incoming);
|
||||
}
|
||||
|
||||
void ClusterManager::initMessageHandlers() {
|
||||
messageHandlers.clear();
|
||||
messageHandlers.push_back({ &ClusterManager::isDiscoveryMsg, [this](const char* msg){ this->onDiscovery(msg); }, "DISCOVERY" });
|
||||
messageHandlers.push_back({ &ClusterManager::isHeartbeatMsg, [this](const char* msg){ this->onHeartbeat(msg); }, "HEARTBEAT" });
|
||||
messageHandlers.push_back({ &ClusterManager::isResponseMsg, [this](const char* msg){ this->onResponse(msg); }, "RESPONSE" });
|
||||
messageHandlers.push_back({ &ClusterManager::isNodeInfoMsg, [this](const char* msg){ this->onNodeInfo(msg); }, "NODE_INFO" });
|
||||
}
|
||||
|
||||
void ClusterManager::handleIncomingMessage(const char* incoming) {
|
||||
for (const auto& h : messageHandlers) {
|
||||
if (h.predicate(incoming)) {
|
||||
h.handle(incoming);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool ClusterManager::isDiscoveryMsg(const char* msg) {
|
||||
return strcmp(msg, ClusterProtocol::DISCOVERY_MSG) == 0;
|
||||
}
|
||||
|
||||
bool ClusterManager::isHeartbeatMsg(const char* msg) {
|
||||
return strncmp(msg, ClusterProtocol::HEARTBEAT_MSG, strlen(ClusterProtocol::HEARTBEAT_MSG)) == 0;
|
||||
}
|
||||
|
||||
bool ClusterManager::isResponseMsg(const char* msg) {
|
||||
return strncmp(msg, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0;
|
||||
}
|
||||
|
||||
bool ClusterManager::isNodeInfoMsg(const char* msg) {
|
||||
return strncmp(msg, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0;
|
||||
}
|
||||
|
||||
void ClusterManager::onDiscovery(const char* /*msg*/) {
|
||||
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
|
||||
String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname;
|
||||
ctx.udp->write(response.c_str());
|
||||
ctx.udp->endPacket();
|
||||
//LOG_DEBUG(ctx, "UDP", "Sent response with hostname: " + ctx.hostname);
|
||||
} else if (strncmp(incoming, ClusterProtocol::HEARTBEAT_MSG, strlen(ClusterProtocol::HEARTBEAT_MSG)) == 0) {
|
||||
// Reply directly to heartbeat sender with our node info JSON
|
||||
// Format: CLUSTER_NODE_INFO:<hostname>:<json>
|
||||
}
|
||||
|
||||
void ClusterManager::onHeartbeat(const char* /*msg*/) {
|
||||
JsonDocument doc;
|
||||
doc["freeHeap"] = ESP.getFreeHeap();
|
||||
doc["chipId"] = ESP.getChipId();
|
||||
@@ -58,7 +93,6 @@ void ClusterManager::listenForDiscovery() {
|
||||
doc["cpuFreqMHz"] = ESP.getCpuFreqMHz();
|
||||
doc["flashChipSize"] = ESP.getFlashChipSize();
|
||||
|
||||
// Include labels if available
|
||||
if (ctx.memberList) {
|
||||
auto it = ctx.memberList->find(ctx.hostname);
|
||||
if (it != ctx.memberList->end()) {
|
||||
@@ -81,13 +115,16 @@ void ClusterManager::listenForDiscovery() {
|
||||
String msg = String(ClusterProtocol::NODE_INFO_MSG) + ":" + ctx.hostname + ":" + json;
|
||||
ctx.udp->write(msg.c_str());
|
||||
ctx.udp->endPacket();
|
||||
} else if (strncmp(incoming, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0) {
|
||||
char* hostPtr = incoming + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
|
||||
}
|
||||
|
||||
void ClusterManager::onResponse(const char* msg) {
|
||||
char* hostPtr = const_cast<char*>(msg) + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
|
||||
String nodeHost = String(hostPtr);
|
||||
addOrUpdateNode(nodeHost, ctx.udp->remoteIP());
|
||||
} else if (strncmp(incoming, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0) {
|
||||
// Parse: CLUSTER_NODE_INFO:<hostname>:<json>
|
||||
char* p = incoming + strlen(ClusterProtocol::NODE_INFO_MSG) + 1;
|
||||
}
|
||||
|
||||
void ClusterManager::onNodeInfo(const char* msg) {
|
||||
char* p = const_cast<char*>(msg) + strlen(ClusterProtocol::NODE_INFO_MSG) + 1;
|
||||
char* hostEnd = strchr(p, ':');
|
||||
if (hostEnd) {
|
||||
*hostEnd = '\0';
|
||||
@@ -97,10 +134,8 @@ void ClusterManager::listenForDiscovery() {
|
||||
String nodeHost = String(hostCStr);
|
||||
IPAddress senderIP = ctx.udp->remoteIP();
|
||||
|
||||
// Ensure node exists/updated basic info
|
||||
addOrUpdateNode(nodeHost, senderIP);
|
||||
|
||||
// Parse JSON
|
||||
JsonDocument doc;
|
||||
DeserializationError err = deserializeJson(doc, jsonCStr);
|
||||
if (!err) {
|
||||
@@ -117,9 +152,12 @@ void ClusterManager::listenForDiscovery() {
|
||||
node.resources.cpuFreqMHz = doc["cpuFreqMHz"] | node.resources.cpuFreqMHz;
|
||||
node.resources.flashChipSize = doc["flashChipSize"] | node.resources.flashChipSize;
|
||||
node.status = NodeInfo::ACTIVE;
|
||||
node.lastSeen = millis();
|
||||
unsigned long now = millis();
|
||||
node.lastSeen = now;
|
||||
if (lastHeartbeatSentAt != 0) {
|
||||
node.latency = now - lastHeartbeatSentAt;
|
||||
}
|
||||
|
||||
// Labels
|
||||
node.labels.clear();
|
||||
if (doc["labels"].is<JsonObject>()) {
|
||||
JsonObject labelsObj = doc["labels"].as<JsonObject>();
|
||||
@@ -135,8 +173,6 @@ void ClusterManager::listenForDiscovery() {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
|
||||
auto& memberList = *ctx.memberList;
|
||||
@@ -281,6 +317,7 @@ void ClusterManager::heartbeatTaskCallback() {
|
||||
}
|
||||
|
||||
// Broadcast heartbeat so peers can respond with their node info
|
||||
lastHeartbeatSentAt = millis();
|
||||
ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
|
||||
String hb = String(ClusterProtocol::HEARTBEAT_MSG) + ":" + ctx.hostname;
|
||||
ctx.udp->write(hb.c_str());
|
||||
|
||||
@@ -10,10 +10,11 @@ Config::Config() {
|
||||
api_server_port = 80;
|
||||
|
||||
// Cluster Configuration
|
||||
discovery_interval_ms = 1000;
|
||||
discovery_interval_ms = 1000; // TODO retire this in favor of heartbeat_interval_ms
|
||||
cluster_listen_interval_ms = 10;
|
||||
heartbeat_interval_ms = 5000;
|
||||
status_update_interval_ms = 1000;
|
||||
member_info_update_interval_ms = 10000;
|
||||
member_info_update_interval_ms = 10000; // TODO retire this in favor of heartbeat_interval_ms
|
||||
print_interval_ms = 5000;
|
||||
|
||||
// Node Status Thresholds
|
||||
|
||||
Reference in New Issue
Block a user