feature/node-info-sync #8

Merged
master merged 7 commits from feature/node-info-sync into main 2025-09-28 12:26:39 +02:00
4 changed files with 19 additions and 39 deletions
Showing only changes of commit 356ec3d381 - Show all commits

View File

@@ -74,7 +74,7 @@ The system runs several background tasks at different intervals:
| Task | Interval (default) | Purpose | | Task | Interval (default) | Purpose |
|------|--------------------|---------| |------|--------------------|---------|
| `discovery_send` | 1000 ms | Send UDP discovery packets | | `discovery_send` | 1000 ms | Send UDP discovery packets |
| `discovery_listen` | 100 ms | Listen for discovery/heartbeat/node-info | | `cluster_listen` | 100 ms | Listen for discovery/heartbeat/node-info |
| `status_update` | 1000 ms | Update node status categories, purge dead | | `status_update` | 1000 ms | Update node status categories, purge dead |
| `heartbeat` | 5000 ms | Broadcast heartbeat and update local resources | | `heartbeat` | 5000 ms | Broadcast heartbeat and update local resources |
| `update_members_info` | 10000 ms | Reserved; no-op (info via UDP) | | `update_members_info` | 10000 ms | Reserved; no-op (info via UDP) |

View File

@@ -319,18 +319,18 @@ curl -X POST http://192.168.1.100/api/tasks/control \
### Before (with wrapper functions): ### Before (with wrapper functions):
```cpp ```cpp
void discoverySendTask() { cluster.sendDiscovery(); } void discoverySendTask() { cluster.sendDiscovery(); }
void discoveryListenTask() { cluster.listenForDiscovery(); } void clusterListenTask() { cluster.listen(); }
taskManager.registerTask("discovery_send", interval, discoverySendTask); taskManager.registerTask("discovery_send", interval, discoverySendTask);
taskManager.registerTask("discovery_listen", interval, discoveryListenTask); taskManager.registerTask("cluster_listen", interval, clusterListenTask);
``` ```
### After (with std::bind): ### After (with std::bind):
```cpp ```cpp
taskManager.registerTask("discovery_send", interval, taskManager.registerTask("discovery_send", interval,
std::bind(&ClusterManager::sendDiscovery, &cluster)); std::bind(&ClusterManager::sendDiscovery, &cluster));
taskManager.registerTask("discovery_listen", interval, taskManager.registerTask("cluster_listen", interval,
std::bind(&ClusterManager::listenForDiscovery, &cluster)); std::bind(&ClusterManager::listen, &cluster));
``` ```
## Compatibility ## Compatibility

View File

@@ -15,7 +15,7 @@ public:
ClusterManager(NodeContext& ctx, TaskManager& taskMgr); ClusterManager(NodeContext& ctx, TaskManager& taskMgr);
void registerTasks(); void registerTasks();
void sendDiscovery(); void sendDiscovery();
void listenForDiscovery(); void listen();
void addOrUpdateNode(const String& nodeHost, IPAddress nodeIP); void addOrUpdateNode(const String& nodeHost, IPAddress nodeIP);
void updateAllNodeStatuses(); void updateAllNodeStatuses();
void removeDeadNodes(); void removeDeadNodes();
@@ -28,7 +28,6 @@ public:
private: private:
NodeContext& ctx; NodeContext& ctx;
TaskManager& taskManager; TaskManager& taskManager;
enum class ListenState { WAITING_FOR_PACKET, MESSAGE_RECEIVED, DISPATCHING, DONE };
struct MessageHandler { struct MessageHandler {
bool (*predicate)(const char*); bool (*predicate)(const char*);
std::function<void(const char*)> handle; std::function<void(const char*)> handle;
@@ -44,6 +43,5 @@ private:
void onHeartbeat(const char* msg); void onHeartbeat(const char* msg);
void onResponse(const char* msg); void onResponse(const char* msg);
void onNodeInfo(const char* msg); void onNodeInfo(const char* msg);
ListenState listenState = ListenState::WAITING_FOR_PACKET;
std::vector<MessageHandler> messageHandlers; std::vector<MessageHandler> messageHandlers;
}; };

View File

@@ -15,7 +15,7 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx
void ClusterManager::registerTasks() { void ClusterManager::registerTasks() {
taskManager.registerTask("discovery_send", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); }); taskManager.registerTask("discovery_send", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); });
taskManager.registerTask("discovery_listen", ctx.config.discovery_interval_ms / 10, [this]() { listenForDiscovery(); }); taskManager.registerTask("cluster_listen", ctx.config.discovery_interval_ms / 10, [this]() { listen(); });
taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); }); taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); });
taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); }); taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); });
taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); }); taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); });
@@ -30,37 +30,19 @@ void ClusterManager::sendDiscovery() {
ctx.udp->endPacket(); ctx.udp->endPacket();
} }
void ClusterManager::listenForDiscovery() { void ClusterManager::listen() {
switch (listenState) {
case ListenState::WAITING_FOR_PACKET: {
int packetSize = ctx.udp->parsePacket(); int packetSize = ctx.udp->parsePacket();
if (packetSize) { if (!packetSize) {
listenState = ListenState::MESSAGE_RECEIVED; return;
} }
break;
}
case ListenState::MESSAGE_RECEIVED: {
char incoming[ClusterProtocol::UDP_BUF_SIZE]; char incoming[ClusterProtocol::UDP_BUF_SIZE];
int len = ctx.udp->read(incoming, ClusterProtocol::UDP_BUF_SIZE); int len = ctx.udp->read(incoming, ClusterProtocol::UDP_BUF_SIZE);
if (len > 0) { if (len <= 0) {
return;
}
incoming[len] = 0; incoming[len] = 0;
listenState = ListenState::DISPATCHING;
handleIncomingMessage(incoming); handleIncomingMessage(incoming);
} else {
listenState = ListenState::DONE;
}
break;
}
case ListenState::DISPATCHING: {
// handled synchronously
listenState = ListenState::DONE;
break;
}
case ListenState::DONE: {
listenState = ListenState::WAITING_FOR_PACKET;
break;
}
}
} }
void ClusterManager::initMessageHandlers() { void ClusterManager::initMessageHandlers() {