From 356ec3d3813208c6107927636a54964527087787 Mon Sep 17 00:00:00 2001 From: Patrick Balsiger Date: Thu, 25 Sep 2025 20:44:31 +0200 Subject: [PATCH] feat: simplify udp listen --- docs/Architecture.md | 2 +- docs/TaskManagement.md | 8 +++--- include/spore/core/ClusterManager.h | 4 +-- src/spore/core/ClusterManager.cpp | 44 +++++++++-------------------- 4 files changed, 19 insertions(+), 39 deletions(-) diff --git a/docs/Architecture.md b/docs/Architecture.md index 0237207..349353c 100644 --- a/docs/Architecture.md +++ b/docs/Architecture.md @@ -74,7 +74,7 @@ The system runs several background tasks at different intervals: | Task | Interval (default) | Purpose | |------|--------------------|---------| | `discovery_send` | 1000 ms | Send UDP discovery packets | -| `discovery_listen` | 100 ms | Listen for discovery/heartbeat/node-info | +| `cluster_listen` | 100 ms | Listen for discovery/heartbeat/node-info | | `status_update` | 1000 ms | Update node status categories, purge dead | | `heartbeat` | 5000 ms | Broadcast heartbeat and update local resources | | `update_members_info` | 10000 ms | Reserved; no-op (info via UDP) | diff --git a/docs/TaskManagement.md b/docs/TaskManagement.md index ad9da85..b583463 100644 --- a/docs/TaskManagement.md +++ b/docs/TaskManagement.md @@ -319,18 +319,18 @@ curl -X POST http://192.168.1.100/api/tasks/control \ ### Before (with wrapper functions): ```cpp void discoverySendTask() { cluster.sendDiscovery(); } -void discoveryListenTask() { cluster.listenForDiscovery(); } +void clusterListenTask() { cluster.listen(); } taskManager.registerTask("discovery_send", interval, discoverySendTask); -taskManager.registerTask("discovery_listen", interval, discoveryListenTask); +taskManager.registerTask("cluster_listen", interval, clusterListenTask); ``` ### After (with std::bind): ```cpp taskManager.registerTask("discovery_send", interval, std::bind(&ClusterManager::sendDiscovery, &cluster)); -taskManager.registerTask("discovery_listen", interval, - std::bind(&ClusterManager::listenForDiscovery, &cluster)); +taskManager.registerTask("cluster_listen", interval, + std::bind(&ClusterManager::listen, &cluster)); ``` ## Compatibility diff --git a/include/spore/core/ClusterManager.h b/include/spore/core/ClusterManager.h index c0ca077..451df78 100644 --- a/include/spore/core/ClusterManager.h +++ b/include/spore/core/ClusterManager.h @@ -15,7 +15,7 @@ public: ClusterManager(NodeContext& ctx, TaskManager& taskMgr); void registerTasks(); void sendDiscovery(); - void listenForDiscovery(); + void listen(); void addOrUpdateNode(const String& nodeHost, IPAddress nodeIP); void updateAllNodeStatuses(); void removeDeadNodes(); @@ -28,7 +28,6 @@ public: private: NodeContext& ctx; TaskManager& taskManager; - enum class ListenState { WAITING_FOR_PACKET, MESSAGE_RECEIVED, DISPATCHING, DONE }; struct MessageHandler { bool (*predicate)(const char*); std::function handle; @@ -44,6 +43,5 @@ private: void onHeartbeat(const char* msg); void onResponse(const char* msg); void onNodeInfo(const char* msg); - ListenState listenState = ListenState::WAITING_FOR_PACKET; std::vector messageHandlers; }; diff --git a/src/spore/core/ClusterManager.cpp b/src/spore/core/ClusterManager.cpp index f6196d7..c449ae4 100644 --- a/src/spore/core/ClusterManager.cpp +++ b/src/spore/core/ClusterManager.cpp @@ -15,7 +15,7 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx void ClusterManager::registerTasks() { taskManager.registerTask("discovery_send", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); }); - taskManager.registerTask("discovery_listen", ctx.config.discovery_interval_ms / 10, [this]() { listenForDiscovery(); }); + taskManager.registerTask("cluster_listen", ctx.config.discovery_interval_ms / 10, [this]() { listen(); }); taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); }); taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); }); taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); }); @@ -30,37 +30,19 @@ void ClusterManager::sendDiscovery() { ctx.udp->endPacket(); } -void ClusterManager::listenForDiscovery() { - switch (listenState) { - case ListenState::WAITING_FOR_PACKET: { - int packetSize = ctx.udp->parsePacket(); - if (packetSize) { - listenState = ListenState::MESSAGE_RECEIVED; - } - break; - } - case ListenState::MESSAGE_RECEIVED: { - char incoming[ClusterProtocol::UDP_BUF_SIZE]; - int len = ctx.udp->read(incoming, ClusterProtocol::UDP_BUF_SIZE); - if (len > 0) { - incoming[len] = 0; - listenState = ListenState::DISPATCHING; - handleIncomingMessage(incoming); - } else { - listenState = ListenState::DONE; - } - break; - } - case ListenState::DISPATCHING: { - // handled synchronously - listenState = ListenState::DONE; - break; - } - case ListenState::DONE: { - listenState = ListenState::WAITING_FOR_PACKET; - break; - } +void ClusterManager::listen() { + int packetSize = ctx.udp->parsePacket(); + if (!packetSize) { + return; } + + char incoming[ClusterProtocol::UDP_BUF_SIZE]; + int len = ctx.udp->read(incoming, ClusterProtocol::UDP_BUF_SIZE); + if (len <= 0) { + return; + } + incoming[len] = 0; + handleIncomingMessage(incoming); } void ClusterManager::initMessageHandlers() {