Compare commits

...

1 Commits

4 changed files with 17 additions and 17 deletions

View File

@@ -140,7 +140,7 @@ The `NodeContext` provides an event-driven architecture for system-wide communic
```cpp ```cpp
// Subscribe to events // Subscribe to events
ctx.on("node_discovered", [](void* data) { ctx.on("node/discovered", [](void* data) {
NodeInfo* node = static_cast<NodeInfo*>(data); NodeInfo* node = static_cast<NodeInfo*>(data);
// Handle new node discovery // Handle new node discovery
}); });
@@ -154,13 +154,13 @@ ctx.on("cluster_updated", [](void* data) {
```cpp ```cpp
// Publish events // Publish events
ctx.fire("node_discovered", &newNode); ctx.fire("node/discovered", &newNode);
ctx.fire("cluster_updated", &clusterData); ctx.fire("cluster_updated", &clusterData);
``` ```
### Available Events ### Available Events
- **`node_discovered`**: New node added or local node refreshed - **`node/discovered`**: New node added or local node refreshed
## Resource Monitoring ## Resource Monitoring

View File

@@ -3,8 +3,8 @@
#include "spore/util/Logging.h" #include "spore/util/Logging.h"
ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx), taskManager(taskMgr) { ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx), taskManager(taskMgr) {
// Register callback for node_discovered event // Register callback for node/discovered event
ctx.on("node_discovered", [this](void* data) { ctx.on("node/discovered", [this](void* data) {
NodeInfo* node = static_cast<NodeInfo*>(data); NodeInfo* node = static_cast<NodeInfo*>(data);
this->addOrUpdateNode(node->hostname, node->ip); this->addOrUpdateNode(node->hostname, node->ip);
}); });
@@ -34,9 +34,8 @@ void ClusterManager::registerTasks() {
taskManager.registerTask("cluster_discovery", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); }); taskManager.registerTask("cluster_discovery", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); });
taskManager.registerTask("cluster_listen", ctx.config.cluster_listen_interval_ms, [this]() { listen(); }); taskManager.registerTask("cluster_listen", ctx.config.cluster_listen_interval_ms, [this]() { listen(); });
taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); }); taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); });
taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); });
taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); }); taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); });
taskManager.registerTask("cluster_update_members_info", ctx.config.member_info_update_interval_ms, [this]() { updateAllMembersInfoTaskCallback(); }); //taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); });
LOG_INFO("ClusterManager", "Registered all cluster tasks"); LOG_INFO("ClusterManager", "Registered all cluster tasks");
} }
@@ -155,7 +154,10 @@ void ClusterManager::onHeartbeat(const char* /*msg*/) {
void ClusterManager::onResponse(const char* msg) { void ClusterManager::onResponse(const char* msg) {
char* hostPtr = const_cast<char*>(msg) + strlen(ClusterProtocol::RESPONSE_MSG) + 1; char* hostPtr = const_cast<char*>(msg) + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
String nodeHost = String(hostPtr); String nodeHost = String(hostPtr);
addOrUpdateNode(nodeHost, ctx.udp->remoteIP()); NodeInfo discovered;
discovered.hostname = nodeHost;
discovered.ip = ctx.udp->remoteIP();
ctx.fire("node/discovered", &discovered);
} }
void ClusterManager::onNodeInfo(const char* msg) { void ClusterManager::onNodeInfo(const char* msg) {
@@ -169,7 +171,10 @@ void ClusterManager::onNodeInfo(const char* msg) {
String nodeHost = String(hostCStr); String nodeHost = String(hostCStr);
IPAddress senderIP = ctx.udp->remoteIP(); IPAddress senderIP = ctx.udp->remoteIP();
addOrUpdateNode(nodeHost, senderIP); NodeInfo discovered;
discovered.hostname = nodeHost;
discovered.ip = senderIP;
ctx.fire("node/discovered", &discovered);
JsonDocument doc; JsonDocument doc;
DeserializationError err = deserializeJson(doc, jsonCStr); DeserializationError err = deserializeJson(doc, jsonCStr);
@@ -392,7 +397,7 @@ void ClusterManager::heartbeatTaskCallback() {
node.lastSeen = millis(); node.lastSeen = millis();
node.status = NodeInfo::ACTIVE; node.status = NodeInfo::ACTIVE;
updateLocalNodeResources(); updateLocalNodeResources();
ctx.fire("node_discovered", &node); ctx.fire("node/discovered", &node);
} }
// Broadcast heartbeat so peers can respond with their node info // Broadcast heartbeat so peers can respond with their node info
@@ -403,11 +408,6 @@ void ClusterManager::heartbeatTaskCallback() {
ctx.udp->endPacket(); ctx.udp->endPacket();
} }
void ClusterManager::updateAllMembersInfoTaskCallback() {
// HTTP-based member info fetching disabled; node info is provided via UDP responses to heartbeats
// No-op to reduce network and memory usage
}
void ClusterManager::updateAllNodeStatuses() { void ClusterManager::updateAllNodeStatuses() {
auto& memberList = *ctx.memberList; auto& memberList = *ctx.memberList;
unsigned long now = millis(); unsigned long now = millis();

View File

@@ -118,5 +118,5 @@ void NetworkManager::setupWiFi() {
} }
// Notify listeners that the node is (re)discovered // Notify listeners that the node is (re)discovered
ctx.fire("node_discovered", &ctx.self); ctx.fire("node/discovered", &ctx.self);
} }

View File

@@ -56,7 +56,7 @@ ws.on('open', () => {
ws.on('message', (data) => { ws.on('message', (data) => {
// Optionally throttle logs: comment out for quieter output // Optionally throttle logs: comment out for quieter output
// console.log('WS:', data.toString()); //console.log('WS:', data.toString());
}); });
ws.on('error', (err) => { ws.on('error', (err) => {