feat(cluster): use node/discovered event to update memberlist
This commit is contained in:
@@ -68,7 +68,7 @@ The cluster uses a UDP-based discovery protocol for automatic node detection:
|
|||||||
- **Heartbeat**: `CLUSTER_HEARTBEAT:<hostname>`
|
- **Heartbeat**: `CLUSTER_HEARTBEAT:<hostname>`
|
||||||
- Sender: each node, broadcast to 255.255.255.255:`udp_port` on interval
|
- Sender: each node, broadcast to 255.255.255.255:`udp_port` on interval
|
||||||
- Purpose: prompt peers to reply with their node info and keep liveness
|
- Purpose: prompt peers to reply with their node info and keep liveness
|
||||||
- **Node Info**: `CLUSTER_NODE_INFO:<hostname>:<json>`
|
- **Node Info**: `CLUSTER_NODE_INFO:<hostname>:<json>`git add
|
||||||
- Sender: node receiving a heartbeat; unicast to heartbeat sender IP
|
- Sender: node receiving a heartbeat; unicast to heartbeat sender IP
|
||||||
- JSON fields: freeHeap, chipId, sdkVersion, cpuFreqMHz, flashChipSize, optional labels
|
- JSON fields: freeHeap, chipId, sdkVersion, cpuFreqMHz, flashChipSize, optional labels
|
||||||
|
|
||||||
@@ -140,7 +140,7 @@ The `NodeContext` provides an event-driven architecture for system-wide communic
|
|||||||
|
|
||||||
```cpp
|
```cpp
|
||||||
// Subscribe to events
|
// Subscribe to events
|
||||||
ctx.on("node_discovered", [](void* data) {
|
ctx.on("node/discovered", [](void* data) {
|
||||||
NodeInfo* node = static_cast<NodeInfo*>(data);
|
NodeInfo* node = static_cast<NodeInfo*>(data);
|
||||||
// Handle new node discovery
|
// Handle new node discovery
|
||||||
});
|
});
|
||||||
@@ -154,13 +154,13 @@ ctx.on("cluster_updated", [](void* data) {
|
|||||||
|
|
||||||
```cpp
|
```cpp
|
||||||
// Publish events
|
// Publish events
|
||||||
ctx.fire("node_discovered", &newNode);
|
ctx.fire("node/discovered", &newNode);
|
||||||
ctx.fire("cluster_updated", &clusterData);
|
ctx.fire("cluster_updated", &clusterData);
|
||||||
```
|
```
|
||||||
|
|
||||||
### Available Events
|
### Available Events
|
||||||
|
|
||||||
- **`node_discovered`**: New node added or local node refreshed
|
- **`node/discovered`**: New node added or local node refreshed
|
||||||
|
|
||||||
## Resource Monitoring
|
## Resource Monitoring
|
||||||
|
|
||||||
|
|||||||
@@ -3,8 +3,8 @@
|
|||||||
#include "spore/util/Logging.h"
|
#include "spore/util/Logging.h"
|
||||||
|
|
||||||
ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx), taskManager(taskMgr) {
|
ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx), taskManager(taskMgr) {
|
||||||
// Register callback for node_discovered event
|
// Register callback for node/discovered event
|
||||||
ctx.on("node_discovered", [this](void* data) {
|
ctx.on("node/discovered", [this](void* data) {
|
||||||
NodeInfo* node = static_cast<NodeInfo*>(data);
|
NodeInfo* node = static_cast<NodeInfo*>(data);
|
||||||
this->addOrUpdateNode(node->hostname, node->ip);
|
this->addOrUpdateNode(node->hostname, node->ip);
|
||||||
});
|
});
|
||||||
@@ -34,9 +34,8 @@ void ClusterManager::registerTasks() {
|
|||||||
taskManager.registerTask("cluster_discovery", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); });
|
taskManager.registerTask("cluster_discovery", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); });
|
||||||
taskManager.registerTask("cluster_listen", ctx.config.cluster_listen_interval_ms, [this]() { listen(); });
|
taskManager.registerTask("cluster_listen", ctx.config.cluster_listen_interval_ms, [this]() { listen(); });
|
||||||
taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); });
|
taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); });
|
||||||
taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); });
|
|
||||||
taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); });
|
taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); });
|
||||||
taskManager.registerTask("cluster_update_members_info", ctx.config.member_info_update_interval_ms, [this]() { updateAllMembersInfoTaskCallback(); });
|
//taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); });
|
||||||
LOG_INFO("ClusterManager", "Registered all cluster tasks");
|
LOG_INFO("ClusterManager", "Registered all cluster tasks");
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -155,7 +154,10 @@ void ClusterManager::onHeartbeat(const char* /*msg*/) {
|
|||||||
void ClusterManager::onResponse(const char* msg) {
|
void ClusterManager::onResponse(const char* msg) {
|
||||||
char* hostPtr = const_cast<char*>(msg) + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
|
char* hostPtr = const_cast<char*>(msg) + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
|
||||||
String nodeHost = String(hostPtr);
|
String nodeHost = String(hostPtr);
|
||||||
addOrUpdateNode(nodeHost, ctx.udp->remoteIP());
|
NodeInfo discovered;
|
||||||
|
discovered.hostname = nodeHost;
|
||||||
|
discovered.ip = ctx.udp->remoteIP();
|
||||||
|
ctx.fire("node/discovered", &discovered);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterManager::onNodeInfo(const char* msg) {
|
void ClusterManager::onNodeInfo(const char* msg) {
|
||||||
@@ -169,7 +171,10 @@ void ClusterManager::onNodeInfo(const char* msg) {
|
|||||||
String nodeHost = String(hostCStr);
|
String nodeHost = String(hostCStr);
|
||||||
IPAddress senderIP = ctx.udp->remoteIP();
|
IPAddress senderIP = ctx.udp->remoteIP();
|
||||||
|
|
||||||
addOrUpdateNode(nodeHost, senderIP);
|
NodeInfo discovered;
|
||||||
|
discovered.hostname = nodeHost;
|
||||||
|
discovered.ip = senderIP;
|
||||||
|
ctx.fire("node/discovered", &discovered);
|
||||||
|
|
||||||
JsonDocument doc;
|
JsonDocument doc;
|
||||||
DeserializationError err = deserializeJson(doc, jsonCStr);
|
DeserializationError err = deserializeJson(doc, jsonCStr);
|
||||||
@@ -392,7 +397,7 @@ void ClusterManager::heartbeatTaskCallback() {
|
|||||||
node.lastSeen = millis();
|
node.lastSeen = millis();
|
||||||
node.status = NodeInfo::ACTIVE;
|
node.status = NodeInfo::ACTIVE;
|
||||||
updateLocalNodeResources();
|
updateLocalNodeResources();
|
||||||
ctx.fire("node_discovered", &node);
|
ctx.fire("node/discovered", &node);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Broadcast heartbeat so peers can respond with their node info
|
// Broadcast heartbeat so peers can respond with their node info
|
||||||
@@ -403,11 +408,6 @@ void ClusterManager::heartbeatTaskCallback() {
|
|||||||
ctx.udp->endPacket();
|
ctx.udp->endPacket();
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterManager::updateAllMembersInfoTaskCallback() {
|
|
||||||
// HTTP-based member info fetching disabled; node info is provided via UDP responses to heartbeats
|
|
||||||
// No-op to reduce network and memory usage
|
|
||||||
}
|
|
||||||
|
|
||||||
void ClusterManager::updateAllNodeStatuses() {
|
void ClusterManager::updateAllNodeStatuses() {
|
||||||
auto& memberList = *ctx.memberList;
|
auto& memberList = *ctx.memberList;
|
||||||
unsigned long now = millis();
|
unsigned long now = millis();
|
||||||
|
|||||||
@@ -118,5 +118,5 @@ void NetworkManager::setupWiFi() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Notify listeners that the node is (re)discovered
|
// Notify listeners that the node is (re)discovered
|
||||||
ctx.fire("node_discovered", &ctx.self);
|
ctx.fire("node/discovered", &ctx.self);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -56,7 +56,7 @@ ws.on('open', () => {
|
|||||||
|
|
||||||
ws.on('message', (data) => {
|
ws.on('message', (data) => {
|
||||||
// Optionally throttle logs: comment out for quieter output
|
// Optionally throttle logs: comment out for quieter output
|
||||||
// console.log('WS:', data.toString());
|
//console.log('WS:', data.toString());
|
||||||
});
|
});
|
||||||
|
|
||||||
ws.on('error', (err) => {
|
ws.on('error', (err) => {
|
||||||
|
|||||||
Reference in New Issue
Block a user