diff --git a/docs/API.md b/docs/API.md
index 4ee4525..b477c36 100644
--- a/docs/API.md
+++ b/docs/API.md
@@ -15,12 +15,18 @@ The SPORE system provides a comprehensive RESTful API for monitoring and control
 
 | Endpoint | Method | Description | Response |
 |----------|--------|-------------|----------|
-| `/api/node/status` | GET | System resource information and API endpoint registry | System metrics and API catalog |
+| `/api/node/status` | GET | System resource information | System metrics |
 | `/api/node/endpoints` | GET | API endpoints and parameters | Detailed endpoint specifications |
 | `/api/cluster/members` | GET | Cluster membership and node health information | Cluster topology and health status |
 | `/api/node/update` | POST | Handle firmware updates via OTA | Update progress and status |
 | `/api/node/restart` | POST | Trigger system restart | Restart confirmation |
 
+### Monitoring API
+
+| Endpoint | Method | Description | Response |
+|----------|--------|-------------|----------|
+| `/api/monitoring/resources` | GET | CPU, memory, filesystem, and uptime | System resource metrics |
+
 ### Network Management API
 
 | Endpoint | Method | Description | Response |
@@ -140,7 +146,7 @@ Controls the execution state of individual tasks. Supports enabling, disabling,
 
 #### GET /api/node/status
 
-Returns comprehensive system resource information including memory usage, chip details, and a registry of all available API endpoints.
+Returns comprehensive system resource information including memory usage and chip details. For a list of available API endpoints, use `/api/node/endpoints`.
 
 **Response Fields:**
 - `freeHeap`: Available RAM in bytes
@@ -168,7 +174,7 @@ Returns comprehensive system resource information including memory usage, chip d
 
 #### GET /api/node/endpoints
 
-Returns detailed information about all available API endpoints, including their parameters, types, and validation rules.
+Returns detailed information about all available API endpoints, including their parameters, types, and validation rules. Methods are returned as strings (e.g., "GET", "POST").
 
 **Response Fields:**
 - `endpoints[]`: Array of endpoint capability objects
@@ -236,6 +242,54 @@ Initiates an over-the-air firmware update. The firmware file should be uploaded
 
 Triggers a system restart. The response will be sent before the restart occurs.
 
+### Monitoring
+
+#### GET /api/monitoring/resources
+
+Returns real-time system resource metrics.
+
+Response Fields:
+- `cpu.current_usage`: Current CPU usage percent
+- `cpu.average_usage`: Average CPU usage percent
+- `cpu.max_usage`: Max observed CPU usage
+- `cpu.min_usage`: Min observed CPU usage
+- `cpu.measurement_count`: Number of measurements
+- `cpu.is_measuring`: Whether measurement is active
+- `memory.free_heap`: Free heap bytes
+- `memory.total_heap`: Total heap bytes (approximate)
+- `memory.heap_fragmentation`: Fragmentation percent (0 on ESP8266)
+- `filesystem.total_bytes`: LittleFS total bytes
+- `filesystem.used_bytes`: Used bytes
+- `filesystem.free_bytes`: Free bytes
+- `filesystem.usage_percent`: Usage percent
+- `system.uptime_ms`: Uptime in milliseconds
+Example Response:
+```json
+{
+  "cpu": {
+    "current_usage": 3.5,
+    "average_usage": 2.1,
+    "max_usage": 15.2,
+    "min_usage": 0.0,
+    "measurement_count": 120,
+    "is_measuring": true
+  },
+  "memory": {
+    "free_heap": 48748,
+    "total_heap": 81920,
+    "heap_fragmentation": 0
+  },
+  "filesystem": {
+    "total_bytes": 65536,
+    "used_bytes": 10240,
+    "free_bytes": 55296,
+    "usage_percent": 15.6
+  },
+  "system": {
+    "uptime_ms": 123456
+  }
+}
+```
 ### Network Management
 
 #### GET /api/network/status
diff --git a/docs/Architecture.md b/docs/Architecture.md
index a8e3477..dfbb4ba 100644
--- a/docs/Architecture.md
+++ b/docs/Architecture.md
@@ -25,9 +25,9 @@ The system architecture consists of several key components working together:
 - **Service Registry**: Track available
services across the cluster
 
 ### Task Scheduler
-- **Cooperative Multitasking**: Background task management system
-- **Task Lifecycle Management**: Automatic task execution and monitoring
-- **Resource Optimization**: Efficient task scheduling and execution
+- **Cooperative Multitasking**: Background task management system (`TaskManager`)
+- **Task Lifecycle Management**: Enable/disable tasks and set intervals at runtime
+- **Execution Model**: Tasks run in `Spore::loop()` when their interval elapses
 
 ### Node Context
 - **Central Context**: Shared resources and configuration
@@ -40,27 +40,75 @@ The cluster uses a UDP-based discovery protocol for automatic node detection:
 
 ### Discovery Process
 
-1. **Discovery Broadcast**: Nodes periodically send UDP packets on port 4210
-2. **Response Handling**: Nodes respond with their hostname and IP address
-3. **Member Management**: Discovered nodes are automatically added to the cluster
-4. **Health Monitoring**: Continuous status checking via HTTP API calls
+1. **Discovery Broadcast**: Nodes periodically send UDP packets on port `udp_port` (default 4210)
+2. **Response Handling**: Nodes respond with `CLUSTER_RESPONSE:<hostname>`
+3. **Member Management**: Discovered nodes are added/updated in the cluster
+4.
**Node Info via UDP**: Heartbeat triggers peers to send `CLUSTER_NODE_INFO:<hostname>:<json>`
 
 ### Protocol Details
 
-- **UDP Port**: 4210 (configurable)
+- **UDP Port**: 4210 (configurable via `Config.udp_port`)
 - **Discovery Message**: `CLUSTER_DISCOVERY`
 - **Response Message**: `CLUSTER_RESPONSE`
+- **Heartbeat Message**: `CLUSTER_HEARTBEAT`
+- **Node Info Message**: `CLUSTER_NODE_INFO:<hostname>:<json>`
 - **Broadcast Address**: 255.255.255.255
-- **Discovery Interval**: 1 second (configurable)
-- **Listen Interval**: 100ms (configurable)
+- **Discovery Interval**: `Config.discovery_interval_ms` (default 1000 ms)
+- **Listen Interval**: `Config.cluster_listen_interval_ms` (default 10 ms)
+- **Heartbeat Interval**: `Config.heartbeat_interval_ms` (default 5000 ms)
+
+### Message Formats
+
+- **Discovery**: `CLUSTER_DISCOVERY`
+  - Sender: any node, broadcast to 255.255.255.255:`udp_port`
+  - Purpose: announce presence and solicit peer identification
+- **Response**: `CLUSTER_RESPONSE:<hostname>`
+  - Sender: node receiving a discovery; unicast to requester IP
+  - Purpose: provide hostname so requester can register/update member
+- **Heartbeat**: `CLUSTER_HEARTBEAT:<hostname>`
+  - Sender: each node, broadcast to 255.255.255.255:`udp_port` on interval
+  - Purpose: prompt peers to reply with their node info and keep liveness
+- **Node Info**: `CLUSTER_NODE_INFO:<hostname>:<json>`
+  - Sender: node receiving a heartbeat; unicast to heartbeat sender IP
+  - JSON fields: freeHeap, chipId, sdkVersion, cpuFreqMHz, flashChipSize, optional labels
+
+### Discovery Flow
+
+1. **Sender broadcasts** `CLUSTER_DISCOVERY`
+2. **Each receiver responds** with `CLUSTER_RESPONSE:<hostname>` to the sender IP
+3. **Sender registers/updates** the node using hostname and source IP
+
+### Heartbeat Flow
+
+1. **A node broadcasts** `CLUSTER_HEARTBEAT:<hostname>`
+2. **Each receiver replies** with `CLUSTER_NODE_INFO:<hostname>:<json>` to the heartbeat sender IP
+3.
**The sender**:
+  - Ensures the node exists or creates it with `hostname` and sender IP
+  - Parses JSON and updates resources, labels, `status = ACTIVE`, `lastSeen = now`
+  - Sets `latency = now - lastHeartbeatSentAt` (per-node, measured at heartbeat origin)
+
+### Listener Behavior
+
+The `cluster_listen` task parses one UDP packet per run and dispatches by prefix to:
+- **Discovery** → send `CLUSTER_RESPONSE`
+- **Heartbeat** → send `CLUSTER_NODE_INFO` JSON
+- **Response** → add/update node using provided hostname and source IP
+- **Node Info** → update resources/status/labels and record latency
+
+### Timing and Intervals
+
+- **UDP Port**: `Config.udp_port` (default 4210)
+- **Discovery Interval**: `Config.discovery_interval_ms` (default 1000 ms)
+- **Listen Interval**: `Config.cluster_listen_interval_ms` (default 10 ms)
+- **Heartbeat Interval**: `Config.heartbeat_interval_ms` (default 5000 ms)
 
 ### Node Status Categories
 
 Nodes are automatically categorized by their activity:
 
-- **ACTIVE**: Responding within 10 seconds
-- **INACTIVE**: No response for 10-60 seconds
-- **DEAD**: No response for over 60 seconds
+- **ACTIVE**: lastSeen < `node_inactive_threshold_ms` (default 10s)
+- **INACTIVE**: < `node_dead_threshold_ms` (default 120s)
+- **DEAD**: ≥ `node_dead_threshold_ms`
 
 ## Task Scheduling System
 
@@ -68,14 +116,14 @@ The system runs several background tasks at different intervals:
 
 ### Core System Tasks
 
-| Task | Interval | Purpose |
-|------|----------|---------|
-| **Discovery Send** | 1 second | Send UDP discovery packets |
-| **Discovery Listen** | 100ms | Listen for discovery responses |
-| **Status Updates** | 1 second | Monitor cluster member health |
-| **Heartbeat** | 2 seconds | Maintain cluster connectivity |
-| **Member Info** | 10 seconds | Update detailed node information |
-| **Debug Output** | 5 seconds | Print cluster status |
+| Task | Interval (default) | Purpose |
+|------|--------------------|---------|
+| `cluster_discovery` | 1000 ms |
Send UDP discovery packets |
+| `cluster_listen` | 10 ms | Listen for discovery/heartbeat/node-info |
+| `status_update` | 1000 ms | Update node status categories, purge dead |
+| `heartbeat` | 5000 ms | Broadcast heartbeat and update local resources |
+| `cluster_update_members_info` | 10000 ms | Reserved; no-op (info via UDP) |
+| `print_members` | 5000 ms | Log current member list |
 
 ### Task Management Features
 
@@ -112,10 +160,7 @@ ctx.fire("cluster_updated", &clusterData);
 
 ### Available Events
 
-- **`node_discovered`**: New node added to cluster
-- **`cluster_updated`**: Cluster membership changed
-- **`resource_update`**: Node resources updated
-- **`health_check`**: Node health status changed
+- **`node_discovered`**: New node added or local node refreshed
 
 ## Resource Monitoring
 
@@ -155,10 +200,8 @@ The system includes automatic WiFi fallback for robust operation:
 
 ### Configuration
 
-- **SSID Format**: `SPORE_`
-- **Password**: Configurable fallback password
-- **IP Range**: 192.168.4.x subnet
-- **Gateway**: 192.168.4.1
+- **Hostname**: Derived from MAC (`esp-`) and assigned to `ctx.hostname`
+- **AP Mode**: If STA connection fails, device switches to AP mode with configured SSID/password
 
 ## Cluster Topology
 
@@ -170,32 +213,30 @@ The system includes automatic WiFi fallback for robust operation:
 
 ### Network Architecture
 
-- **Mesh-like Structure**: Nodes can communicate with each other
-- **Dynamic Routing**: Automatic path discovery between nodes
-- **Load Distribution**: Tasks distributed across available nodes
-- **Fault Tolerance**: Automatic failover and recovery
+- UDP broadcast-based discovery and heartbeats on local subnet
+- Optional HTTP polling (disabled by default; node info exchanged via UDP)
 
 ## Data Flow
 
 ### Node Discovery
 1. **UDP Broadcast**: Nodes broadcast discovery packets on port 4210
-2. **UDP Response**: Receiving nodes responds with hostname
+2. **UDP Response**: Receiving nodes respond with hostname
 3.
**Registration**: Discovered nodes are added to local cluster member list
 
 ### Health Monitoring
-1. **Periodic Checks**: Cluster manager polls member nodes every 1 second
-2. **Status Collection**: Each node returns resource usage and health metrics
+1. **Periodic Checks**: Cluster manager updates node status categories
+2. **Status Collection**: Each node updates resources via UDP node-info messages
 
 ### Task Management
-1. **Scheduling**: TaskScheduler executes registered tasks at configured intervals
-2. **Execution**: Tasks run cooperatively, yielding control to other tasks
-3. **Monitoring**: Task status and results are exposed via REST API endpoints
+1. **Scheduling**: `TaskManager` executes registered tasks at configured intervals
+2. **Execution**: Tasks run cooperatively in the main loop without preemption
+3. **Monitoring**: Task status is exposed via REST (`/api/tasks/status`)
 
 ## Performance Characteristics
 
 ### Memory Usage
-- **Base System**: ~15-20KB RAM
+- **Base System**: ~15-20KB RAM (device dependent)
 - **Per Task**: ~100-200 bytes per task
 - **Cluster Members**: ~50-100 bytes per member
 - **API Endpoints**: ~20-30 bytes per endpoint
@@ -219,7 +260,7 @@ The system includes automatic WiFi fallback for robust operation:
 
 ### Current Implementation
 - **Network Access**: Local network only (no internet exposure)
-- **Authentication**: None currently implemented
+- **Authentication**: None currently implemented; LAN-only access assumed
 - **Data Validation**: Basic input validation
 - **Resource Limits**: Memory and processing constraints
 
diff --git a/docs/Development.md b/docs/Development.md
index 11465f6..9885c78 100644
--- a/docs/Development.md
+++ b/docs/Development.md
@@ -20,57 +20,99 @@
 
 ```
 spore/
-├── src/                        # Source code
-│   ├── main.cpp                # Main application entry point
-│   ├── ApiServer.cpp           # HTTP API server implementation
-│   ├── ClusterManager.cpp      # Cluster management logic
-│   ├── NetworkManager.cpp      # WiFi and network handling
-│   ├── TaskManager.cpp         #
Background task management
-│   └── NodeContext.cpp         # Central context and events
+├── src/                        # Source code (framework under src/spore)
+│   └── spore/
+│       ├── Spore.cpp           # Framework lifecycle (setup/begin/loop)
+│       ├── core/               # Core components
+│       │   ├── ApiServer.cpp       # HTTP API server implementation
+│       │   ├── ClusterManager.cpp  # Cluster management logic
+│       │   ├── NetworkManager.cpp  # WiFi and network handling
+│       │   ├── TaskManager.cpp     # Background task management
+│       │   └── NodeContext.cpp     # Central context and events
+│       ├── services/           # Built-in services
+│       │   ├── NodeService.cpp
+│       │   ├── NetworkService.cpp
+│       │   ├── ClusterService.cpp
+│       │   ├── TaskService.cpp
+│       │   ├── StaticFileService.cpp
+│       │   └── MonitoringService.cpp
+│       └── types/              # Shared types
 ├── include/                    # Header files
-├── lib/                        # Library files
+├── examples/                   # Example apps per env (base, relay, neopattern)
 ├── docs/                       # Documentation
 ├── api/                        # OpenAPI specification
-├── examples/                   # Example code
-├── test/                       # Test files
-├── platformio.ini              # PlatformIO configuration
-└── ctl.sh                      # Build and deployment scripts
+├── platformio.ini              # PlatformIO configuration
+└── ctl.sh                      # Build and deployment scripts
 ```
 
 ## PlatformIO Configuration
 
 ### Framework and Board
 
-The project uses PlatformIO with the following configuration:
+The project uses PlatformIO with the following configuration (excerpt):
 
 ```ini
-[env:esp01_1m]
+[platformio]
+default_envs = base
+src_dir = .
+data_dir = ${PROJECT_DIR}/examples/${PIOENV}/data
+
+[common]
+monitor_speed = 115200
+lib_deps =
+    esp32async/ESPAsyncWebServer@^3.8.0
+    bblanchon/ArduinoJson@^7.4.2
+
+[env:base]
 platform = platformio/espressif8266@^4.2.1
 board = esp01_1m
 framework = arduino
 upload_speed = 115200
-flash_mode = dout
+monitor_speed = 115200
+board_build.f_cpu = 80000000L
+board_build.flash_mode = qio
+board_build.filesystem = littlefs
+; note: somehow partition table is not working, so we need to use the ldscript
+board_build.ldscript = eagle.flash.1m64.ld
+lib_deps = ${common.lib_deps}
+build_src_filter = + + + + + + + + + + + + + + +
+[env:d1_mini]
+platform = platformio/espressif8266@^4.2.1
+board = d1_mini
+framework = arduino
+upload_speed = 115200
+monitor_speed = 115200
+board_build.filesystem = littlefs
+board_build.flash_mode = dio ; D1 Mini uses DIO on 4 Mbit flash
+board_build.flash_size = 4M
+board_build.ldscript = eagle.flash.4m1m.ld
+lib_deps = ${common.lib_deps}
+build_src_filter = + + + + + + + + + + + + + +
 ```
 
-### Key Configuration Details
-
-- **Framework**: Arduino
-- **Board**: ESP-01 with 1MB flash
-- **Upload Speed**: 115200 baud
-- **Flash Mode**: DOUT (required for ESP-01S)
-- **Build Type**: Release (optimized for production)
-
 ### Dependencies
 
-The project requires the following libraries:
+The project requires the following libraries (resolved via PlatformIO):
 
 ```ini
 lib_deps =
     esp32async/ESPAsyncWebServer@^3.8.0
     bblanchon/ArduinoJson@^7.4.2
-    arkhipenko/TaskScheduler@^3.8.5
-    ESP8266HTTPClient@1.2
-    ESP8266WiFi@1.0
 ```
 
 ### Filesystem, Linker Scripts, and Flash Layout
@@ -103,7 +145,6 @@ Notes:
 - If you need a different FS size, select an appropriate ldscript variant and keep `board_build.filesystem = littlefs`.
 - On ESP8266, custom partition CSVs are not used for layout; the linker script defines the flash map. This project removed prior `board_build.partitions` usage in favor of explicit `board_build.ldscript` entries per environment.
-
 ## Building
 
 ### Basic Build Commands
@@ -308,7 +349,7 @@ export API_NODE=192.168.1.100
 Key configuration files:
 
 - **`platformio.ini`**: Build and upload configuration
-- **`src/Config.cpp`**: Application configuration
+- **`src/spore/types/Config.cpp`**: Default runtime configuration
 - **`.env`**: Environment variables
 - **`ctl.sh`**: Build and deployment scripts
 
diff --git a/docs/MonitoringService.md b/docs/MonitoringService.md
new file mode 100644
index 0000000..d6748e2
--- /dev/null
+++ b/docs/MonitoringService.md
@@ -0,0 +1,79 @@
+# Monitoring Service
+
+Exposes system resource metrics via HTTP for observability.
+
+## Overview
+
+- **Service name**: `MonitoringService`
+- **Endpoint**: `GET /api/monitoring/resources`
+- **Metrics**: CPU usage, memory, filesystem, uptime
+
+## Endpoint
+
+### GET /api/monitoring/resources
+
+Returns real-time system resource metrics.
+
+Response fields:
+- `cpu.current_usage`: Current CPU usage percent
+- `cpu.average_usage`: Average CPU usage percent
+- `cpu.max_usage`: Max observed CPU usage
+- `cpu.min_usage`: Min observed CPU usage
+- `cpu.measurement_count`: Number of measurements
+- `cpu.is_measuring`: Whether measurement is active
+- `memory.free_heap`: Free heap bytes
+- `memory.total_heap`: Total heap bytes (approximate)
+- `memory.min_free_heap`: Minimum free heap (0 on ESP8266)
+- `memory.max_alloc_heap`: Max allocatable heap (0 on ESP8266)
+- `memory.heap_fragmentation`: Fragmentation percent (0 on ESP8266)
+- `filesystem.total_bytes`: LittleFS total bytes
+- `filesystem.used_bytes`: Used bytes
+- `filesystem.free_bytes`: Free bytes
+- `filesystem.usage_percent`: Usage percent
+- `system.uptime_ms`: Uptime in milliseconds
+- `system.uptime_seconds`: Uptime in seconds
+- `system.uptime_formatted`: Human-readable uptime
+
+Example:
+```json
+{
+  "cpu": {
+    "current_usage": 3.5,
+    "average_usage": 2.1,
+    "max_usage": 15.2,
+    "min_usage": 0.0,
+    "measurement_count": 120,
+    "is_measuring": true
+  },
+  "memory": {
+
"free_heap": 48748, + "total_heap": 81920, + "min_free_heap": 0, + "max_alloc_heap": 0, + "heap_fragmentation": 0, + "heap_usage_percent": 40.4 + }, + "filesystem": { + "total_bytes": 65536, + "used_bytes": 10240, + "free_bytes": 55296, + "usage_percent": 15.6 + }, + "system": { + "uptime_ms": 123456, + "uptime_seconds": 123, + "uptime_formatted": "0h 2m 3s" + } +} +``` + +## Implementation Notes + +- `MonitoringService` reads from `CpuUsage` and ESP8266 SDK APIs. +- Filesystem metrics are gathered from LittleFS. +- CPU measurement is bracketed by `Spore::loop()` calling `cpuUsage.startMeasurement()` and `cpuUsage.endMeasurement()`. + +## Troubleshooting + +- If `filesystem.total_bytes` is zero, ensure LittleFS is enabled in `platformio.ini` and an FS image is uploaded. +- CPU usage values remain zero until the main loop runs and CPU measurement is started. diff --git a/docs/README.md b/docs/README.md index 0dabf36..0505a00 100644 --- a/docs/README.md +++ b/docs/README.md @@ -15,15 +15,8 @@ Complete API reference with detailed endpoint documentation, examples, and integ - Task management workflows - Cluster monitoring examples -### 📖 [TaskManager.md](./TaskManager.md) -Comprehensive guide to the TaskManager system for background task management. - -**Includes:** -- Basic usage examples -- Advanced binding techniques -- Task status monitoring -- API integration details -- Performance considerations +### 📖 [MonitoringService.md](./MonitoringService.md) +System resource monitoring API for CPU, memory, filesystem, and uptime. ### 📖 [TaskManagement.md](./TaskManagement.md) Complete guide to the task management system with examples and best practices. 
diff --git a/docs/TaskManagement.md b/docs/TaskManagement.md
index ad9da85..b583463 100644
--- a/docs/TaskManagement.md
+++ b/docs/TaskManagement.md
@@ -319,18 +319,18 @@ curl -X POST http://192.168.1.100/api/tasks/control \
 ### Before (with wrapper functions):
 ```cpp
 void discoverySendTask() { cluster.sendDiscovery(); }
-void discoveryListenTask() { cluster.listenForDiscovery(); }
+void clusterListenTask() { cluster.listen(); }
 
 taskManager.registerTask("discovery_send", interval, discoverySendTask);
-taskManager.registerTask("discovery_listen", interval, discoveryListenTask);
+taskManager.registerTask("cluster_listen", interval, clusterListenTask);
 ```
 
 ### After (with std::bind):
 ```cpp
 taskManager.registerTask("discovery_send", interval,
     std::bind(&ClusterManager::sendDiscovery, &cluster));
-taskManager.registerTask("discovery_listen", interval,
-    std::bind(&ClusterManager::listenForDiscovery, &cluster));
+taskManager.registerTask("cluster_listen", interval,
+    std::bind(&ClusterManager::listen, &cluster));
 ```
 
 ## Compatibility
diff --git a/include/spore/core/ClusterManager.h b/include/spore/core/ClusterManager.h
index b0decd5..cd994f0 100644
--- a/include/spore/core/ClusterManager.h
+++ b/include/spore/core/ClusterManager.h
@@ -7,13 +7,15 @@
 #include
 #include
 #include
+#include
+#include
 
 class ClusterManager {
 public:
     ClusterManager(NodeContext& ctx, TaskManager& taskMgr);
     void registerTasks();
     void sendDiscovery();
-    void listenForDiscovery();
+    void listen();
     void addOrUpdateNode(const String& nodeHost, IPAddress nodeIP);
     void updateAllNodeStatuses();
     void removeDeadNodes();
@@ -26,4 +28,21 @@ public:
 private:
     NodeContext& ctx;
     TaskManager& taskManager;
+    struct MessageHandler {
+        bool (*predicate)(const char*);
+        std::function<void(const char*)> handle;
+        const char* name;
+    };
+    void initMessageHandlers();
+    void handleIncomingMessage(const char* incoming);
+    static bool isDiscoveryMsg(const char* msg);
+    static bool isHeartbeatMsg(const char* msg);
+    static bool
isResponseMsg(const char* msg);
+    static bool isNodeInfoMsg(const char* msg);
+    void onDiscovery(const char* msg);
+    void onHeartbeat(const char* msg);
+    void onResponse(const char* msg);
+    void onNodeInfo(const char* msg);
+    unsigned long lastHeartbeatSentAt = 0;
+    std::vector<MessageHandler> messageHandlers;
 };
diff --git a/include/spore/internal/Globals.h b/include/spore/internal/Globals.h
index d7ad5fb..c43d911 100644
--- a/include/spore/internal/Globals.h
+++ b/include/spore/internal/Globals.h
@@ -7,8 +7,11 @@
 namespace ClusterProtocol {
     constexpr const char* DISCOVERY_MSG = "CLUSTER_DISCOVERY";
     constexpr const char* RESPONSE_MSG = "CLUSTER_RESPONSE";
+    constexpr const char* HEARTBEAT_MSG = "CLUSTER_HEARTBEAT";
+    constexpr const char* NODE_INFO_MSG = "CLUSTER_NODE_INFO";
     constexpr uint16_t UDP_PORT = 4210;
-    constexpr size_t UDP_BUF_SIZE = 64;
+    // Increased buffer to accommodate node info JSON over UDP
+    constexpr size_t UDP_BUF_SIZE = 512;
     constexpr const char* API_NODE_STATUS = "/api/node/status";
 }
 
diff --git a/include/spore/types/Config.h b/include/spore/types/Config.h
index c9ca59e..fc3bf35 100644
--- a/include/spore/types/Config.h
+++ b/include/spore/types/Config.h
@@ -15,6 +15,7 @@ public:
     // Cluster Configuration
     unsigned long discovery_interval_ms;
     unsigned long heartbeat_interval_ms;
+    unsigned long cluster_listen_interval_ms;
     unsigned long status_update_interval_ms;
     unsigned long member_info_update_interval_ms;
     unsigned long print_interval_ms;
diff --git a/include/spore/types/NodeInfo.h b/include/spore/types/NodeInfo.h
index 66ae6cb..e6c73cb 100644
--- a/include/spore/types/NodeInfo.h
+++ b/include/spore/types/NodeInfo.h
@@ -17,7 +17,7 @@ struct NodeInfo {
         uint32_t cpuFreqMHz = 0;
         uint32_t flashChipSize = 0;
     } resources;
-    unsigned long latency = 0; // ms since lastSeen
+    unsigned long latency = 0; // ms from heartbeat broadcast to NODE_INFO receipt
     std::vector endpoints; // List of registered endpoints
     std::map labels; // Arbitrary node labels (key -> value)
 };
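The four message constants in `Globals.h` and the predicate/handler pairs declared in `ClusterManager.h` amount to classifying a UDP payload by its prefix and then splitting out the hostname and, for node info, the JSON text. The following is a minimal sketch of that idea in standard C++17, assuming the wire layout `TYPE:hostname` and `CLUSTER_NODE_INFO:hostname:json`. `MsgKind`, `ParsedMsg`, `afterPrefix`, and `parseClusterMessage` are hypothetical names, not part of the firmware. Unlike the prefix checks in the diff, this sketch requires the `:` separator and never writes into the input buffer.

```cpp
#include <cassert>
#include <optional>
#include <string_view>

// Hypothetical message classification; the firmware uses predicate functions instead.
enum class MsgKind { Discovery, Heartbeat, Response, NodeInfo, Unknown };

struct ParsedMsg {
    MsgKind kind;
    std::string_view hostname;  // empty for Discovery and Unknown
    std::string_view payload;   // JSON text, only set for NodeInfo
};

// Returns the text after "<prefix>:" when msg starts with exactly that, else nullopt.
static std::optional<std::string_view> afterPrefix(std::string_view msg,
                                                   std::string_view prefix) {
    if (msg.size() <= prefix.size() ||
        msg.substr(0, prefix.size()) != prefix ||
        msg[prefix.size()] != ':') {
        return std::nullopt;
    }
    return msg.substr(prefix.size() + 1);
}

ParsedMsg parseClusterMessage(std::string_view msg) {
    if (msg == "CLUSTER_DISCOVERY") {
        return {MsgKind::Discovery, {}, {}};
    }
    if (auto rest = afterPrefix(msg, "CLUSTER_HEARTBEAT")) {
        return {MsgKind::Heartbeat, *rest, {}};
    }
    if (auto rest = afterPrefix(msg, "CLUSTER_RESPONSE")) {
        return {MsgKind::Response, *rest, {}};
    }
    if (auto rest = afterPrefix(msg, "CLUSTER_NODE_INFO")) {
        // Split at the first ':' so colons inside the JSON stay in the payload.
        const auto sep = rest->find(':');
        if (sep == std::string_view::npos) {
            return {MsgKind::Unknown, {}, {}};
        }
        return {MsgKind::NodeInfo, rest->substr(0, sep), rest->substr(sep + 1)};
    }
    return {MsgKind::Unknown, {}, {}};
}
```

Splitting at the first colon mirrors the `strchr(p, ':')` call in `onNodeInfo`, which means a hostname containing a colon would be cut short in either version.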
diff --git a/src/spore/core/ClusterManager.cpp b/src/spore/core/ClusterManager.cpp
index 81c1ecc..4746488 100644
--- a/src/spore/core/ClusterManager.cpp
+++ b/src/spore/core/ClusterManager.cpp
@@ -10,15 +10,16 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx
     });
     // Register tasks
     registerTasks();
+    initMessageHandlers();
 }
 
 void ClusterManager::registerTasks() {
-    taskManager.registerTask("discovery_send", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); });
-    taskManager.registerTask("discovery_listen", ctx.config.discovery_interval_ms / 10, [this]() { listenForDiscovery(); });
+    taskManager.registerTask("cluster_discovery", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); });
+    taskManager.registerTask("cluster_listen", ctx.config.cluster_listen_interval_ms, [this]() { listen(); });
     taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); });
     taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); });
     taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); });
-    taskManager.registerTask("update_members_info", ctx.config.member_info_update_interval_ms, [this]() { updateAllMembersInfoTaskCallback(); });
+    taskManager.registerTask("cluster_update_members_info", ctx.config.member_info_update_interval_ms, [this]() { updateAllMembersInfoTaskCallback(); });
     LOG_INFO("ClusterManager", "Registered all cluster tasks");
 }
 
@@ -29,26 +30,146 @@ void ClusterManager::sendDiscovery() {
     ctx.udp->endPacket();
 }
 
-void ClusterManager::listenForDiscovery() {
+void ClusterManager::listen() {
     int packetSize = ctx.udp->parsePacket();
-    if (packetSize) {
-        char incoming[ClusterProtocol::UDP_BUF_SIZE];
-        int len = ctx.udp->read(incoming, ClusterProtocol::UDP_BUF_SIZE);
-        if (len > 0) {
-            incoming[len] = 0;
+    if (!packetSize) {
+        return;
+    }
+
+    char
incoming[ClusterProtocol::UDP_BUF_SIZE];
+    int len = ctx.udp->read(incoming, ClusterProtocol::UDP_BUF_SIZE - 1); // leave room for the terminating NUL
+    if (len <= 0) {
+        return;
+    }
+    incoming[len] = 0;
+    handleIncomingMessage(incoming);
+}
+
+void ClusterManager::initMessageHandlers() {
+    messageHandlers.clear();
+    messageHandlers.push_back({ &ClusterManager::isDiscoveryMsg, [this](const char* msg){ this->onDiscovery(msg); }, "DISCOVERY" });
+    messageHandlers.push_back({ &ClusterManager::isHeartbeatMsg, [this](const char* msg){ this->onHeartbeat(msg); }, "HEARTBEAT" });
+    messageHandlers.push_back({ &ClusterManager::isResponseMsg, [this](const char* msg){ this->onResponse(msg); }, "RESPONSE" });
+    messageHandlers.push_back({ &ClusterManager::isNodeInfoMsg, [this](const char* msg){ this->onNodeInfo(msg); }, "NODE_INFO" });
+}
+
+void ClusterManager::handleIncomingMessage(const char* incoming) {
+    for (const auto& h : messageHandlers) {
+        if (h.predicate(incoming)) {
+            h.handle(incoming);
+            return;
         }
-        //LOG_DEBUG(ctx, "UDP", "Packet received: " + String(incoming));
-        if (strcmp(incoming, ClusterProtocol::DISCOVERY_MSG) == 0) {
-            //LOG_DEBUG(ctx, "UDP", "Discovery request from: " + ctx.udp->remoteIP().toString());
-            ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
-            String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname;
-            ctx.udp->write(response.c_str());
-            ctx.udp->endPacket();
-            //LOG_DEBUG(ctx, "UDP", "Sent response with hostname: " + ctx.hostname);
-        } else if (strncmp(incoming, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0) {
-            char* hostPtr = incoming + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
-            String nodeHost = String(hostPtr);
-            addOrUpdateNode(nodeHost, ctx.udp->remoteIP());
+    }
+}
+
+bool ClusterManager::isDiscoveryMsg(const char* msg) {
+    return strcmp(msg, ClusterProtocol::DISCOVERY_MSG) == 0;
+}
+
+bool ClusterManager::isHeartbeatMsg(const char* msg) {
+    return strncmp(msg, ClusterProtocol::HEARTBEAT_MSG,
strlen(ClusterProtocol::HEARTBEAT_MSG)) == 0;
+}
+
+bool ClusterManager::isResponseMsg(const char* msg) {
+    return strncmp(msg, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0;
+}
+
+bool ClusterManager::isNodeInfoMsg(const char* msg) {
+    return strncmp(msg, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0;
+}
+
+void ClusterManager::onDiscovery(const char* /*msg*/) {
+    ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
+    String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname;
+    ctx.udp->write(response.c_str());
+    ctx.udp->endPacket();
+}
+
+void ClusterManager::onHeartbeat(const char* /*msg*/) {
+    JsonDocument doc;
+    doc["freeHeap"] = ESP.getFreeHeap();
+    doc["chipId"] = ESP.getChipId();
+    doc["sdkVersion"] = ESP.getSdkVersion();
+    doc["cpuFreqMHz"] = ESP.getCpuFreqMHz();
+    doc["flashChipSize"] = ESP.getFlashChipSize();
+
+    if (ctx.memberList) {
+        auto it = ctx.memberList->find(ctx.hostname);
+        if (it != ctx.memberList->end()) {
+            JsonObject labelsObj = doc["labels"].to<JsonObject>();
+            for (const auto& kv : it->second.labels) {
+                labelsObj[kv.first.c_str()] = kv.second;
+            }
+        } else if (!ctx.self.labels.empty()) {
+            JsonObject labelsObj = doc["labels"].to<JsonObject>();
+            for (const auto& kv : ctx.self.labels) {
+                labelsObj[kv.first.c_str()] = kv.second;
+            }
+        }
+    }
+
+    String json;
+    serializeJson(doc, json);
+
+    ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
+    String msg = String(ClusterProtocol::NODE_INFO_MSG) + ":" + ctx.hostname + ":" + json;
+    ctx.udp->write(msg.c_str());
+    ctx.udp->endPacket();
+}
+
+void ClusterManager::onResponse(const char* msg) {
+    char* hostPtr = const_cast<char*>(msg) + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
+    String nodeHost = String(hostPtr);
+    addOrUpdateNode(nodeHost, ctx.udp->remoteIP());
+}
+
+void ClusterManager::onNodeInfo(const char* msg) {
+    char* p = const_cast<char*>(msg) + strlen(ClusterProtocol::NODE_INFO_MSG) + 1;
+    char* hostEnd = strchr(p, ':');
+    if (hostEnd) {
+        *hostEnd = '\0';
+        const char* hostCStr = p;
+        const char* jsonCStr = hostEnd + 1;
+
+        String nodeHost = String(hostCStr);
+        IPAddress senderIP = ctx.udp->remoteIP();
+
+        addOrUpdateNode(nodeHost, senderIP);
+
+        JsonDocument doc;
+        DeserializationError err = deserializeJson(doc, jsonCStr);
+        if (!err) {
+            auto& memberList = *ctx.memberList;
+            auto it = memberList.find(nodeHost);
+            if (it != memberList.end()) {
+                NodeInfo& node = it->second;
+                node.resources.freeHeap = doc["freeHeap"] | node.resources.freeHeap;
+                node.resources.chipId = doc["chipId"] | node.resources.chipId;
+                {
+                    const char* sdk = doc["sdkVersion"] | node.resources.sdkVersion.c_str();
+                    node.resources.sdkVersion = sdk ? String(sdk) : node.resources.sdkVersion;
+                }
+                node.resources.cpuFreqMHz = doc["cpuFreqMHz"] | node.resources.cpuFreqMHz;
+                node.resources.flashChipSize = doc["flashChipSize"] | node.resources.flashChipSize;
+                node.status = NodeInfo::ACTIVE;
+                unsigned long now = millis();
+                node.lastSeen = now;
+                if (lastHeartbeatSentAt != 0) {
+                    node.latency = now - lastHeartbeatSentAt;
+                }
+
+                node.labels.clear();
+                if (doc["labels"].is<JsonObject>()) {
+                    JsonObject labelsObj = doc["labels"].as<JsonObject>();
+                    for (JsonPair kvp : labelsObj) {
+                        const char* key = kvp.key().c_str();
+                        const char* value = labelsObj[kvp.key()];
+                        node.labels[key] = value;
+                    }
+                }
+            }
+        } else {
+            LOG_DEBUG("Cluster", String("Failed to parse NODE_INFO JSON from ") + senderIP.toString());
         }
     }
 }
@@ -71,7 +192,7 @@ void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
     newNode.hostname = nodeHost;
     newNode.ip = nodeIP;
     newNode.lastSeen = millis();
-    updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
+    updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
     memberList[nodeHost] = newNode;
     LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " |
Status: " + statusToStr(newNode.status) + " | last update: 0");
     //fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
@@ -194,31 +315,18 @@ void ClusterManager::heartbeatTaskCallback() {
         updateLocalNodeResources();
         ctx.fire("node_discovered", &node);
     }
+
+    // Broadcast heartbeat so peers can respond with their node info
+    lastHeartbeatSentAt = millis();
+    ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
+    String hb = String(ClusterProtocol::HEARTBEAT_MSG) + ":" + ctx.hostname;
+    ctx.udp->write(hb.c_str());
+    ctx.udp->endPacket();
 }
 
 void ClusterManager::updateAllMembersInfoTaskCallback() {
-    auto& memberList = *ctx.memberList;
-
-    // Limit concurrent HTTP requests to prevent memory pressure
-    const size_t maxConcurrentRequests = ctx.config.max_concurrent_http_requests;
-    size_t requestCount = 0;
-
-    for (auto& pair : memberList) {
-        const NodeInfo& node = pair.second;
-        if (node.ip != ctx.localIP) {
-            // Only process a limited number of requests per cycle
-            if (requestCount >= maxConcurrentRequests) {
-                LOG_DEBUG("Cluster", "Limiting concurrent HTTP requests to prevent memory pressure");
-                break;
-            }
-
-            fetchNodeInfo(node.ip);
-            requestCount++;
-
-            // Add small delay between requests to prevent overwhelming the system
-            delay(100);
-        }
-    }
+    // HTTP-based member info fetching disabled; node info is provided via UDP responses to heartbeats
+    // No-op to reduce network and memory usage
 }
 
 void ClusterManager::updateAllNodeStatuses() {
diff --git a/src/spore/types/Config.cpp b/src/spore/types/Config.cpp
index 84f1bff..f4d9ace 100644
--- a/src/spore/types/Config.cpp
+++ b/src/spore/types/Config.cpp
@@ -10,10 +10,11 @@ Config::Config() {
     api_server_port = 80;
 
     // Cluster Configuration
-    discovery_interval_ms = 1000;
-    heartbeat_interval_ms = 2000;
+    discovery_interval_ms = 1000; // TODO retire this in favor of heartbeat_interval_ms
+    cluster_listen_interval_ms = 10;
+    heartbeat_interval_ms = 5000;
     status_update_interval_ms = 1000;
-
member_info_update_interval_ms = 10000;
+    member_info_update_interval_ms = 10000; // TODO retire this in favor of heartbeat_interval_ms
     print_interval_ms = 5000;
 
     // Node Status Thresholds
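The node status thresholds that follow in `Config.cpp` drive the ACTIVE / INACTIVE / DEAD categories described in `Architecture.md`. The block below is a rough sketch of that classification in standard C++, assuming the documented defaults of 10 s and 120 s. `NodeStatus` and `classify` are hypothetical names, and the firmware's own `updateNodeStatus` is not shown in this diff, so it may differ in detail. The timestamps are 32-bit to match `millis()` on the ESP8266, and the unsigned subtraction keeps the age correct across counter wraparound.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical status enum mirroring the three documented categories.
enum class NodeStatus { Active, Inactive, Dead };

// Classifies a node by how long ago it was last seen.
// Default thresholds are the documented defaults (10 s and 120 s), assumed here.
NodeStatus classify(std::uint32_t nowMs, std::uint32_t lastSeenMs,
                    std::uint32_t inactiveThresholdMs = 10000,
                    std::uint32_t deadThresholdMs = 120000) {
    assert(inactiveThresholdMs <= deadThresholdMs);
    // Unsigned subtraction wraps modulo 2^32, so the age stays correct
    // even after the millisecond counter overflows (about every 49.7 days).
    const std::uint32_t ageMs = nowMs - lastSeenMs;
    if (ageMs < inactiveThresholdMs) {
        return NodeStatus::Active;
    }
    if (ageMs < deadThresholdMs) {
        return NodeStatus::Inactive;
    }
    return NodeStatus::Dead;
}
```

With a 5000 ms heartbeat interval and a 10000 ms inactive threshold, a peer can miss one heartbeat round and still be reported as ACTIVE; missing two consecutive rounds moves it to INACTIVE.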