Compare commits

..

24 Commits

Author SHA1 Message Date
a9a54ce899 feat: udp stream 2025-10-01 22:28:00 +02:00
99e7ed4809 feat: do not store each infos of each node 2025-10-01 20:45:55 +02:00
dbb3b5bfd3 Merge pull request 'feature/streaming' (#10) from feature/streaming into main
Reviewed-on: #10
2025-09-29 06:29:11 +02:00
2aa887a037 docs(streaming): add 'Things to consider' on high-frequency WS usage and throttling
Notes on heap fragmentation, UDP saturation, throttling/coalescing, buffer reuse, yielding, and payload sizing for reliable streaming/broadcast.
2025-09-28 22:10:18 +02:00
49fe0dabf4 remove node_modules 2025-09-28 22:07:07 +02:00
c0bf16fecb chore(test): remove accidentally committed test/node_modules and ignore it 2025-09-28 21:50:39 +02:00
af46b5c2f4 feat: update logging, add high performance example 2025-09-28 21:48:53 +02:00
7a37901fb2 feat(streaming): suppress WS echo via origin tagging
Inject _origin=ws:<clientId> into JSON payloads on inbound WS messages and strip it on broadcast while skipping the origin client. Documents behavior in StreamingAPI.md.
2025-09-28 21:23:05 +02:00
3cc5405292 feat(streaming): introduce WebSocket Streaming API bridging event bus
ApiServer: add AsyncWebSocket at /ws; accept JSON {event, payload} (string or object) and dispatch via ctx.fire; mirror all local events to clients using NodeContext::onAny.\nNodeContext: add onAny subscriber API.\nNeoPatternService: add api/neopattern/color event to set solid color.\nCluster: centralize cluster/broadcast sending in core; services delegate.\nAPI: add generic /api/node/event and /api/cluster/event endpoints in respective services.\nTests: add ws-color-client, ws-cluster-broadcast-color, http-cluster-broadcast-color.\nDocs: add StreamingAPI.md; update README and test/README.\nFixes: robust WS JSON parsing on ESP8266 and payload handling.
2025-09-28 21:10:26 +02:00
f0df84dc87 feat(api): add generic event endpoints in NodeService and ClusterService
NodeService: add POST /api/node/event to fire local events (params: event, payload).\nClusterService: add POST /api/cluster/event to broadcast events via cluster/broadcast (params: event, payload).\nApiServer: remove inline generic endpoints; services own their registrations.\nUnifies event dispatch through service layer and keeps core server lean.
2025-09-28 17:26:20 +02:00
950142bf7f docs(readme): add Cluster Broadcast section and link to detailed guide
Documents centralized cluster broadcasting flow: service fires local event, core sends UDP CLUSTER_EVENT, peers re-fire locally. Includes usage example and notes; links to docs/ClusterBroadcast.md.
2025-09-28 13:49:48 +02:00
b9b91d71b5 docs: add ClusterBroadcast guide for CLUSTER_EVENT and cluster/broadcast flow
Documents centralized cluster broadcasting: service -> ctx.fire("cluster/broadcast", eventJson) -> UDP CLUSTER_EVENT -> peer ctx.fire(event, data). Includes message format, service/core responsibilities, logging, networking notes, and troubleshooting.
2025-09-28 13:45:10 +02:00
0e6adc999f Merge pull request 'feature/cluster-message' (#9) from feature/cluster-message into main
Reviewed-on: #9
2025-09-28 13:43:53 +02:00
f4ccb1c7ef feat(cluster): centralize cluster/broadcast event for UDP CLUSTER_EVENT send
Register core handler ctx.on("cluster/broadcast") in ClusterManager to send CLUSTER_EVENT via subnet-directed UDP broadcast; services now delegate broadcasting by firing this event. Fix lambda to reference this->ctx inside handler. Update NeoPatternService to fire cluster/broadcast with event JSON instead of sending UDP directly. Improves consistency and removes duplicated UDP code in services.
2025-09-28 13:35:42 +02:00
8da9f77441 refactor(neopattern): unify control handling via event-based single path
Build control payload once from request params and apply locally through ctx.fire("api/neopattern"). If broadcast=true, send the same payload as a CLUSTER_EVENT over UDP.\n\nChanges:\n- Remove duplicated per-parameter setters in REST handler\n- Always fire local event to update state consistently\n- Broadcast reuses same payload; logs retained\n\nImpact:\n- Less duplication, clearer flow, identical behavior for local/remote updates, and simpler maintenance.
2025-09-28 13:27:02 +02:00
cabf857bbd feat(cluster, neopattern): add CLUSTER_EVENT and broadcast handling
Add CLUSTER_EVENT message type and end-to-end handling across cluster and NeoPattern example.\n\nCluster protocol / core:\n- Add ClusterProtocol::CLUSTER_EVENT_MSG\n- ClusterManager: register predicate/handler for CLUSTER_EVENT\n- Robust parsing: accept data as string or nested JSON; serialize nested data to string before firing\n- Add defensive null-termination for full UDP reads; log unknown message head if no handler matches\n- Logging: source IP, payload length, missing fields, and pre-fire event details\n\nNeoPatternService:\n- Constructor now accepts NodeContext and registers ctx.on(api/neopattern) handler\n- /api/neopattern: add optional boolean 'broadcast' flag\n- If broadcast=true: build event payload and send CLUSTER_EVENT over UDP (subnet-directed broadcast); log target and payload size; also fire locally so sender applies immediately\n- Implement applyControlParams with robust ArduinoJson is<T>() checks (replaces deprecated containsKey()) and flexible string/number parsing for color, brightness, steps, interval\n- Minor fixes: include Globals for ClusterProtocol, include ESP8266WiFi for broadcast IP calc, safer brightness clamping\n\nExample:\n- Update neopattern main to pass NodeContext into NeoPatternService\n\nResult:\n- Nodes receive and process CLUSTER_EVENT (api/neopattern) via ctx.fire/ctx.on\n- Broadcast reliably reaches peers; parsing handles both stringified and nested JSON data\n- Additional logs aid diagnosis of delivery/format issues and remove deprecation warnings
2025-09-28 13:18:25 +02:00
2bb0742850 Merge pull request 'feature/node-info-sync' (#8) from feature/node-info-sync into main
Reviewed-on: #8
2025-09-28 12:26:38 +02:00
69bc3fc829 docs: update 2025-09-25 22:47:28 +02:00
eaeb9bbea8 config: more frequent cluster_listen 2025-09-25 22:07:22 +02:00
096cf12704 feat: measure latency 2025-09-25 21:54:25 +02:00
356ec3d381 feat: simplify udp listen 2025-09-25 20:44:31 +02:00
51bd7bd909 feat: introduce udp state machine 2025-09-24 21:23:00 +02:00
921eec3848 docs: update 2025-09-24 21:12:22 +02:00
921e2c7152 feat(cluster): move member info sync to UDP heartbeat; remove HTTP polling
Broadcast CLUSTER_HEARTBEAT every 5s; peers reply with CLUSTER_NODE_INFO containing resources and labels. Update memberList from received node info and set status/lastSeen; keep UDP discovery responses.

Disable HTTP-based updateAllMembersInfoTaskCallback loop to reduce network and memory overhead.

Add protocol constants HEARTBEAT_MSG and NODE_INFO_MSG; increase UDP buffer to 512 bytes.

Set default heartbeat_interval_ms to 5000 ms.

Fix sdkVersion JSON fallback by converting to const char* before assigning to String.
2025-09-23 22:11:49 +02:00
51 changed files with 2138 additions and 1111 deletions

View File

@@ -9,6 +9,8 @@ SPORE is a cluster engine for ESP8266 microcontrollers that provides automatic n
- [Features](#features)
- [Supported Hardware](#supported-hardware)
- [Architecture](#architecture)
- [Cluster Broadcast](#cluster-broadcast)
- [Streaming API](#streaming-api)
- [API Reference](#api-reference)
- [Configuration](#configuration)
- [Development](#development)
@@ -26,6 +28,8 @@ SPORE is a cluster engine for ESP8266 microcontrollers that provides automatic n
- **Service Registry**: Dynamic API endpoint discovery and registration
- **Health Monitoring**: Real-time node status tracking with resource monitoring
- **Event System**: Local and cluster-wide event publishing/subscription
- **Cluster Broadcast**: Centralized UDP broadcast of events (CLUSTER_EVENT)
- **Streaming API**: WebSocket bridge for real-time event send/receive
- **Over-The-Air Updates**: Seamless firmware updates across the cluster
- **REST API**: HTTP-based cluster management and monitoring
- **Capability Discovery**: Automatic API endpoint and service capability detection
@@ -103,6 +107,52 @@ void setup() {
**Examples:** See [`examples/base/`](./examples/base/) for basic usage and [`examples/relay/`](./examples/relay/) for custom service integration.
## Cluster Broadcast
Broadcast an event to all peers using the centralized core broadcaster. Services never touch UDP directly; instead they fire a local event that the core transmits as a `CLUSTER_EVENT`.
Usage:
```cpp
// 1) Apply locally via the same event your service already handles
JsonDocument payload;
payload["pattern"] = "rainbow_cycle";
payload["brightness"] = 100;
String payloadStr; serializeJson(payload, payloadStr);
ctx.fire("api/neopattern", &payloadStr);
// 2) Broadcast to peers via the core
JsonDocument envelope;
envelope["event"] = "api/neopattern";
envelope["data"] = payloadStr; // JSON string
String eventJson; serializeJson(envelope, eventJson);
ctx.fire("cluster/broadcast", &eventJson);
```
Notes:
- The core sends subnet-directed broadcasts (e.g., 192.168.1.255) for reliability.
- Peers receive `CLUSTER_EVENT` and forward to local subscribers with `ctx.fire(event, data)`.
- `data` can be a JSON string or nested JSON; receivers handle both.
📖 See the dedicated guide: [`docs/ClusterBroadcast.md`](./docs/ClusterBroadcast.md)
## Streaming API
Real-time event bridge available at `/ws` using WebSocket.
- Send JSON `{ event, payload }` to dispatch events via `ctx.fire`.
- Receive all local events as `{ event, payload }`.
Examples:
```json
{ "event": "api/neopattern/color", "payload": { "color": "#FF0000", "brightness": 128 } }
```
```json
{ "event": "cluster/broadcast", "payload": { "event": "api/neopattern/color", "data": { "color": "#00FF00" } } }
```
📖 See the dedicated guide: [`docs/StreamingAPI.md`](./docs/StreamingAPI.md)
## API Reference
The system provides a comprehensive RESTful API for monitoring and controlling the embedded device. All endpoints return JSON responses and support standard HTTP status codes.

View File

@@ -15,12 +15,18 @@ The SPORE system provides a comprehensive RESTful API for monitoring and control
| Endpoint | Method | Description | Response |
|----------|--------|-------------|----------|
| `/api/node/status` | GET | System resource information and API endpoint registry | System metrics and API catalog |
| `/api/node/status` | GET | System resource information | System metrics |
| `/api/node/endpoints` | GET | API endpoints and parameters | Detailed endpoint specifications |
| `/api/cluster/members` | GET | Cluster membership and node health information | Cluster topology and health status |
| `/api/node/update` | POST | Handle firmware updates via OTA | Update progress and status |
| `/api/node/restart` | POST | Trigger system restart | Restart confirmation |
### Monitoring API
| Endpoint | Method | Description | Response |
|----------|--------|-------------|----------|
| `/api/monitoring/resources` | GET | CPU, memory, filesystem, and uptime | System resource metrics |
### Network Management API
| Endpoint | Method | Description | Response |
@@ -140,7 +146,7 @@ Controls the execution state of individual tasks. Supports enabling, disabling,
#### GET /api/node/status
Returns comprehensive system resource information including memory usage, chip details, and a registry of all available API endpoints.
Returns comprehensive system resource information including memory usage and chip details. For a list of available API endpoints, use `/api/node/endpoints`.
**Response Fields:**
- `freeHeap`: Available RAM in bytes
@@ -168,7 +174,7 @@ Returns comprehensive system resource information including memory usage, chip d
#### GET /api/node/endpoints
Returns detailed information about all available API endpoints, including their parameters, types, and validation rules.
Returns detailed information about all available API endpoints, including their parameters, types, and validation rules. Methods are returned as strings (e.g., "GET", "POST").
**Response Fields:**
- `endpoints[]`: Array of endpoint capability objects
@@ -236,6 +242,54 @@ Initiates an over-the-air firmware update. The firmware file should be uploaded
Triggers a system restart. The response will be sent before the restart occurs.
### Monitoring
#### GET /api/monitoring/resources
Returns real-time system resource metrics.
Response Fields:
- `cpu.current_usage`: Current CPU usage percent
- `cpu.average_usage`: Average CPU usage percent
- `cpu.max_usage`: Max observed CPU usage
- `cpu.min_usage`: Min observed CPU usage
- `cpu.measurement_count`: Number of measurements
- `cpu.is_measuring`: Whether measurement is active
- `memory.free_heap`: Free heap bytes
- `memory.total_heap`: Total heap bytes (approximate)
- `memory.heap_fragmentation`: Fragmentation percent (0 on ESP8266)
- `filesystem.total_bytes`: LittleFS total bytes
- `filesystem.used_bytes`: Used bytes
- `filesystem.free_bytes`: Free bytes
- `filesystem.usage_percent`: Usage percent
- `system.uptime_ms`: Uptime in milliseconds
Example Response:
```json
{
"cpu": {
"current_usage": 3.5,
"average_usage": 2.1,
"max_usage": 15.2,
"min_usage": 0.0,
"measurement_count": 120,
"is_measuring": true
},
"memory": {
"free_heap": 48748,
"total_heap": 81920,
"heap_fragmentation": 0
},
"filesystem": {
"total_bytes": 65536,
"used_bytes": 10240,
"free_bytes": 55296,
"usage_percent": 15.6
},
"system": {
"uptime_ms": 123456
}
}
```
### Network Management
#### GET /api/network/status

View File

@@ -25,9 +25,9 @@ The system architecture consists of several key components working together:
- **Service Registry**: Track available services across the cluster
### Task Scheduler
- **Cooperative Multitasking**: Background task management system
- **Task Lifecycle Management**: Automatic task execution and monitoring
- **Resource Optimization**: Efficient task scheduling and execution
- **Cooperative Multitasking**: Background task management system (`TaskManager`)
- **Task Lifecycle Management**: Enable/disable tasks and set intervals at runtime
- **Execution Model**: Tasks run in `Spore::loop()` when their interval elapses
### Node Context
- **Central Context**: Shared resources and configuration
@@ -40,27 +40,75 @@ The cluster uses a UDP-based discovery protocol for automatic node detection:
### Discovery Process
1. **Discovery Broadcast**: Nodes periodically send UDP packets on port 4210
2. **Response Handling**: Nodes respond with their hostname and IP address
3. **Member Management**: Discovered nodes are automatically added to the cluster
4. **Health Monitoring**: Continuous status checking via HTTP API calls
1. **Discovery Broadcast**: Nodes periodically send UDP packets on port `udp_port` (default 4210)
2. **Response Handling**: Nodes respond with `CLUSTER_RESPONSE:<hostname>`
3. **Member Management**: Discovered nodes are added/updated in the cluster
4. **Node Info via UDP**: Heartbeat triggers peers to send `CLUSTER_NODE_INFO:<hostname>:<json>`
### Protocol Details
- **UDP Port**: 4210 (configurable)
- **UDP Port**: 4210 (configurable via `Config.udp_port`)
- **Discovery Message**: `CLUSTER_DISCOVERY`
- **Response Message**: `CLUSTER_RESPONSE`
- **Heartbeat Message**: `CLUSTER_HEARTBEAT`
- **Node Info Message**: `CLUSTER_NODE_INFO:<hostname>:<json>`
- **Broadcast Address**: 255.255.255.255
- **Discovery Interval**: 1 second (configurable)
- **Listen Interval**: 100ms (configurable)
- **Discovery Interval**: `Config.discovery_interval_ms` (default 1000 ms)
- **Listen Interval**: `Config.cluster_listen_interval_ms` (default 10 ms)
- **Heartbeat Interval**: `Config.heartbeat_interval_ms` (default 5000 ms)
### Message Formats
- **Discovery**: `CLUSTER_DISCOVERY`
- Sender: any node, broadcast to 255.255.255.255:`udp_port`
- Purpose: announce presence and solicit peer identification
- **Response**: `CLUSTER_RESPONSE:<hostname>`
- Sender: node receiving a discovery; unicast to requester IP
- Purpose: provide hostname so requester can register/update member
- **Heartbeat**: `CLUSTER_HEARTBEAT:<hostname>`
- Sender: each node, broadcast to 255.255.255.255:`udp_port` on interval
- Purpose: prompt peers to reply with their node info and keep liveness
- **Node Info**: `CLUSTER_NODE_INFO:<hostname>:<json>`
- Sender: node receiving a heartbeat; unicast to heartbeat sender IP
- JSON fields: freeHeap, chipId, sdkVersion, cpuFreqMHz, flashChipSize, optional labels
### Discovery Flow
1. **Sender broadcasts** `CLUSTER_DISCOVERY`
2. **Each receiver responds** with `CLUSTER_RESPONSE:<hostname>` to the sender IP
3. **Sender registers/updates** the node using hostname and source IP
### Heartbeat Flow
1. **A node broadcasts** `CLUSTER_HEARTBEAT:<hostname>`
2. **Each receiver replies** with `CLUSTER_NODE_INFO:<hostname>:<json>` to the heartbeat sender IP
3. **The sender**:
- Ensures the node exists or creates it with `hostname` and sender IP
- Parses JSON and updates resources, labels, `status = ACTIVE`, `lastSeen = now`
- Sets `latency = now - lastHeartbeatSentAt` (per-node, measured at heartbeat origin)
### Listener Behavior
The `cluster_listen` task parses one UDP packet per run and dispatches by prefix to:
- **Discovery** → send `CLUSTER_RESPONSE`
- **Heartbeat** → send `CLUSTER_NODE_INFO` JSON
- **Response** → add/update node using provided hostname and source IP
- **Node Info** → update resources/status/labels and record latency
### Timing and Intervals
- **UDP Port**: `Config.udp_port` (default 4210)
- **Discovery Interval**: `Config.discovery_interval_ms` (default 1000 ms)
- **Listen Interval**: `Config.cluster_listen_interval_ms` (default 10 ms)
- **Heartbeat Interval**: `Config.heartbeat_interval_ms` (default 5000 ms)
### Node Status Categories
Nodes are automatically categorized by their activity:
- **ACTIVE**: Responding within 10 seconds
- **INACTIVE**: No response for 10-60 seconds
- **DEAD**: No response for over 60 seconds
- **ACTIVE**: lastSeen < `node_inactive_threshold_ms` (default 10s)
- **INACTIVE**: < `node_dead_threshold_ms` (default 120s)
- **DEAD**: `node_dead_threshold_ms`
## Task Scheduling System
@@ -68,14 +116,14 @@ The system runs several background tasks at different intervals:
### Core System Tasks
| Task | Interval | Purpose |
|------|----------|---------|
| **Discovery Send** | 1 second | Send UDP discovery packets |
| **Discovery Listen** | 100ms | Listen for discovery responses |
| **Status Updates** | 1 second | Monitor cluster member health |
| **Heartbeat** | 2 seconds | Maintain cluster connectivity |
| **Member Info** | 10 seconds | Update detailed node information |
| **Debug Output** | 5 seconds | Print cluster status |
| Task | Interval (default) | Purpose |
|------|--------------------|---------|
| `cluster_discovery` | 1000 ms | Send UDP discovery packets |
| `cluster_listen` | 10 ms | Listen for discovery/heartbeat/node-info |
| `status_update` | 1000 ms | Update node status categories, purge dead |
| `heartbeat` | 5000 ms | Broadcast heartbeat and update local resources |
| `cluster_update_members_info` | 10000 ms | Reserved; no-op (info via UDP) |
| `print_members` | 5000 ms | Log current member list |
### Task Management Features
@@ -112,10 +160,7 @@ ctx.fire("cluster_updated", &clusterData);
### Available Events
- **`node_discovered`**: New node added to cluster
- **`cluster_updated`**: Cluster membership changed
- **`resource_update`**: Node resources updated
- **`health_check`**: Node health status changed
- **`node_discovered`**: New node added or local node refreshed
## Resource Monitoring
@@ -155,10 +200,8 @@ The system includes automatic WiFi fallback for robust operation:
### Configuration
- **SSID Format**: `SPORE_<MAC_LAST_4>`
- **Password**: Configurable fallback password
- **IP Range**: 192.168.4.x subnet
- **Gateway**: 192.168.4.1
- **Hostname**: Derived from MAC (`esp-<mac>`) and assigned to `ctx.hostname`
- **AP Mode**: If STA connection fails, device switches to AP mode with configured SSID/password
## Cluster Topology
@@ -170,32 +213,30 @@ The system includes automatic WiFi fallback for robust operation:
### Network Architecture
- **Mesh-like Structure**: Nodes can communicate with each other
- **Dynamic Routing**: Automatic path discovery between nodes
- **Load Distribution**: Tasks distributed across available nodes
- **Fault Tolerance**: Automatic failover and recovery
- UDP broadcast-based discovery and heartbeats on local subnet
- Optional HTTP polling (disabled by default; node info exchanged via UDP)
## Data Flow
### Node Discovery
1. **UDP Broadcast**: Nodes broadcast discovery packets on port 4210
2. **UDP Response**: Receiving nodes responds with hostname
2. **UDP Response**: Receiving nodes respond with hostname
3. **Registration**: Discovered nodes are added to local cluster member list
### Health Monitoring
1. **Periodic Checks**: Cluster manager polls member nodes every 1 second
2. **Status Collection**: Each node returns resource usage and health metrics
1. **Periodic Checks**: Cluster manager updates node status categories
2. **Status Collection**: Each node updates resources via UDP node-info messages
### Task Management
1. **Scheduling**: TaskScheduler executes registered tasks at configured intervals
2. **Execution**: Tasks run cooperatively, yielding control to other tasks
3. **Monitoring**: Task status and results are exposed via REST API endpoints
1. **Scheduling**: `TaskManager` executes registered tasks at configured intervals
2. **Execution**: Tasks run cooperatively in the main loop without preemption
3. **Monitoring**: Task status is exposed via REST (`/api/tasks/status`)
## Performance Characteristics
### Memory Usage
- **Base System**: ~15-20KB RAM
- **Base System**: ~15-20KB RAM (device dependent)
- **Per Task**: ~100-200 bytes per task
- **Cluster Members**: ~50-100 bytes per member
- **API Endpoints**: ~20-30 bytes per endpoint
@@ -219,7 +260,7 @@ The system includes automatic WiFi fallback for robust operation:
### Current Implementation
- **Network Access**: Local network only (no internet exposure)
- **Authentication**: None currently implemented
- **Authentication**: None currently implemented; LAN-only access assumed
- **Data Validation**: Basic input validation
- **Resource Limits**: Memory and processing constraints

91
docs/ClusterBroadcast.md Normal file
View File

@@ -0,0 +1,91 @@
## Cluster Broadcast (CLUSTER_EVENT)
### Overview
Spore supports cluster-wide event broadcasting via UDP. Services publish a local event, and the core broadcasts it to peers as a `CLUSTER_EVENT`. Peers receive the event and forward it to local subscribers through the internal event bus.
- **Local trigger**: `ctx.fire("cluster/broadcast", eventJson)`
- **UDP message**: `CLUSTER_EVENT:{json}`
- **Receiver action**: Parses `{json}` and calls `ctx.fire(event, data)`
This centralizes network broadcast in core, so services never touch UDP directly.
### Message format
- UDP payload prefix: `CLUSTER_EVENT:`
- JSON body:
```json
{
"event": "<event-name>",
"data": "<json-string>" // or an inline JSON object/array
}
```
Notes:
- The receiver accepts `data` as either a JSON string or a nested JSON object/array. Nested JSON is serialized back to a string before firing the local event.
- Keep payloads small (UDP, default buffer 512 bytes).
### Core responsibilities
- `ClusterManager` registers a centralized handler:
- Subscribes to `cluster/broadcast` to send the provided event JSON over UDP broadcast.
- Listens for incoming UDP `CLUSTER_EVENT` messages and forwards them to local subscribers via `ctx.fire(event, data)`.
- Broadcast target uses subnet-directed broadcast (e.g., `192.168.1.255`) for better reliability. Both nodes must share the same `udp_port`.
### Service responsibilities
Services send and receive events using the local event bus.
1) Subscribe to an event name and apply state from `data`:
```cpp
ctx.on("api/neopattern", [this](void* dataPtr) {
String* jsonStr = static_cast<String*>(dataPtr);
if (!jsonStr) return;
JsonDocument doc;
if (deserializeJson(doc, *jsonStr)) return;
JsonObject obj = doc.as<JsonObject>();
// Parse and apply fields from obj
});
```
2) Build a control payload and update locally via the same event:
```cpp
JsonDocument payload;
payload["pattern"] = "rainbow_cycle"; // example
payload["brightness"] = 100;
String payloadStr; serializeJson(payload, payloadStr);
ctx.fire("api/neopattern", &payloadStr);
``;
3) Broadcast to peers by delegating to core:
```cpp
JsonDocument envelope;
envelope["event"] = "api/neopattern";
envelope["data"] = payloadStr; // JSON string
String eventJson; serializeJson(envelope, eventJson);
ctx.fire("cluster/broadcast", &eventJson);
```
With this flow, services have a single codepath for applying state (the event handler). Broadcasting simply reuses the same payload.
### Logging
- Core logs source IP, payload length, and event name for received `CLUSTER_EVENT`s.
- Services can log when submitting `cluster/broadcast` and when applying control events.
### Networking considerations
- Ensure all nodes:
- Listen on the same `udp_port`.
- Are in the same subnet (for subnet-directed broadcast).
- Some networks may block global broadcast (`255.255.255.255`). Subnet-directed broadcast is used by default.
### Troubleshooting
- If peers do not react:
- Confirm logs show `CLUSTER_EVENT raw from <ip>` on the receiver.
- Verify UDP port alignment and WiFi connection/subnet.
- Check payload size (<512 bytes by default) and JSON validity.
- Ensure the service subscribed to the correct `event` name and handles `data`.

View File

@@ -20,57 +20,99 @@
```
spore/
├── src/ # Source code
── main.cpp # Main application entry point
├── ApiServer.cpp # HTTP API server implementation
├── ClusterManager.cpp # Cluster management logic
├── NetworkManager.cpp # WiFi and network handling
│ ├── TaskManager.cpp # Background task management
└── NodeContext.cpp # Central context and events
├── src/ # Source code (framework under src/spore)
── spore/
├── Spore.cpp # Framework lifecycle (setup/begin/loop)
├── core/ # Core components
│ ├── ApiServer.cpp # HTTP API server implementation
│ ├── ClusterManager.cpp # Cluster management logic
│ ├── NetworkManager.cpp # WiFi and network handling
│ │ ├── TaskManager.cpp # Background task management
│ │ └── NodeContext.cpp # Central context and events
│ ├── services/ # Built-in services
│ │ ├── NodeService.cpp
│ │ ├── NetworkService.cpp
│ │ ├── ClusterService.cpp
│ │ ├── TaskService.cpp
│ │ ├── StaticFileService.cpp
│ │ └── MonitoringService.cpp
│ └── types/ # Shared types
├── include/ # Header files
├── lib/ # Library files
├── examples/ # Example apps per env (base, relay, neopattern)
├── docs/ # Documentation
├── api/ # OpenAPI specification
├── examples/ # Example code
── test/ # Test files
├── platformio.ini # PlatformIO configuration
└── ctl.sh # Build and deployment scripts
├── platformio.ini # PlatformIO configuration
── ctl.sh # Build and deployment scripts
```
## PlatformIO Configuration
### Framework and Board
The project uses PlatformIO with the following configuration:
The project uses PlatformIO with the following configuration (excerpt):
```ini
[env:esp01_1m]
[platformio]
default_envs = base
src_dir = .
data_dir = ${PROJECT_DIR}/examples/${PIOENV}/data
[common]
monitor_speed = 115200
lib_deps =
esp32async/ESPAsyncWebServer@^3.8.0
bblanchon/ArduinoJson@^7.4.2
[env:base]
platform = platformio/espressif8266@^4.2.1
board = esp01_1m
framework = arduino
upload_speed = 115200
flash_mode = dout
monitor_speed = 115200
board_build.f_cpu = 80000000L
board_build.flash_mode = qio
board_build.filesystem = littlefs
; note: somehow partition table is not working, so we need to use the ldscript
board_build.ldscript = eagle.flash.1m64.ld
lib_deps = ${common.lib_deps}
build_src_filter =
+<examples/base/*.cpp>
+<src/spore/*.cpp>
+<src/spore/core/*.cpp>
+<src/spore/services/*.cpp>
+<src/spore/types/*.cpp>
+<src/spore/util/*.cpp>
+<src/internal/*.cpp>
[env:d1_mini]
platform = platformio/espressif8266@^4.2.1
board = d1_mini
framework = arduino
upload_speed = 115200
monitor_speed = 115200
board_build.filesystem = littlefs
board_build.flash_mode = dio ; D1 Mini uses DIO on 4 Mbit flash
board_build.flash_size = 4M
board_build.ldscript = eagle.flash.4m1m.ld
lib_deps = ${common.lib_deps}
build_src_filter =
+<examples/base/*.cpp>
+<src/spore/*.cpp>
+<src/spore/core/*.cpp>
+<src/spore/services/*.cpp>
+<src/spore/types/*.cpp>
+<src/spore/util/*.cpp>
+<src/internal/*.cpp>
```
### Key Configuration Details
- **Framework**: Arduino
- **Board**: ESP-01 with 1MB flash
- **Upload Speed**: 115200 baud
- **Flash Mode**: DOUT (required for ESP-01S)
- **Build Type**: Release (optimized for production)
### Dependencies
The project requires the following libraries:
The project requires the following libraries (resolved via PlatformIO):
```ini
lib_deps =
esp32async/ESPAsyncWebServer@^3.8.0
bblanchon/ArduinoJson@^7.4.2
arkhipenko/TaskScheduler@^3.8.5
ESP8266HTTPClient@1.2
ESP8266WiFi@1.0
```
### Filesystem, Linker Scripts, and Flash Layout
@@ -103,7 +145,6 @@ Notes:
- If you need a different FS size, select an appropriate ldscript variant and keep `board_build.filesystem = littlefs`.
- On ESP8266, custom partition CSVs are not used for layout; the linker script defines the flash map. This project removed prior `board_build.partitions` usage in favor of explicit `board_build.ldscript` entries per environment.
## Building
### Basic Build Commands
@@ -308,7 +349,7 @@ export API_NODE=192.168.1.100
Key configuration files:
- **`platformio.ini`**: Build and upload configuration
- **`src/Config.cpp`**: Application configuration
- **`src/spore/types/Config.cpp`**: Default runtime configuration
- **`.env`**: Environment variables
- **`ctl.sh`**: Build and deployment scripts

79
docs/MonitoringService.md Normal file
View File

@@ -0,0 +1,79 @@
# Monitoring Service
Exposes system resource metrics via HTTP for observability.
## Overview
- **Service name**: `MonitoringService`
- **Endpoint**: `GET /api/monitoring/resources`
- **Metrics**: CPU usage, memory, filesystem, uptime
## Endpoint
### GET /api/monitoring/resources
Returns real-time system resource metrics.
Response fields:
- `cpu.current_usage`: Current CPU usage percent
- `cpu.average_usage`: Average CPU usage percent
- `cpu.max_usage`: Max observed CPU usage
- `cpu.min_usage`: Min observed CPU usage
- `cpu.measurement_count`: Number of measurements
- `cpu.is_measuring`: Whether measurement is active
- `memory.free_heap`: Free heap bytes
- `memory.total_heap`: Total heap bytes (approximate)
- `memory.min_free_heap`: Minimum free heap (0 on ESP8266)
- `memory.max_alloc_heap`: Max allocatable heap (0 on ESP8266)
- `memory.heap_fragmentation`: Fragmentation percent (0 on ESP8266)
- `filesystem.total_bytes`: LittleFS total bytes
- `filesystem.used_bytes`: Used bytes
- `filesystem.free_bytes`: Free bytes
- `filesystem.usage_percent`: Usage percent
- `system.uptime_ms`: Uptime in milliseconds
- `system.uptime_seconds`: Uptime in seconds
- `system.uptime_formatted`: Human-readable uptime
Example:
```json
{
"cpu": {
"current_usage": 3.5,
"average_usage": 2.1,
"max_usage": 15.2,
"min_usage": 0.0,
"measurement_count": 120,
"is_measuring": true
},
"memory": {
"free_heap": 48748,
"total_heap": 81920,
"min_free_heap": 0,
"max_alloc_heap": 0,
"heap_fragmentation": 0,
"heap_usage_percent": 40.4
},
"filesystem": {
"total_bytes": 65536,
"used_bytes": 10240,
"free_bytes": 55296,
"usage_percent": 15.6
},
"system": {
"uptime_ms": 123456,
"uptime_seconds": 123,
"uptime_formatted": "0h 2m 3s"
}
}
```
## Implementation Notes
- `MonitoringService` reads from `CpuUsage` and ESP8266 SDK APIs.
- Filesystem metrics are gathered from LittleFS.
- CPU measurement is bracketed by `Spore::loop()` calling `cpuUsage.startMeasurement()` and `cpuUsage.endMeasurement()`.
## Troubleshooting
- If `filesystem.total_bytes` is zero, ensure LittleFS is enabled in `platformio.ini` and an FS image is uploaded.
- CPU usage values remain zero until the main loop runs and CPU measurement is started.

View File

@@ -15,15 +15,8 @@ Complete API reference with detailed endpoint documentation, examples, and integ
- Task management workflows
- Cluster monitoring examples
### 📖 [TaskManager.md](./TaskManager.md)
Comprehensive guide to the TaskManager system for background task management.
**Includes:**
- Basic usage examples
- Advanced binding techniques
- Task status monitoring
- API integration details
- Performance considerations
### 📖 [MonitoringService.md](./MonitoringService.md)
System resource monitoring API for CPU, memory, filesystem, and uptime.
### 📖 [TaskManagement.md](./TaskManagement.md)
Complete guide to the task management system with examples and best practices.

98
docs/StreamingAPI.md Normal file
View File

@@ -0,0 +1,98 @@
## Streaming API (WebSocket)
### Overview
The streaming API exposes an event-driven WebSocket at `/ws`. It bridges between external clients and the internal event bus:
- Incoming WebSocket JSON `{ event, payload }``ctx.fire(event, payload)`
- Local events → broadcasted to all connected WebSocket clients as `{ event, payload }`
This allows real-time control and observation of the system without polling.
### URL
- `ws://<device-ip>/ws`
### Message Format
- Client → Device
```json
{
"event": "<event-name>",
"payload": "<json-string>" | { /* inline JSON */ }
}
```
- Device → Client
```json
{
"event": "<event-name>",
"payload": "<json-string>"
}
```
Notes:
- The device accepts `payload` as a string or a JSON object/array. Objects are serialized into a string before dispatching to local subscribers to keep a consistent downstream contract.
- A minimal ack `{ "ok": true }` is sent after a valid inbound message.
#### Echo suppression (origin tagging)
- To prevent the sender from receiving an immediate echo of its own message, the server injects a private field into JSON payloads:
- `_origin: "ws:<clientId>"`
- When re-broadcasting local events to WebSocket clients, the server:
- Strips the `_origin` field from the outgoing payload
- Skips the originating `clientId` so only other clients receive the message
- If a payload is not valid JSON (plain string), no origin tag is injected and the message may be echoed
### Event Bus Integration
- The WebSocket registers an `onAny` subscriber to `NodeContext` so that all local events are mirrored to clients.
- Services should subscribe to specific events via `ctx.on("<name>", ...)`.
### Examples
1) Set a solid color on NeoPattern:
```json
{
"event": "api/neopattern/color",
"payload": { "color": "#FF0000", "brightness": 128 }
}
```
2) Broadcast a cluster event (delegated to core):
```json
{
"event": "cluster/broadcast",
"payload": {
"event": "api/neopattern/color",
"data": { "color": "#00FF00", "brightness": 128 }
}
}
```
### Reference Implementation
- WebSocket setup and bridging are implemented in `ApiServer`.
- Global event subscription uses `NodeContext::onAny`.
Related docs:
- [`ClusterBroadcast.md`](./ClusterBroadcast.md) — centralized UDP broadcasting and CLUSTER_EVENT format
### Things to consider
- High-frequency updates can overwhelm ESP8266:
- Frequent JSON parse/serialize and `String` allocations fragment heap and may cause resets (e.g., Exception(3)).
- UDP broadcast on every message amplifies load; WiFi/UDP buffers can back up.
- Prefer ≥50100 ms intervals; microbursts at 10 ms are risky.
- Throttle and coalesce:
- Add a minimum interval in the core `cluster/broadcast` handler.
- Optionally drop redundant updates (e.g., same color as previous).
- Reduce allocations:
- Reuse `StaticJsonDocument`/preallocated buffers in hot paths.
- Avoid re-serializing when possible; pass-through payload strings.
- Reserve `String` capacity when reuse is needed.
- Yielding:
- Call `yield()` in long-running or bursty paths to avoid WDT.
- Packet size:
- Keep payloads small to fit `ClusterProtocol::UDP_BUF_SIZE` and reduce airtime.

View File

@@ -319,18 +319,18 @@ curl -X POST http://192.168.1.100/api/tasks/control \
### Before (with wrapper functions):
```cpp
void discoverySendTask() { cluster.sendDiscovery(); }
void discoveryListenTask() { cluster.listenForDiscovery(); }
void clusterListenTask() { cluster.listen(); }
taskManager.registerTask("discovery_send", interval, discoverySendTask);
taskManager.registerTask("discovery_listen", interval, discoveryListenTask);
taskManager.registerTask("cluster_listen", interval, clusterListenTask);
```
### After (with std::bind):
```cpp
taskManager.registerTask("discovery_send", interval,
std::bind(&ClusterManager::sendDiscovery, &cluster));
taskManager.registerTask("discovery_listen", interval,
std::bind(&ClusterManager::listenForDiscovery, &cluster));
taskManager.registerTask("cluster_listen", interval,
std::bind(&ClusterManager::listen, &cluster));
```
## Compatibility

View File

@@ -1,10 +1,13 @@
#include "NeoPatternService.h"
#include "spore/core/ApiServer.h"
#include "spore/util/Logging.h"
#include "spore/internal/Globals.h"
#include <ArduinoJson.h>
#include <ESP8266WiFi.h>
NeoPatternService::NeoPatternService(TaskManager& taskMgr, const NeoPixelConfig& config)
NeoPatternService::NeoPatternService(NodeContext& ctx, TaskManager& taskMgr, const NeoPixelConfig& config)
: taskManager(taskMgr),
ctx(ctx),
config(config),
activePattern(NeoPatternType::RAINBOW_CYCLE),
direction(NeoDirection::FORWARD),
@@ -32,6 +35,7 @@ NeoPatternService::NeoPatternService(TaskManager& taskMgr, const NeoPixelConfig&
registerPatterns();
registerTasks();
registerEventHandlers();
initialized = true;
LOG_INFO("NeoPattern", "Service initialized");
@@ -64,7 +68,8 @@ void NeoPatternService::registerEndpoints(ApiServer& api) {
ParamSpec{String("brightness"), false, String("body"), String("numberRange"), {}, String("80")},
ParamSpec{String("total_steps"), false, String("body"), String("numberRange"), {}, String("16")},
ParamSpec{String("direction"), false, String("body"), String("string"), {String("forward"), String("reverse")}},
ParamSpec{String("interval"), false, String("body"), String("number"), {}, String("100")}
ParamSpec{String("interval"), false, String("body"), String("number"), {}, String("100")},
ParamSpec{String("broadcast"), false, String("body"), String("boolean"), {}}
});
// State endpoint for complex state updates
@@ -119,61 +124,49 @@ void NeoPatternService::handlePatternsRequest(AsyncWebServerRequest* request) {
void NeoPatternService::handleControlRequest(AsyncWebServerRequest* request) {
bool updated = false;
bool broadcast = false;
if (request->hasParam("pattern", true)) {
String name = request->getParam("pattern", true)->value();
if (isValidPattern(name)) {
setPatternByName(name);
updated = true;
} else {
// Invalid pattern name - could add error handling here
LOG_WARN("NeoPattern", "Invalid pattern name: " + name);
}
if (request->hasParam("broadcast", true)) {
String b = request->getParam("broadcast", true)->value();
broadcast = b.equalsIgnoreCase("true") || b == "1";
}
if (request->hasParam("color", true)) {
String colorStr = request->getParam("color", true)->value();
uint32_t color = parseColor(colorStr);
setColor(color);
// Build JSON payload from provided params (single source of truth)
JsonDocument payload;
bool any = false;
if (request->hasParam("pattern", true)) { payload["pattern"] = request->getParam("pattern", true)->value(); any = true; }
if (request->hasParam("color", true)) { payload["color"] = request->getParam("color", true)->value(); any = true; }
if (request->hasParam("color2", true)) { payload["color2"] = request->getParam("color2", true)->value(); any = true; }
if (request->hasParam("brightness", true)) { payload["brightness"] = request->getParam("brightness", true)->value(); any = true; }
if (request->hasParam("total_steps", true)) { payload["total_steps"] = request->getParam("total_steps", true)->value(); any = true; }
if (request->hasParam("direction", true)) { payload["direction"] = request->getParam("direction", true)->value(); any = true; }
if (request->hasParam("interval", true)) { payload["interval"] = request->getParam("interval", true)->value(); any = true; }
String payloadStr;
serializeJson(payload, payloadStr);
// Always apply locally via event so we have a single codepath for updates
if (any) {
std::string ev = "api/neopattern";
String localData = payloadStr;
LOG_INFO("NeoPattern", String("Applying local api/neopattern via event payloadLen=") + String(payloadStr.length()));
ctx.fire(ev, &localData);
updated = true;
}
if (request->hasParam("color2", true)) {
String colorStr = request->getParam("color2", true)->value();
uint32_t color = parseColor(colorStr);
setColor2(color);
updated = true;
}
// Broadcast to peers if requested (delegate to core broadcast handler)
if (broadcast && any) {
JsonDocument eventDoc;
eventDoc["event"] = "api/neopattern";
eventDoc["data"] = payloadStr; // data is JSON string
if (request->hasParam("brightness", true)) {
int b = request->getParam("brightness", true)->value().toInt();
if (b < 0) b = 0;
if (b > 255) b = 255;
setBrightness(static_cast<uint8_t>(b));
updated = true;
}
String eventJson;
serializeJson(eventDoc, eventJson);
if (request->hasParam("total_steps", true)) {
int steps = request->getParam("total_steps", true)->value().toInt();
if (steps > 0) {
setTotalSteps(static_cast<uint16_t>(steps));
updated = true;
}
}
if (request->hasParam("direction", true)) {
String dirStr = request->getParam("direction", true)->value();
NeoDirection dir = (dirStr.equalsIgnoreCase("reverse")) ? NeoDirection::REVERSE : NeoDirection::FORWARD;
setDirection(dir);
updated = true;
}
if (request->hasParam("interval", true)) {
unsigned long interval = request->getParam("interval", true)->value().toInt();
if (interval > 0) {
setUpdateInterval(interval);
updated = true;
}
LOG_INFO("NeoPattern", String("Submitting cluster/broadcast for api/neopattern payloadLen=") + String(payloadStr.length()));
std::string ev = "cluster/broadcast";
String eventStr = eventJson;
ctx.fire(ev, &eventStr);
}
// Return current state
@@ -192,6 +185,139 @@ void NeoPatternService::handleControlRequest(AsyncWebServerRequest* request) {
serializeJson(resp, json);
request->send(200, "application/json", json);
}
void NeoPatternService::registerEventHandlers() {
ctx.on("api/neopattern", [this](void* dataPtr) {
String* jsonStr = static_cast<String*>(dataPtr);
if (!jsonStr) {
LOG_WARN("NeoPattern", "Received api/neopattern with null dataPtr");
return;
}
LOG_INFO("NeoPattern", String("Received api/neopattern event dataLen=") + String(jsonStr->length()));
JsonDocument doc;
DeserializationError err = deserializeJson(doc, *jsonStr);
if (err) {
LOG_WARN("NeoPattern", String("Failed to parse CLUSTER_EVENT data: ") + err.c_str());
return;
}
JsonObject obj = doc.as<JsonObject>();
bool applied = applyControlParams(obj);
if (applied) {
LOG_INFO("NeoPattern", "Applied control from CLUSTER_EVENT");
}
});
// Solid color event: sets all pixels to the same color
ctx.on("api/neopattern/color", [this](void* dataPtr) {
String* jsonStr = static_cast<String*>(dataPtr);
if (!jsonStr) {
LOG_WARN("NeoPattern", "Received api/neopattern/color with null dataPtr");
return;
}
JsonDocument doc;
DeserializationError err = deserializeJson(doc, *jsonStr);
if (err) {
LOG_WARN("NeoPattern", String("Failed to parse color event data: ") + err.c_str());
return;
}
JsonObject obj = doc.as<JsonObject>();
// color can be string or number
String colorStr;
if (obj["color"].is<const char*>() || obj["color"].is<String>()) {
colorStr = obj["color"].as<String>();
} else if (obj["color"].is<long>() || obj["color"].is<int>()) {
colorStr = String(obj["color"].as<long>());
} else {
LOG_WARN("NeoPattern", "api/neopattern/color missing 'color'");
return;
}
// Optional brightness
if (obj["brightness"].is<int>() || obj["brightness"].is<long>()) {
int b = obj["brightness"].as<int>();
if (b < 0) b = 0; if (b > 255) b = 255;
setBrightness(static_cast<uint8_t>(b));
}
uint32_t color = parseColor(colorStr);
setPattern(NeoPatternType::NONE);
setColor(color);
LOG_INFO("NeoPattern", String("Set solid color ") + colorStr);
});
}
bool NeoPatternService::applyControlParams(const JsonObject& obj) {
bool updated = false;
if (obj["pattern"].is<const char*>() || obj["pattern"].is<String>()) {
String name = obj["pattern"].as<String>();
if (isValidPattern(name)) {
setPatternByName(name);
updated = true;
}
}
if (obj["color"].is<const char*>() || obj["color"].is<String>() || obj["color"].is<long>() || obj["color"].is<int>()) {
String colorStr;
if (obj["color"].is<long>() || obj["color"].is<int>()) {
colorStr = String(obj["color"].as<long>());
} else {
colorStr = obj["color"].as<String>();
}
uint32_t color = parseColor(colorStr);
setColor(color);
updated = true;
}
if (obj["color2"].is<const char*>() || obj["color2"].is<String>() || obj["color2"].is<long>() || obj["color2"].is<int>()) {
String colorStr;
if (obj["color2"].is<long>() || obj["color2"].is<int>()) {
colorStr = String(obj["color2"].as<long>());
} else {
colorStr = obj["color2"].as<String>();
}
uint32_t color = parseColor(colorStr);
setColor2(color);
updated = true;
}
if (obj["brightness"].is<int>() || obj["brightness"].is<long>() || obj["brightness"].is<const char*>() || obj["brightness"].is<String>()) {
int b = 0;
if (obj["brightness"].is<int>() || obj["brightness"].is<long>()) {
b = obj["brightness"].as<int>();
} else {
b = String(obj["brightness"].as<String>()).toInt();
}
if (b < 0) {
b = 0;
}
if (b > 255) {
b = 255;
}
setBrightness(static_cast<uint8_t>(b));
updated = true;
}
if (obj["total_steps"].is<int>() || obj["total_steps"].is<long>() || obj["total_steps"].is<const char*>() || obj["total_steps"].is<String>()) {
int steps = 0;
if (obj["total_steps"].is<int>() || obj["total_steps"].is<long>()) {
steps = obj["total_steps"].as<int>();
} else {
steps = String(obj["total_steps"].as<String>()).toInt();
}
if (steps > 0) { setTotalSteps(static_cast<uint16_t>(steps)); updated = true; }
}
if (obj["direction"].is<const char*>() || obj["direction"].is<String>()) {
String dirStr = obj["direction"].as<String>();
NeoDirection dir = (dirStr.equalsIgnoreCase("reverse")) ? NeoDirection::REVERSE : NeoDirection::FORWARD;
setDirection(dir);
updated = true;
}
if (obj["interval"].is<int>() || obj["interval"].is<long>() || obj["interval"].is<const char*>() || obj["interval"].is<String>()) {
unsigned long interval = 0;
if (obj["interval"].is<int>() || obj["interval"].is<long>()) {
interval = obj["interval"].as<unsigned long>();
} else {
interval = String(obj["interval"].as<String>()).toInt();
}
if (interval > 0) { setUpdateInterval(interval); updated = true; }
}
return updated;
}
void NeoPatternService::handleStateRequest(AsyncWebServerRequest* request) {
if (request->contentType() != "application/json") {

View File

@@ -1,6 +1,7 @@
#pragma once
#include "spore/Service.h"
#include "spore/core/TaskManager.h"
#include "spore/core/NodeContext.h"
#include "NeoPattern.h"
#include "NeoPatternState.h"
#include "NeoPixelConfig.h"
@@ -25,7 +26,7 @@ public:
REVERSE
};
NeoPatternService(TaskManager& taskMgr, const NeoPixelConfig& config);
NeoPatternService(NodeContext& ctx, TaskManager& taskMgr, const NeoPixelConfig& config);
~NeoPatternService();
void registerEndpoints(ApiServer& api) override;
@@ -49,6 +50,8 @@ private:
void registerTasks();
void registerPatterns();
void update();
void registerEventHandlers();
bool applyControlParams(const JsonObject& obj);
// Pattern updaters
void updateRainbowCycle();
@@ -80,6 +83,7 @@ private:
String getPatternDescription(const String& name) const;
TaskManager& taskManager;
NodeContext& ctx;
NeoPattern* neoPattern;
NeoPixelConfig config;
NeoPatternState currentState;

View File

@@ -10,7 +10,7 @@
#endif
#ifndef NEOPIXEL_LENGTH
#define NEOPIXEL_LENGTH 8
#define NEOPIXEL_LENGTH 16
#endif
#ifndef NEOPIXEL_BRIGHTNESS
@@ -45,7 +45,7 @@ void setup() {
);
// Create and add custom service
neoPatternService = new NeoPatternService(spore.getTaskManager(), config);
neoPatternService = new NeoPatternService(spore.getContext(), spore.getTaskManager(), config);
spore.addService(neoPatternService);
// Start the API server and complete initialization

View File

@@ -0,0 +1,127 @@
#include "PixelStreamController.h"
namespace {
constexpr int COMPONENTS_PER_PIXEL = 3;
}
PixelStreamController::PixelStreamController(NodeContext& ctxRef, const PixelStreamConfig& cfg)
: ctx(ctxRef), config(cfg), pixels(cfg.pixelCount, cfg.pin, cfg.pixelType) {
}
void PixelStreamController::begin() {
pixels.begin();
pixels.setBrightness(config.brightness);
// Default all pixels to green so we can verify hardware before streaming frames
for (uint16_t i = 0; i < config.pixelCount; ++i) {
pixels.setPixelColor(i, pixels.Color(0, 255, 0));
}
pixels.show();
ctx.on("udp/raw", [this](void* data) {
this->handleEvent(data);
});
LOG_INFO("PixelStream", String("PixelStreamController ready on pin ") + String(config.pin) + " with " + String(config.pixelCount) + " pixels");
}
void PixelStreamController::handleEvent(void* data) {
if (data == nullptr) {
return;
}
String* payload = static_cast<String*>(data);
if (!payload) {
return;
}
if (!applyFrame(*payload)) {
LOG_WARN("PixelStream", String("Ignoring RAW payload with invalid length (") + String(payload->length()) + ")");
}
}
bool PixelStreamController::applyFrame(const String& payload) {
static constexpr std::size_t frameWidth = COMPONENTS_PER_PIXEL * 2;
const std::size_t payloadLength = static_cast<std::size_t>(payload.length());
if (payloadLength == 0 || (payloadLength % frameWidth) != 0) {
LOG_WARN("PixelStream", String("Payload size ") + String(payloadLength) + " is not a multiple of " + String(frameWidth));
return false;
}
const uint16_t framesProvided = static_cast<uint16_t>(payloadLength / frameWidth);
const uint16_t pixelsToUpdate = std::min(config.pixelCount, framesProvided);
for (uint16_t index = 0; index < pixelsToUpdate; ++index) {
const std::size_t base = static_cast<std::size_t>(index) * frameWidth;
FrameComponents components{};
if (!tryParsePixel(payload, base, components)) {
LOG_WARN("PixelStream", String("Invalid hex data at pixel index ") + String(index));
return false;
}
const uint16_t hardwareIndex = mapPixelIndex(index);
pixels.setPixelColor(hardwareIndex, pixels.Color(components.red, components.green, components.blue));
}
// Clear any remaining pixels so stale data is removed when fewer frames are provided
for (uint16_t index = pixelsToUpdate; index < config.pixelCount; ++index) {
const uint16_t hardwareIndex = mapPixelIndex(index);
pixels.setPixelColor(hardwareIndex, 0);
}
pixels.show();
return true;
}
uint16_t PixelStreamController::mapPixelIndex(uint16_t logicalIndex) const {
if (config.matrixWidth == 0) {
return logicalIndex;
}
const uint16_t row = logicalIndex / config.matrixWidth;
const uint16_t col = logicalIndex % config.matrixWidth;
if (!config.matrixSerpentine || (row % 2 == 0)) {
return row * config.matrixWidth + col;
}
const uint16_t reversedCol = (config.matrixWidth - 1) - col;
return row * config.matrixWidth + reversedCol;
}
int PixelStreamController::hexToNibble(char c) {
if (c >= '0' && c <= '9') {
return c - '0';
}
if (c >= 'a' && c <= 'f') {
return 10 + c - 'a';
}
if (c >= 'A' && c <= 'F') {
return 10 + c - 'A';
}
return -1;
}
bool PixelStreamController::tryParsePixel(const String& payload, std::size_t startIndex, FrameComponents& components) const {
static constexpr std::size_t frameWidth = COMPONENTS_PER_PIXEL * 2;
if (startIndex + frameWidth > static_cast<std::size_t>(payload.length())) {
return false;
}
const int rHi = hexToNibble(payload[startIndex]);
const int rLo = hexToNibble(payload[startIndex + 1]);
const int gHi = hexToNibble(payload[startIndex + 2]);
const int gLo = hexToNibble(payload[startIndex + 3]);
const int bHi = hexToNibble(payload[startIndex + 4]);
const int bLo = hexToNibble(payload[startIndex + 5]);
if (rHi < 0 || rLo < 0 || gHi < 0 || gLo < 0 || bHi < 0 || bLo < 0) {
return false;
}
components.red = static_cast<uint8_t>((rHi << 4) | rLo);
components.green = static_cast<uint8_t>((gHi << 4) | gLo);
components.blue = static_cast<uint8_t>((bHi << 4) | bLo);
return true;
}

View File

@@ -0,0 +1,42 @@
#pragma once
#include <Arduino.h>
#include <Adafruit_NeoPixel.h>
#include <algorithm>
#include <cstddef>
#include "spore/core/NodeContext.h"
#include "spore/util/Logging.h"
struct PixelStreamConfig {
uint8_t pin;
uint16_t pixelCount;
uint8_t brightness;
uint16_t matrixWidth;
bool matrixSerpentine;
neoPixelType pixelType;
};
class PixelStreamController {
public:
PixelStreamController(NodeContext& ctx, const PixelStreamConfig& config);
void begin();
private:
struct FrameComponents {
uint8_t red;
uint8_t green;
uint8_t blue;
};
bool tryParsePixel(const String& payload, std::size_t startIndex, FrameComponents& components) const;
void handleEvent(void* data);
bool applyFrame(const String& payload);
uint16_t mapPixelIndex(uint16_t logicalIndex) const;
static int hexToNibble(char c);
NodeContext& ctx;
PixelStreamConfig config;
Adafruit_NeoPixel pixels;
};

View File

@@ -0,0 +1,33 @@
# PixelStream Example
This example demonstrates how to consume the `udp/raw` cluster event and drive a NeoPixel strip or matrix directly from streamed RGB data. Frames are provided as hex encoded byte triplets (`RRGGBB` per pixel).
## Features
- Subscribes to `udp/raw` via `NodeContext::on`.
- Converts incoming frames into pixel colors for strips or matrices.
- Supports serpentine (zig-zag) matrix wiring.
## Payload Format
Each packet is expected to be `RAW:` followed by `pixelCount * 3 * 2` hexadecimal characters. For example, for 8 pixels:
```
RAW:FF0000FF0000FF0000FF0000FF0000FF0000FF0000FF0000FF0000
```
## Usage
### Strip Mode
Upload the example with `PIXEL_MATRIX_WIDTH` set to 0 (default). Send frames containing `PIXEL_COUNT * 3` bytes as hex.
### Matrix Mode
Set `PIXEL_MATRIX_WIDTH` to the number of columns. The controller remaps even/odd rows to support serpentine wiring.
## Configuration
Adjust `PIXEL_PIN`, `PIXEL_COUNT`, `PIXEL_BRIGHTNESS`, `PIXEL_MATRIX_WIDTH`, `PIXEL_MATRIX_SERPENTINE`, and `PIXEL_TYPE` through build defines or editing `main.cpp`.

View File

@@ -0,0 +1,60 @@
#include <Arduino.h>
#include "spore/Spore.h"
#include "spore/util/Logging.h"
#include "PixelStreamController.h"
#ifndef PIXEL_PIN
#define PIXEL_PIN 2
#endif
#ifndef PIXEL_COUNT
#define PIXEL_COUNT 64
#endif
#ifndef PIXEL_BRIGHTNESS
#define PIXEL_BRIGHTNESS 80
#endif
#ifndef PIXEL_MATRIX_WIDTH
#define PIXEL_MATRIX_WIDTH 0
#endif
#ifndef PIXEL_MATRIX_SERPENTINE
#define PIXEL_MATRIX_SERPENTINE 1
#endif
#ifndef PIXEL_TYPE
#define PIXEL_TYPE NEO_GRB + NEO_KHZ800
#endif
Spore spore({
{"app", "pixelstream"},
{"role", "led"},
{"pixels", String(PIXEL_COUNT)}
});
PixelStreamController* controller = nullptr;
void setup() {
spore.setup();
PixelStreamConfig config{
static_cast<uint8_t>(PIXEL_PIN),
static_cast<uint16_t>(PIXEL_COUNT),
static_cast<uint8_t>(PIXEL_BRIGHTNESS),
static_cast<uint16_t>(PIXEL_MATRIX_WIDTH),
static_cast<bool>(PIXEL_MATRIX_SERPENTINE),
static_cast<neoPixelType>(PIXEL_TYPE)
};
controller = new PixelStreamController(spore.getContext(), config);
controller->begin();
spore.begin();
}
void loop() {
spore.loop();
}

View File

@@ -2,6 +2,7 @@
#include <Arduino.h>
#include <ArduinoJson.h>
#include <ESPAsyncWebServer.h>
#include <AsyncWebSocket.h>
#include <Updater.h>
#include <functional>
#include <vector>
@@ -11,7 +12,6 @@
#include "spore/types/NodeInfo.h"
#include "spore/core/TaskManager.h"
#include "spore/types/ApiTypes.h"
#include "spore/util/Logging.h"
class Service; // Forward declaration
@@ -20,17 +20,15 @@ public:
ApiServer(NodeContext& ctx, TaskManager& taskMgr, uint16_t port = 80);
void begin();
void addService(Service& service);
void addEndpoint(const String& uri, int method, std::function<void(AsyncWebServerRequest*)> requestHandler);
void addEndpoint(const String& uri, int method, std::function<void(AsyncWebServerRequest*)> requestHandler,
const String& serviceName = "unknown");
void addEndpoint(const String& uri, int method, std::function<void(AsyncWebServerRequest*)> requestHandler,
std::function<void(AsyncWebServerRequest*, const String&, size_t, uint8_t*, size_t, bool)> uploadHandler,
const String& serviceName = "unknown");
std::function<void(AsyncWebServerRequest*, const String&, size_t, uint8_t*, size_t, bool)> uploadHandler);
void addEndpoint(const String& uri, int method, std::function<void(AsyncWebServerRequest*)> requestHandler,
const std::vector<ParamSpec>& params, const String& serviceName = "unknown");
const std::vector<ParamSpec>& params);
void addEndpoint(const String& uri, int method, std::function<void(AsyncWebServerRequest*)> requestHandler,
std::function<void(AsyncWebServerRequest*, const String&, size_t, uint8_t*, size_t, bool)> uploadHandler,
const std::vector<ParamSpec>& params, const String& serviceName = "unknown");
const std::vector<ParamSpec>& params);
// Static file serving
void serveStatic(const String& uri, fs::FS& fs, const String& path, const String& cache_header = "");
@@ -42,13 +40,18 @@ public:
private:
AsyncWebServer server;
AsyncWebSocket ws{ "/ws" };
NodeContext& ctx;
TaskManager& taskManager;
std::vector<std::reference_wrapper<Service>> services;
std::vector<EndpointInfo> endpoints; // Single source of truth for endpoints
std::vector<AsyncWebSocketClient*> wsClients;
// Internal helpers
void registerEndpoint(const String& uri, int method,
const std::vector<ParamSpec>& params,
const String& serviceName);
// WebSocket helpers
void setupWebSocket();
};

View File

@@ -7,13 +7,15 @@
#include <ArduinoJson.h>
#include <ESP8266HTTPClient.h>
#include <map>
#include <vector>
#include <functional>
class ClusterManager {
public:
ClusterManager(NodeContext& ctx, TaskManager& taskMgr);
void registerTasks();
void sendDiscovery();
void listenForDiscovery();
void listen();
void addOrUpdateNode(const String& nodeHost, IPAddress nodeIP);
void updateAllNodeStatuses();
void removeDeadNodes();
@@ -26,4 +28,25 @@ public:
private:
NodeContext& ctx;
TaskManager& taskManager;
struct MessageHandler {
bool (*predicate)(const char*);
std::function<void(const char*)> handle;
const char* name;
};
void initMessageHandlers();
void handleIncomingMessage(const char* incoming);
static bool isDiscoveryMsg(const char* msg);
static bool isHeartbeatMsg(const char* msg);
static bool isResponseMsg(const char* msg);
static bool isNodeInfoMsg(const char* msg);
static bool isClusterEventMsg(const char* msg);
static bool isRawMsg(const char* msg);
void onDiscovery(const char* msg);
void onHeartbeat(const char* msg);
void onResponse(const char* msg);
void onNodeInfo(const char* msg);
void onClusterEvent(const char* msg);
void onRawMessage(const char* msg);
unsigned long lastHeartbeatSentAt = 0;
std::vector<MessageHandler> messageHandlers;
};

View File

@@ -23,7 +23,10 @@ public:
using EventCallback = std::function<void(void*)>;
std::map<std::string, std::vector<EventCallback>> eventRegistry;
using AnyEventCallback = std::function<void(const std::string&, void*)>;
std::vector<AnyEventCallback> anyEventSubscribers;
void on(const std::string& event, EventCallback cb);
void fire(const std::string& event, void* data);
void onAny(AnyEventCallback cb);
};

View File

@@ -7,8 +7,13 @@
namespace ClusterProtocol {
constexpr const char* DISCOVERY_MSG = "CLUSTER_DISCOVERY";
constexpr const char* RESPONSE_MSG = "CLUSTER_RESPONSE";
constexpr const char* HEARTBEAT_MSG = "CLUSTER_HEARTBEAT";
constexpr const char* NODE_INFO_MSG = "CLUSTER_NODE_INFO";
constexpr const char* CLUSTER_EVENT_MSG = "CLUSTER_EVENT";
constexpr const char* RAW_MSG = "RAW";
constexpr uint16_t UDP_PORT = 4210;
constexpr size_t UDP_BUF_SIZE = 64;
// Increased buffer to accommodate node info JSON over UDP
constexpr size_t UDP_BUF_SIZE = 512;
constexpr const char* API_NODE_STATUS = "/api/node/status";
}

View File

@@ -1,114 +0,0 @@
#pragma once
#include "spore/util/JsonSerializable.h"
#include "spore/util/Logging.h"
#include <ArduinoJson.h>
namespace spore {
namespace types {
/**
* Base class for API responses that can be serialized to JSON
* Handles complete JsonDocument creation and serialization
*/
class ApiResponse {
protected:
mutable JsonDocument doc;
public:
virtual ~ApiResponse() = default;
/**
* Get the complete JSON string representation of this response
* @return JSON string ready to send as HTTP response
*/
virtual String toJsonString() const {
String json;
serializeJson(doc, json);
return json;
}
/**
* Get the JsonDocument for direct manipulation if needed
* @return Reference to the internal JsonDocument
*/
JsonDocument& getDocument() { return doc; }
const JsonDocument& getDocument() const { return doc; }
/**
* Clear the document and reset for reuse
*/
virtual void clear() {
doc.clear();
}
};
/**
* Base class for API responses that contain a collection of serializable items
*/
template<typename ItemType>
class CollectionResponse : public ApiResponse {
protected:
String collectionKey;
public:
explicit CollectionResponse(const String& key) : collectionKey(key) {}
/**
* Add a serializable item to the collection
* @param item The serializable item to add
*/
void addItem(const util::JsonSerializable& item) {
// Ensure the array exists and get a reference to it
if (!doc[collectionKey].is<JsonArray>()) {
doc[collectionKey] = JsonArray();
}
JsonArray arr = doc[collectionKey].as<JsonArray>();
JsonObject obj = arr.add<JsonObject>();
item.toJson(obj);
}
/**
* Add a serializable item to the collection (move version)
* @param item The serializable item to add
*/
void addItem(util::JsonSerializable&& item) {
// Ensure the array exists and get a reference to it
if (!doc[collectionKey].is<JsonArray>()) {
doc[collectionKey] = JsonArray();
}
JsonArray arr = doc[collectionKey].as<JsonArray>();
JsonObject obj = arr.add<JsonObject>();
item.toJson(obj);
}
/**
* Add multiple items from a container
* @param items Container of serializable items
*/
template<typename Container>
void addItems(const Container& items) {
// Ensure the array exists and get a reference to it
if (!doc[collectionKey].is<JsonArray>()) {
doc[collectionKey] = JsonArray();
}
JsonArray arr = doc[collectionKey].as<JsonArray>();
for (const auto& item : items) {
JsonObject obj = arr.add<JsonObject>();
item.toJson(obj);
}
}
/**
* Get the current number of items in the collection
* @return Number of items
*/
size_t getItemCount() const {
if (doc[collectionKey].is<JsonArray>()) {
return doc[collectionKey].as<JsonArray>().size();
}
return 0;
}
};
} // namespace types
} // namespace spore

View File

@@ -1,47 +0,0 @@
#pragma once
#include "ApiResponse.h"
#include "NodeInfoSerializable.h"
#include <map>
namespace spore {
namespace types {
/**
* Response class for cluster members endpoint
* Handles complete JSON document creation for cluster member data
*/
class ClusterMembersResponse : public CollectionResponse<NodeInfoSerializable> {
public:
ClusterMembersResponse() : CollectionResponse("members") {}
/**
* Add a single node to the response
* @param node The NodeInfo to add
*/
void addNode(const NodeInfo& node) {
addItem(NodeInfoSerializable(const_cast<NodeInfo&>(node)));
}
/**
* Add multiple nodes from a member list
* @param memberList Map of hostname to NodeInfo
*/
void addNodes(const std::map<String, NodeInfo>& memberList) {
for (const auto& pair : memberList) {
addNode(pair.second);
}
}
/**
* Add nodes from a pointer to member list
* @param memberList Pointer to map of hostname to NodeInfo
*/
void addNodes(const std::map<String, NodeInfo>* memberList) {
if (memberList) {
addNodes(*memberList);
}
}
};
} // namespace types
} // namespace spore

View File

@@ -15,6 +15,7 @@ public:
// Cluster Configuration
unsigned long discovery_interval_ms;
unsigned long heartbeat_interval_ms;
unsigned long cluster_listen_interval_ms;
unsigned long status_update_interval_ms;
unsigned long member_info_update_interval_ms;
unsigned long print_interval_ms;

View File

@@ -1,76 +0,0 @@
#pragma once
#include "ApiTypes.h"
#include "spore/util/JsonSerializable.h"
#include <ArduinoJson.h>
namespace spore {
namespace types {
/**
* Serializable wrapper for EndpointInfo that implements JsonSerializable interface
* Handles conversion between EndpointInfo struct and JSON representation
*/
class EndpointInfoSerializable : public util::JsonSerializable {
private:
const EndpointInfo& endpoint;
public:
explicit EndpointInfoSerializable(const EndpointInfo& ep) : endpoint(ep) {}
/**
* Serialize EndpointInfo to JsonObject
*/
void toJson(JsonObject& obj) const override {
obj["uri"] = endpoint.uri;
obj["method"] = endpoint.method;
// Add parameters if present
if (!endpoint.params.empty()) {
JsonArray paramsArr = obj["params"].to<JsonArray>();
for (const auto& param : endpoint.params) {
JsonObject paramObj = paramsArr.add<JsonObject>();
paramObj["name"] = param.name;
paramObj["location"] = param.location;
paramObj["required"] = param.required;
paramObj["type"] = param.type;
if (!param.values.empty()) {
JsonArray valuesArr = paramObj["values"].to<JsonArray>();
for (const auto& value : param.values) {
valuesArr.add(value);
}
}
if (param.defaultValue.length() > 0) {
paramObj["default"] = param.defaultValue;
}
}
}
}
/**
* Deserialize EndpointInfo from JsonObject
*/
void fromJson(const JsonObject& obj) override {
// Note: This would require modifying the EndpointInfo struct to be mutable
// For now, this is a placeholder as EndpointInfo is typically read-only
// in the context where this serialization is used
}
};
/**
* Convenience function to create a JsonArray from a collection of EndpointInfo objects
*/
template<typename Container>
JsonArray endpointInfoToJsonArray(JsonDocument& doc, const Container& endpoints) {
JsonArray arr = doc.to<JsonArray>();
for (const auto& endpoint : endpoints) {
EndpointInfoSerializable serializable(endpoint);
JsonObject obj = arr.add<JsonObject>();
serializable.toJson(obj);
}
return arr;
}
} // namespace types
} // namespace spore

View File

@@ -17,7 +17,7 @@ struct NodeInfo {
uint32_t cpuFreqMHz = 0;
uint32_t flashChipSize = 0;
} resources;
unsigned long latency = 0; // ms since lastSeen
unsigned long latency = 0; // ms from heartbeat broadcast to NODE_INFO receipt
std::vector<EndpointInfo> endpoints; // List of registered endpoints
std::map<String, String> labels; // Arbitrary node labels (key -> value)
};

View File

@@ -1,141 +0,0 @@
#pragma once
#include "NodeInfo.h"
#include "ApiTypes.h"
#include "spore/util/JsonSerializable.h"
#include <ArduinoJson.h>
namespace spore {
namespace types {
/**
* Serializable wrapper for NodeInfo that implements JsonSerializable interface
* Handles conversion between NodeInfo struct and JSON representation
*/
class NodeInfoSerializable : public util::JsonSerializable {
private:
NodeInfo& nodeInfo;
public:
explicit NodeInfoSerializable(NodeInfo& node) : nodeInfo(node) {}
/**
* Serialize NodeInfo to JsonObject
* Maps all NodeInfo fields to appropriate JSON structure
*/
void toJson(JsonObject& obj) const override {
obj["hostname"] = nodeInfo.hostname;
obj["ip"] = nodeInfo.ip.toString();
obj["lastSeen"] = nodeInfo.lastSeen;
obj["latency"] = nodeInfo.latency;
obj["status"] = statusToStr(nodeInfo.status);
// Serialize resources
JsonObject resources = obj["resources"].to<JsonObject>();
resources["freeHeap"] = nodeInfo.resources.freeHeap;
resources["chipId"] = nodeInfo.resources.chipId;
resources["sdkVersion"] = nodeInfo.resources.sdkVersion;
resources["cpuFreqMHz"] = nodeInfo.resources.cpuFreqMHz;
resources["flashChipSize"] = nodeInfo.resources.flashChipSize;
// Serialize labels if present
if (!nodeInfo.labels.empty()) {
JsonObject labels = obj["labels"].to<JsonObject>();
for (const auto& kv : nodeInfo.labels) {
labels[kv.first.c_str()] = kv.second;
}
}
// Serialize endpoints if present
if (!nodeInfo.endpoints.empty()) {
JsonArray endpoints = obj["api"].to<JsonArray>();
for (const auto& endpoint : nodeInfo.endpoints) {
JsonObject endpointObj = endpoints.add<JsonObject>();
endpointObj["uri"] = endpoint.uri;
endpointObj["method"] = endpoint.method;
}
}
}
/**
* Deserialize NodeInfo from JsonObject
* Populates NodeInfo fields from JSON structure
*/
void fromJson(const JsonObject& obj) override {
nodeInfo.hostname = obj["hostname"].as<String>();
// Parse IP address
const char* ipStr = obj["ip"];
if (ipStr) {
nodeInfo.ip.fromString(ipStr);
}
nodeInfo.lastSeen = obj["lastSeen"].as<unsigned long>();
nodeInfo.latency = obj["latency"].as<unsigned long>();
// Parse status
const char* statusStr = obj["status"];
if (statusStr) {
if (strcmp(statusStr, "ACTIVE") == 0) {
nodeInfo.status = NodeInfo::ACTIVE;
} else if (strcmp(statusStr, "INACTIVE") == 0) {
nodeInfo.status = NodeInfo::INACTIVE;
} else if (strcmp(statusStr, "DEAD") == 0) {
nodeInfo.status = NodeInfo::DEAD;
}
}
// Parse resources
if (obj["resources"].is<JsonObject>()) {
JsonObject resources = obj["resources"].as<JsonObject>();
nodeInfo.resources.freeHeap = resources["freeHeap"].as<uint32_t>();
nodeInfo.resources.chipId = resources["chipId"].as<uint32_t>();
nodeInfo.resources.sdkVersion = resources["sdkVersion"].as<String>();
nodeInfo.resources.cpuFreqMHz = resources["cpuFreqMHz"].as<uint32_t>();
nodeInfo.resources.flashChipSize = resources["flashChipSize"].as<uint32_t>();
}
// Parse labels
nodeInfo.labels.clear();
if (obj["labels"].is<JsonObject>()) {
JsonObject labels = obj["labels"].as<JsonObject>();
for (JsonPair kvp : labels) {
nodeInfo.labels[kvp.key().c_str()] = kvp.value().as<String>();
}
}
// Parse endpoints
nodeInfo.endpoints.clear();
if (obj["api"].is<JsonArray>()) {
JsonArray endpoints = obj["api"].as<JsonArray>();
for (JsonObject endpointObj : endpoints) {
EndpointInfo endpoint;
endpoint.uri = endpointObj["uri"].as<String>();
endpoint.method = endpointObj["method"].as<int>();
endpoint.isLocal = false;
endpoint.serviceName = "remote";
nodeInfo.endpoints.push_back(std::move(endpoint));
}
}
}
};
/**
* Convenience function to create a JsonArray from a collection of NodeInfo objects
* @param doc The JsonDocument to create the array in
* @param nodes Collection of NodeInfo objects
* @return A JsonArray containing all serialized NodeInfo objects
*/
template<typename Container>
JsonArray nodeInfoToJsonArray(JsonDocument& doc, const Container& nodes) {
JsonArray arr = doc.to<JsonArray>();
for (const auto& pair : nodes) {
const NodeInfo& node = pair.second;
NodeInfoSerializable serializable(const_cast<NodeInfo&>(node));
JsonObject obj = arr.add<JsonObject>();
serializable.toJson(obj);
}
return arr;
}
} // namespace types
} // namespace spore

View File

@@ -1,102 +0,0 @@
#pragma once
#include "ApiResponse.h"
#include "EndpointInfoSerializable.h"
#include "NodeInfo.h"
#include "spore/util/Logging.h"
#include <vector>
namespace spore {
namespace types {
/**
* Response class for node status endpoint
* Handles complete JSON document creation for node status data
*/
class NodeStatusResponse : public ApiResponse {
public:
/**
* Set basic system information
*/
void setSystemInfo() {
doc["freeHeap"] = ESP.getFreeHeap();
doc["chipId"] = ESP.getChipId();
doc["sdkVersion"] = ESP.getSdkVersion();
doc["cpuFreqMHz"] = ESP.getCpuFreqMHz();
doc["flashChipSize"] = ESP.getFlashChipSize();
}
/**
* Add labels to the response
* @param labels Map of label key-value pairs
*/
void addLabels(const std::map<String, String>& labels) {
if (!labels.empty()) {
JsonObject labelsObj = doc["labels"].to<JsonObject>();
for (const auto& kv : labels) {
labelsObj[kv.first.c_str()] = kv.second;
}
}
}
/**
* Build complete response with system info and labels
* @param labels Optional labels to include
*/
void buildCompleteResponse(const std::map<String, String>& labels = {}) {
setSystemInfo();
addLabels(labels);
}
};
/**
* Response class for node endpoints endpoint
* Handles complete JSON document creation for endpoint data
*/
class NodeEndpointsResponse : public CollectionResponse<EndpointInfoSerializable> {
public:
NodeEndpointsResponse() : CollectionResponse("endpoints") {}
/**
* Add a single endpoint to the response
* @param endpoint The EndpointInfo to add
*/
void addEndpoint(const EndpointInfo& endpoint) {
addItem(EndpointInfoSerializable(endpoint));
}
/**
* Add multiple endpoints from a container
* @param endpoints Container of EndpointInfo objects
*/
void addEndpoints(const std::vector<EndpointInfo>& endpoints) {
for (const auto& endpoint : endpoints) {
addEndpoint(endpoint);
}
}
};
/**
* Response class for simple status operations (update, restart)
* Handles simple JSON responses for node operations
*/
class NodeOperationResponse : public ApiResponse {
public:
/**
* Set success response
* @param status Status message
*/
void setSuccess(const String& status) {
doc["status"] = status;
}
/**
* Set error response
* @param status Error status message
*/
void setError(const String& status) {
doc["status"] = status;
}
};
} // namespace types
} // namespace spore

View File

@@ -1,99 +0,0 @@
#pragma once
#include "spore/util/JsonSerializable.h"
#include <ArduinoJson.h>
#include <Arduino.h>
namespace spore {
namespace types {
/**
* Serializable wrapper for task information that implements JsonSerializable interface
* Handles conversion between task data and JSON representation
*/
class TaskInfoSerializable : public util::JsonSerializable {
private:
String taskName;
const JsonObject& taskData;
public:
TaskInfoSerializable(const String& name, const JsonObject& data)
: taskName(name), taskData(data) {}
TaskInfoSerializable(const std::string& name, const JsonObject& data)
: taskName(name.c_str()), taskData(data) {}
/**
* Serialize task info to JsonObject
*/
void toJson(JsonObject& obj) const override {
obj["name"] = taskName;
obj["interval"] = taskData["interval"];
obj["enabled"] = taskData["enabled"];
obj["running"] = taskData["running"];
obj["autoStart"] = taskData["autoStart"];
}
/**
* Deserialize task info from JsonObject
* Note: This is read-only for task status, so fromJson is not implemented
*/
void fromJson(const JsonObject& obj) override {
// Task info is typically read-only in this context
// Implementation would go here if needed
}
};
/**
* Serializable wrapper for system information
*/
class SystemInfoSerializable : public util::JsonSerializable {
public:
/**
* Serialize system info to JsonObject
*/
void toJson(JsonObject& obj) const override {
obj["freeHeap"] = ESP.getFreeHeap();
obj["uptime"] = millis();
}
/**
* Deserialize system info from JsonObject
* Note: System info is typically read-only, so fromJson is not implemented
*/
void fromJson(const JsonObject& obj) override {
// System info is typically read-only
// Implementation would go here if needed
}
};
/**
* Serializable wrapper for task summary information
*/
class TaskSummarySerializable : public util::JsonSerializable {
private:
size_t totalTasks;
size_t activeTasks;
public:
TaskSummarySerializable(size_t total, size_t active)
: totalTasks(total), activeTasks(active) {}
/**
* Serialize task summary to JsonObject
*/
void toJson(JsonObject& obj) const override {
obj["totalTasks"] = totalTasks;
obj["activeTasks"] = activeTasks;
}
/**
* Deserialize task summary from JsonObject
*/
void fromJson(const JsonObject& obj) override {
totalTasks = obj["totalTasks"].as<size_t>();
activeTasks = obj["activeTasks"].as<size_t>();
}
};
} // namespace types
} // namespace spore

View File

@@ -1,194 +0,0 @@
#pragma once
#include "ApiResponse.h"
#include "TaskInfoSerializable.h"
#include <map>
namespace spore {
namespace types {
/**
* Response class for task status endpoint
* Handles complete JSON document creation for task status data
*/
class TaskStatusResponse : public ApiResponse {
public:
/**
* Set the task summary information
* @param totalTasks Total number of tasks
* @param activeTasks Number of active tasks
*/
void setSummary(size_t totalTasks, size_t activeTasks) {
TaskSummarySerializable summary(totalTasks, activeTasks);
JsonObject summaryObj = doc["summary"].to<JsonObject>();
summary.toJson(summaryObj);
}
/**
* Add a single task to the response
* @param taskName Name of the task
* @param taskData Task data as JsonObject
*/
void addTask(const String& taskName, const JsonObject& taskData) {
TaskInfoSerializable serializable(taskName, taskData);
if (!doc["tasks"].is<JsonArray>()) {
doc["tasks"] = JsonArray();
}
JsonArray tasksArr = doc["tasks"].as<JsonArray>();
JsonObject taskObj = tasksArr.add<JsonObject>();
serializable.toJson(taskObj);
}
/**
* Add a single task to the response (std::string version)
* @param taskName Name of the task
* @param taskData Task data as JsonObject
*/
void addTask(const std::string& taskName, const JsonObject& taskData) {
TaskInfoSerializable serializable(taskName, taskData);
if (!doc["tasks"].is<JsonArray>()) {
doc["tasks"] = JsonArray();
}
JsonArray tasksArr = doc["tasks"].as<JsonArray>();
JsonObject taskObj = tasksArr.add<JsonObject>();
serializable.toJson(taskObj);
}
/**
* Add multiple tasks from a task statuses map
* @param taskStatuses Map of task name to task data
*/
void addTasks(const std::map<String, JsonObject>& taskStatuses) {
for (const auto& pair : taskStatuses) {
addTask(pair.first, pair.second);
}
}
/**
* Set the system information
*/
void setSystemInfo() {
SystemInfoSerializable systemInfo;
JsonObject systemObj = doc["system"].to<JsonObject>();
systemInfo.toJson(systemObj);
}
/**
* Build complete response with all components
* @param taskStatuses Map of task name to task data
*/
void buildCompleteResponse(const std::map<String, JsonObject>& taskStatuses) {
// Set summary
size_t totalTasks = taskStatuses.size();
size_t activeTasks = 0;
for (const auto& pair : taskStatuses) {
if (pair.second["enabled"]) {
activeTasks++;
}
}
setSummary(totalTasks, activeTasks);
// Add all tasks
addTasks(taskStatuses);
// Set system info
setSystemInfo();
}
/**
* Build complete response with all components from vector
* @param taskStatuses Vector of pairs of task name to task data
*/
void buildCompleteResponse(const std::vector<std::pair<std::string, JsonObject>>& taskStatuses) {
// Clear the document first since getAllTaskStatuses creates a root array
doc.clear();
// Set summary
size_t totalTasks = taskStatuses.size();
size_t activeTasks = 0;
for (const auto& pair : taskStatuses) {
if (pair.second["enabled"]) {
activeTasks++;
}
}
setSummary(totalTasks, activeTasks);
// Add all tasks - extract data before clearing to avoid invalid references
JsonArray tasksArr = doc["tasks"].to<JsonArray>();
for (const auto& pair : taskStatuses) {
// Extract data from JsonObject before it becomes invalid
String taskName = pair.first.c_str();
unsigned long interval = pair.second["interval"];
bool enabled = pair.second["enabled"];
bool running = pair.second["running"];
bool autoStart = pair.second["autoStart"];
// Create new JsonObject in our document
JsonObject taskObj = tasksArr.add<JsonObject>();
taskObj["name"] = taskName;
taskObj["interval"] = interval;
taskObj["enabled"] = enabled;
taskObj["running"] = running;
taskObj["autoStart"] = autoStart;
}
// Set system info
setSystemInfo();
}
};
/**
* Response class for task control operations
* Handles JSON responses for task enable/disable/start/stop operations
*/
class TaskControlResponse : public ApiResponse {
public:
/**
* Set the response data for a task control operation
* @param success Whether the operation was successful
* @param message Response message
* @param taskName Name of the task
* @param action Action performed
*/
void setResponse(bool success, const String& message, const String& taskName, const String& action) {
doc["success"] = success;
doc["message"] = message;
doc["task"] = taskName;
doc["action"] = action;
}
/**
* Add detailed task information to the response
* @param taskName Name of the task
* @param enabled Whether task is enabled
* @param running Whether task is running
* @param interval Task interval
*/
void addTaskDetails(const String& taskName, bool enabled, bool running, unsigned long interval) {
JsonObject taskDetails = doc["taskDetails"].to<JsonObject>();
taskDetails["name"] = taskName;
taskDetails["enabled"] = enabled;
taskDetails["running"] = running;
taskDetails["interval"] = interval;
// Add system info
SystemInfoSerializable systemInfo;
JsonObject systemObj = taskDetails["system"].to<JsonObject>();
systemInfo.toJson(systemObj);
}
/**
* Set error response
* @param message Error message
* @param example Optional example for correct usage
*/
void setError(const String& message, const String& example = "") {
doc["success"] = false;
doc["message"] = message;
if (example.length() > 0) {
doc["example"] = example;
}
}
};
} // namespace types
} // namespace spore

View File

@@ -1,56 +0,0 @@
#pragma once
#include <ArduinoJson.h>
namespace spore {
namespace util {
/**
* Abstract base class for objects that can be serialized to/from JSON
* Provides a clean interface for converting objects to JsonObject and back
*/
class JsonSerializable {
public:
virtual ~JsonSerializable() = default;
/**
* Serialize this object to a JsonObject
* @param obj The JsonObject to populate with this object's data
*/
virtual void toJson(JsonObject& obj) const = 0;
/**
* Deserialize this object from a JsonObject
* @param obj The JsonObject containing the data to populate this object
*/
virtual void fromJson(const JsonObject& obj) = 0;
/**
* Convenience method to create a JsonObject from this object
* @param doc The JsonDocument to create the object in
* @return A JsonObject containing this object's serialized data
*/
JsonObject toJsonObject(JsonDocument& doc) const {
JsonObject obj = doc.to<JsonObject>();
toJson(obj);
return obj;
}
/**
* Convenience method to create a JsonArray from a collection of serializable objects
* @param doc The JsonDocument to create the array in
* @param objects Collection of objects implementing JsonSerializable
* @return A JsonArray containing all serialized objects
*/
template<typename Container>
static JsonArray toJsonArray(JsonDocument& doc, const Container& objects) {
JsonArray arr = doc.to<JsonArray>();
for (const auto& obj : objects) {
JsonObject item = arr.add<JsonObject>();
obj.toJson(item);
}
return arr;
}
};
} // namespace util
} // namespace spore

View File

@@ -100,3 +100,24 @@ build_src_filter =
+<src/spore/types/*.cpp>
+<src/spore/util/*.cpp>
+<src/internal/*.cpp>
[env:pixelstream]
platform = platformio/espressif8266@^4.2.1
board = esp01_1m
framework = arduino
upload_speed = 115200
monitor_speed = 115200
board_build.filesystem = littlefs
board_build.flash_mode = dout
board_build.ldscript = eagle.flash.1m64.ld
lib_deps = ${common.lib_deps}
adafruit/Adafruit NeoPixel@^1.15.1
build_flags =
build_src_filter =
+<examples/pixelstream/*.cpp>
+<src/spore/*.cpp>
+<src/spore/core/*.cpp>
+<src/spore/services/*.cpp>
+<src/spore/types/*.cpp>
+<src/spore/util/*.cpp>
+<src/internal/*.cpp>

View File

@@ -21,31 +21,57 @@ void ApiServer::registerEndpoint(const String& uri, int method,
const String& serviceName) {
// Add to local endpoints
endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true});
// Update cluster if needed
if (ctx.memberList && !ctx.memberList->empty()) {
auto it = ctx.memberList->find(ctx.hostname);
if (it != ctx.memberList->end()) {
it->second.endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true});
}
}
}
void ApiServer::addEndpoint(const String& uri, int method, std::function<void(AsyncWebServerRequest*)> requestHandler,
const String& serviceName) {
void ApiServer::addEndpoint(const String& uri, int method, std::function<void(AsyncWebServerRequest*)> requestHandler) {
// Get current service name if available
String serviceName = "unknown";
if (!services.empty()) {
serviceName = services.back().get().getName();
}
registerEndpoint(uri, method, {}, serviceName);
server.on(uri.c_str(), method, requestHandler);
}
void ApiServer::addEndpoint(const String& uri, int method, std::function<void(AsyncWebServerRequest*)> requestHandler,
std::function<void(AsyncWebServerRequest*, const String&, size_t, uint8_t*, size_t, bool)> uploadHandler,
const String& serviceName) {
std::function<void(AsyncWebServerRequest*, const String&, size_t, uint8_t*, size_t, bool)> uploadHandler) {
// Get current service name if available
String serviceName = "unknown";
if (!services.empty()) {
serviceName = services.back().get().getName();
}
registerEndpoint(uri, method, {}, serviceName);
server.on(uri.c_str(), method, requestHandler, uploadHandler);
}
// Overloads that also record minimal capability specs
void ApiServer::addEndpoint(const String& uri, int method, std::function<void(AsyncWebServerRequest*)> requestHandler,
const std::vector<ParamSpec>& params, const String& serviceName) {
const std::vector<ParamSpec>& params) {
// Get current service name if available
String serviceName = "unknown";
if (!services.empty()) {
serviceName = services.back().get().getName();
}
registerEndpoint(uri, method, params, serviceName);
server.on(uri.c_str(), method, requestHandler);
}
void ApiServer::addEndpoint(const String& uri, int method, std::function<void(AsyncWebServerRequest*)> requestHandler,
std::function<void(AsyncWebServerRequest*, const String&, size_t, uint8_t*, size_t, bool)> uploadHandler,
const std::vector<ParamSpec>& params, const String& serviceName) {
const std::vector<ParamSpec>& params) {
// Get current service name if available
String serviceName = "unknown";
if (!services.empty()) {
serviceName = services.back().get().getName();
}
registerEndpoint(uri, method, params, serviceName);
server.on(uri.c_str(), method, requestHandler, uploadHandler);
}
@@ -61,6 +87,10 @@ void ApiServer::serveStatic(const String& uri, fs::FS& fs, const String& path, c
}
void ApiServer::begin() {
// Setup streaming API (WebSocket)
setupWebSocket();
server.addHandler(&ws);
// Register all service endpoints
for (auto& service : services) {
service.get().registerEndpoints(*this);
@@ -69,3 +99,90 @@ void ApiServer::begin() {
server.begin();
}
void ApiServer::setupWebSocket() {
ws.onEvent([this](AsyncWebSocket* server, AsyncWebSocketClient* client, AwsEventType type, void* arg, uint8_t* data, size_t len) {
if (type == WS_EVT_DATA) {
AwsFrameInfo* info = (AwsFrameInfo*)arg;
if (info->final && info->index == 0 && info->len == len && info->opcode == WS_TEXT) {
// Parse directly from the raw buffer with explicit length
JsonDocument doc;
DeserializationError err = deserializeJson(doc, (const char*)data, len);
if (!err) {
LOG_DEBUG("API", "Received event: " + String(doc["event"].as<String>()));
String eventName = doc["event"].as<String>();
String payloadStr;
if (doc["payload"].is<const char*>()) {
payloadStr = doc["payload"].as<const char*>();
} else if (!doc["payload"].isNull()) {
// If payload is an object/array, serialize it
String tmp; serializeJson(doc["payload"], tmp); payloadStr = tmp;
}
// Allow empty payload; services may treat it as defaults
if (eventName.length() > 0) {
// Inject origin tag into payload JSON if possible
String enriched = payloadStr;
if (payloadStr.length() > 0) {
JsonDocument pd;
if (!deserializeJson(pd, payloadStr)) {
pd["_origin"] = String("ws:") + String(client->id());
String tmp; serializeJson(pd, tmp); enriched = tmp;
} else {
// If payload is plain string, leave as-is (no origin)
}
}
std::string ev = eventName.c_str();
ctx.fire(ev, &enriched);
// Acknowledge
client->text("{\"ok\":true}");
} else {
client->text("{\"error\":\"Missing 'event'\"}");
}
} else {
client->text("{\"error\":\"Invalid JSON\"}");
}
}
} else if (type == WS_EVT_CONNECT) {
client->text("{\"hello\":\"ws connected\"}");
wsClients.push_back(client);
} else if (type == WS_EVT_DISCONNECT) {
wsClients.erase(std::remove(wsClients.begin(), wsClients.end(), client), wsClients.end());
}
});
// Subscribe to all local events and forward to websocket clients
ctx.onAny([this](const std::string& event, void* dataPtr) {
String* payloadStrPtr = static_cast<String*>(dataPtr);
String payloadStr = payloadStrPtr ? *payloadStrPtr : String("");
// Extract and strip origin if present
String origin;
String cleanedPayload = payloadStr;
if (payloadStr.length() > 0) {
JsonDocument pd;
if (!deserializeJson(pd, payloadStr)) {
if (pd["_origin"].is<const char*>()) {
origin = pd["_origin"].as<const char*>();
pd.remove("_origin");
String tmp; serializeJson(pd, tmp); cleanedPayload = tmp;
}
}
}
JsonDocument outDoc;
outDoc["event"] = event.c_str();
outDoc["payload"] = cleanedPayload;
String out; serializeJson(outDoc, out);
if (origin.startsWith("ws:")) {
uint32_t originId = (uint32_t)origin.substring(3).toInt();
for (auto* c : wsClients) {
if (c && c->id() != originId) {
c->text(out);
}
}
} else {
ws.textAll(out);
}
});
}

View File

@@ -8,17 +8,35 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx
NodeInfo* node = static_cast<NodeInfo*>(data);
this->addOrUpdateNode(node->hostname, node->ip);
});
// Centralized broadcast handler: services fire 'cluster/broadcast' with CLUSTER_EVENT JSON payload
ctx.on("cluster/broadcast", [this](void* data) {
String* jsonStr = static_cast<String*>(data);
if (!jsonStr) {
LOG_WARN("Cluster", "cluster/broadcast called with null data");
return;
}
// Subnet-directed broadcast (more reliable than 255.255.255.255 on some networks)
IPAddress ip = WiFi.localIP();
IPAddress mask = WiFi.subnetMask();
IPAddress bcast(ip[0] | ~mask[0], ip[1] | ~mask[1], ip[2] | ~mask[2], ip[3] | ~mask[3]);
LOG_DEBUG("Cluster", String("Broadcasting CLUSTER_EVENT to ") + bcast.toString() + " len=" + String(jsonStr->length()));
this->ctx.udp->beginPacket(bcast, this->ctx.config.udp_port);
String msg = String(ClusterProtocol::CLUSTER_EVENT_MSG) + ":" + *jsonStr;
this->ctx.udp->write(msg.c_str());
this->ctx.udp->endPacket();
});
// Register tasks
registerTasks();
initMessageHandlers();
}
void ClusterManager::registerTasks() {
taskManager.registerTask("discovery_send", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); });
taskManager.registerTask("discovery_listen", ctx.config.discovery_interval_ms / 10, [this]() { listenForDiscovery(); });
taskManager.registerTask("cluster_discovery", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); });
taskManager.registerTask("cluster_listen", ctx.config.cluster_listen_interval_ms, [this]() { listen(); });
taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); });
taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); });
taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); });
taskManager.registerTask("update_members_info", ctx.config.member_info_update_interval_ms, [this]() { updateAllMembersInfoTaskCallback(); });
taskManager.registerTask("cluster_update_members_info", ctx.config.member_info_update_interval_ms, [this]() { updateAllMembersInfoTaskCallback(); });
LOG_INFO("ClusterManager", "Registered all cluster tasks");
}
@@ -29,28 +47,224 @@ void ClusterManager::sendDiscovery() {
ctx.udp->endPacket();
}
void ClusterManager::listenForDiscovery() {
void ClusterManager::listen() {
int packetSize = ctx.udp->parsePacket();
if (packetSize) {
char incoming[ClusterProtocol::UDP_BUF_SIZE];
int len = ctx.udp->read(incoming, ClusterProtocol::UDP_BUF_SIZE);
if (len > 0) {
incoming[len] = 0;
}
//LOG_DEBUG(ctx, "UDP", "Packet received: " + String(incoming));
if (strcmp(incoming, ClusterProtocol::DISCOVERY_MSG) == 0) {
//LOG_DEBUG(ctx, "UDP", "Discovery request from: " + ctx.udp->remoteIP().toString());
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname;
ctx.udp->write(response.c_str());
ctx.udp->endPacket();
//LOG_DEBUG(ctx, "UDP", "Sent response with hostname: " + ctx.hostname);
} else if (strncmp(incoming, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0) {
char* hostPtr = incoming + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
String nodeHost = String(hostPtr);
addOrUpdateNode(nodeHost, ctx.udp->remoteIP());
if (!packetSize) {
return;
}
char incoming[ClusterProtocol::UDP_BUF_SIZE];
int len = ctx.udp->read(incoming, ClusterProtocol::UDP_BUF_SIZE);
if (len <= 0) {
return;
}
if (len >= (int)ClusterProtocol::UDP_BUF_SIZE) {
incoming[ClusterProtocol::UDP_BUF_SIZE - 1] = 0;
} else {
incoming[len] = 0;
}
handleIncomingMessage(incoming);
}
void ClusterManager::initMessageHandlers() {
messageHandlers.clear();
messageHandlers.push_back({ &ClusterManager::isRawMsg, [this](const char* msg){ this->onRawMessage(msg); }, "RAW" });
messageHandlers.push_back({ &ClusterManager::isDiscoveryMsg, [this](const char* msg){ this->onDiscovery(msg); }, "DISCOVERY" });
messageHandlers.push_back({ &ClusterManager::isHeartbeatMsg, [this](const char* msg){ this->onHeartbeat(msg); }, "HEARTBEAT" });
messageHandlers.push_back({ &ClusterManager::isResponseMsg, [this](const char* msg){ this->onResponse(msg); }, "RESPONSE" });
messageHandlers.push_back({ &ClusterManager::isNodeInfoMsg, [this](const char* msg){ this->onNodeInfo(msg); }, "NODE_INFO" });
messageHandlers.push_back({ &ClusterManager::isClusterEventMsg, [this](const char* msg){ this->onClusterEvent(msg); }, "CLUSTER_EVENT" });
}
void ClusterManager::handleIncomingMessage(const char* incoming) {
for (const auto& h : messageHandlers) {
if (h.predicate(incoming)) {
h.handle(incoming);
return;
}
}
// Unknown message - log first token
const char* colon = strchr(incoming, ':');
String head;
if (colon) {
head = String(incoming).substring(0, colon - incoming);
} else {
head = String(incoming);
}
LOG_DEBUG("Cluster", String("Unknown cluster message: ") + head);
}
bool ClusterManager::isDiscoveryMsg(const char* msg) {
return strcmp(msg, ClusterProtocol::DISCOVERY_MSG) == 0;
}
bool ClusterManager::isHeartbeatMsg(const char* msg) {
return strncmp(msg, ClusterProtocol::HEARTBEAT_MSG, strlen(ClusterProtocol::HEARTBEAT_MSG)) == 0;
}
bool ClusterManager::isResponseMsg(const char* msg) {
return strncmp(msg, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0;
}
bool ClusterManager::isNodeInfoMsg(const char* msg) {
return strncmp(msg, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0;
}
bool ClusterManager::isClusterEventMsg(const char* msg) {
return strncmp(msg, ClusterProtocol::CLUSTER_EVENT_MSG, strlen(ClusterProtocol::CLUSTER_EVENT_MSG)) == 0;
}
bool ClusterManager::isRawMsg(const char* msg) {
// RAW frames must be "RAW:<payload>"; enforce the delimiter so we skip things like "RAW_HEARTBEAT".
const std::size_t prefixLen = strlen(ClusterProtocol::RAW_MSG);
if (strncmp(msg, ClusterProtocol::RAW_MSG, prefixLen) != 0) {
return false;
}
return msg[prefixLen] == ':';
}
void ClusterManager::onDiscovery(const char* /*msg*/) {
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname;
ctx.udp->write(response.c_str());
ctx.udp->endPacket();
}
void ClusterManager::onHeartbeat(const char* /*msg*/) {
JsonDocument doc;
if (ctx.memberList) {
auto it = ctx.memberList->find(ctx.hostname);
if (it != ctx.memberList->end()) {
JsonObject labelsObj = doc["labels"].to<JsonObject>();
for (const auto& kv : it->second.labels) {
labelsObj[kv.first.c_str()] = kv.second;
}
} else if (!ctx.self.labels.empty()) {
JsonObject labelsObj = doc["labels"].to<JsonObject>();
for (const auto& kv : ctx.self.labels) {
labelsObj[kv.first.c_str()] = kv.second;
}
}
}
String json;
serializeJson(doc, json);
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
String msg = String(ClusterProtocol::NODE_INFO_MSG) + ":" + ctx.hostname + ":" + json;
ctx.udp->write(msg.c_str());
ctx.udp->endPacket();
}
void ClusterManager::onResponse(const char* msg) {
char* hostPtr = const_cast<char*>(msg) + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
String nodeHost = String(hostPtr);
addOrUpdateNode(nodeHost, ctx.udp->remoteIP());
}
void ClusterManager::onNodeInfo(const char* msg) {
char* p = const_cast<char*>(msg) + strlen(ClusterProtocol::NODE_INFO_MSG) + 1;
char* hostEnd = strchr(p, ':');
if (hostEnd) {
*hostEnd = '\0';
const char* hostCStr = p;
const char* jsonCStr = hostEnd + 1;
String nodeHost = String(hostCStr);
IPAddress senderIP = ctx.udp->remoteIP();
addOrUpdateNode(nodeHost, senderIP);
JsonDocument doc;
DeserializationError err = deserializeJson(doc, jsonCStr);
if (!err) {
auto& memberList = *ctx.memberList;
auto it = memberList.find(nodeHost);
if (it != memberList.end()) {
NodeInfo& node = it->second;
node.status = NodeInfo::ACTIVE;
unsigned long now = millis();
node.lastSeen = now;
if (lastHeartbeatSentAt != 0) {
node.latency = now - lastHeartbeatSentAt;
}
node.labels.clear();
if (doc["labels"].is<JsonObject>()) {
JsonObject labelsObj = doc["labels"].as<JsonObject>();
for (JsonPair kvp : labelsObj) {
const char* key = kvp.key().c_str();
const char* value = labelsObj[kvp.key()];
node.labels[key] = value;
}
}
}
} else {
LOG_WARN("Cluster", String("Failed to parse NODE_INFO JSON from ") + senderIP.toString());
}
}
}
void ClusterManager::onClusterEvent(const char* msg) {
// Message format: CLUSTER_EVENT:{"event":"...","data":"<json string>"}
const char* jsonStart = msg + strlen(ClusterProtocol::CLUSTER_EVENT_MSG) + 1; // skip prefix and ':'
if (*jsonStart == '\0') {
LOG_DEBUG("Cluster", "CLUSTER_EVENT received with empty payload");
return;
}
LOG_DEBUG("Cluster", String("CLUSTER_EVENT raw from ") + ctx.udp->remoteIP().toString() + " len=" + String(strlen(jsonStart)));
JsonDocument doc;
DeserializationError err = deserializeJson(doc, jsonStart);
if (err) {
LOG_ERROR("Cluster", String("Failed to parse CLUSTER_EVENT JSON from ") + ctx.udp->remoteIP().toString());
return;
}
// Robust extraction of event and data
String eventStr;
if (doc["event"].is<const char*>()) {
eventStr = doc["event"].as<const char*>();
} else if (doc["event"].is<String>()) {
eventStr = doc["event"].as<String>();
}
String data;
if (doc["data"].is<const char*>()) {
data = doc["data"].as<const char*>();
} else if (doc["data"].is<JsonVariantConst>()) {
// If data is a nested JSON object/array, serialize it back to string
String tmp;
serializeJson(doc["data"], tmp);
data = tmp;
}
if (eventStr.length() == 0 || data.length() == 0) {
String dbg;
serializeJson(doc, dbg);
LOG_WARN("Cluster", String("CLUSTER_EVENT missing 'event' or 'data' | payload=") + dbg);
return;
}
std::string eventKey(eventStr.c_str());
LOG_DEBUG("Cluster", String("Firing event '") + eventStr + "' with dataLen=" + String(data.length()));
ctx.fire(eventKey, &data);
}
void ClusterManager::onRawMessage(const char* msg) {
const std::size_t prefixLen = strlen(ClusterProtocol::RAW_MSG);
if (msg[prefixLen] != ':') {
LOG_WARN("Cluster", "RAW message received without payload delimiter");
return;
}
const char* payloadStart = msg + prefixLen + 1;
if (*payloadStart == '\0') {
LOG_WARN("Cluster", "RAW message received with empty payload");
return;
}
String payload(payloadStart);
ctx.fire("udp/raw", &payload);
}
void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
@@ -71,12 +285,13 @@ void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
newNode.hostname = nodeHost;
newNode.ip = nodeIP;
newNode.lastSeen = millis();
updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
memberList[nodeHost] = newNode;
LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " | Status: " + statusToStr(newNode.status) + " | last update: 0");
//fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
}
// unused http client to fetch complete node info
void ClusterManager::fetchNodeInfo(const IPAddress& ip) {
if(ip == ctx.localIP) {
LOG_DEBUG("Cluster", "Skipping fetch for local node");
@@ -192,33 +407,20 @@ void ClusterManager::heartbeatTaskCallback() {
node.lastSeen = millis();
node.status = NodeInfo::ACTIVE;
updateLocalNodeResources();
ctx.fire("node_discovered", &node);
addOrUpdateNode(ctx.hostname, ctx.localIP);
}
// Broadcast heartbeat so peers can respond with their node info
lastHeartbeatSentAt = millis();
ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
String hb = String(ClusterProtocol::HEARTBEAT_MSG) + ":" + ctx.hostname;
ctx.udp->write(hb.c_str());
ctx.udp->endPacket();
}
void ClusterManager::updateAllMembersInfoTaskCallback() {
auto& memberList = *ctx.memberList;
// Limit concurrent HTTP requests to prevent memory pressure
const size_t maxConcurrentRequests = ctx.config.max_concurrent_http_requests;
size_t requestCount = 0;
for (auto& pair : memberList) {
const NodeInfo& node = pair.second;
if (node.ip != ctx.localIP) {
// Only process a limited number of requests per cycle
if (requestCount >= maxConcurrentRequests) {
LOG_DEBUG("Cluster", "Limiting concurrent HTTP requests to prevent memory pressure");
break;
}
fetchNodeInfo(node.ip);
requestCount++;
// Add small delay between requests to prevent overwhelming the system
delay(100);
}
}
// HTTP-based member info fetching disabled; node info is provided via UDP responses to heartbeats
// No-op to reduce network and memory usage
}
void ClusterManager::updateAllNodeStatuses() {

View File

@@ -29,4 +29,11 @@ void NodeContext::fire(const std::string& event, void* data) {
for (auto& cb : eventRegistry[event]) {
cb(data);
}
for (auto& acb : anyEventSubscribers) {
acb(event, data);
}
}
void NodeContext::onAny(AnyEventCallback cb) {
anyEventSubscribers.push_back(cb);
}

View File

@@ -1,19 +1,60 @@
#include "spore/services/ClusterService.h"
#include "spore/core/ApiServer.h"
#include "spore/types/ClusterResponse.h"
using spore::types::ClusterMembersResponse;
ClusterService::ClusterService(NodeContext& ctx) : ctx(ctx) {}
void ClusterService::registerEndpoints(ApiServer& api) {
api.addEndpoint("/api/cluster/members", HTTP_GET,
[this](AsyncWebServerRequest* request) { handleMembersRequest(request); },
std::vector<ParamSpec>{}, "ClusterService");
std::vector<ParamSpec>{});
// Generic cluster broadcast endpoint
api.addEndpoint("/api/cluster/event", HTTP_POST,
[this](AsyncWebServerRequest* request) {
if (!request->hasParam("event", true) || !request->hasParam("payload", true)) {
request->send(400, "application/json", "{\"error\":\"Missing 'event' or 'payload'\"}");
return;
}
String eventName = request->getParam("event", true)->value();
String payloadStr = request->getParam("payload", true)->value();
JsonDocument envelope;
envelope["event"] = eventName;
envelope["data"] = payloadStr; // pass payload as JSON string
String eventJson;
serializeJson(envelope, eventJson);
std::string ev = "cluster/broadcast";
ctx.fire(ev, &eventJson);
request->send(200, "application/json", "{\"ok\":true}");
},
std::vector<ParamSpec>{
ParamSpec{String("event"), true, String("body"), String("string"), {}},
ParamSpec{String("payload"), true, String("body"), String("string"), {}}
});
}
void ClusterService::handleMembersRequest(AsyncWebServerRequest* request) {
ClusterMembersResponse response;
response.addNodes(ctx.memberList);
request->send(200, "application/json", response.toJsonString());
JsonDocument doc;
JsonArray arr = doc["members"].to<JsonArray>();
for (const auto& pair : *ctx.memberList) {
const NodeInfo& node = pair.second;
JsonObject obj = arr.add<JsonObject>();
obj["hostname"] = node.hostname;
obj["ip"] = node.ip.toString();
obj["lastSeen"] = node.lastSeen;
obj["latency"] = node.latency;
obj["status"] = statusToStr(node.status);
// Add labels if present
if (!node.labels.empty()) {
JsonObject labelsObj = obj["labels"].to<JsonObject>();
for (const auto& kv : node.labels) {
labelsObj[kv.first.c_str()] = kv.second;
}
}
}
String json;
serializeJson(doc, json);
request->send(200, "application/json", json);
}

View File

@@ -12,7 +12,7 @@ MonitoringService::MonitoringService(CpuUsage& cpuUsage)
void MonitoringService::registerEndpoints(ApiServer& api) {
api.addEndpoint("/api/monitoring/resources", HTTP_GET,
[this](AsyncWebServerRequest* request) { handleResourcesRequest(request); },
std::vector<ParamSpec>{}, "MonitoringService");
std::vector<ParamSpec>{});
}
MonitoringService::SystemResources MonitoringService::getSystemResources() const {

View File

@@ -8,16 +8,16 @@ void NetworkService::registerEndpoints(ApiServer& api) {
// WiFi scanning endpoints
api.addEndpoint("/api/network/wifi/scan", HTTP_POST,
[this](AsyncWebServerRequest* request) { handleWifiScanRequest(request); },
std::vector<ParamSpec>{}, "NetworkService");
std::vector<ParamSpec>{});
api.addEndpoint("/api/network/wifi/scan", HTTP_GET,
[this](AsyncWebServerRequest* request) { handleGetWifiNetworks(request); },
std::vector<ParamSpec>{}, "NetworkService");
std::vector<ParamSpec>{});
// Network status and configuration endpoints
api.addEndpoint("/api/network/status", HTTP_GET,
[this](AsyncWebServerRequest* request) { handleNetworkStatus(request); },
std::vector<ParamSpec>{}, "NetworkService");
std::vector<ParamSpec>{});
api.addEndpoint("/api/network/wifi/config", HTTP_POST,
[this](AsyncWebServerRequest* request) { handleSetWifiConfig(request); },
@@ -26,7 +26,7 @@ void NetworkService::registerEndpoints(ApiServer& api) {
ParamSpec{String("password"), true, String("body"), String("string"), {}, String("")},
ParamSpec{String("connect_timeout_ms"), false, String("body"), String("number"), {}, String("10000")},
ParamSpec{String("retry_delay_ms"), false, String("body"), String("number"), {}, String("500")}
}, "NetworkService");
});
}
void NetworkService::handleWifiScanRequest(AsyncWebServerRequest* request) {

View File

@@ -1,11 +1,6 @@
#include "spore/services/NodeService.h"
#include "spore/core/ApiServer.h"
#include "spore/util/Logging.h"
#include "spore/types/NodeResponse.h"
using spore::types::NodeStatusResponse;
using spore::types::NodeOperationResponse;
using spore::types::NodeEndpointsResponse;
NodeService::NodeService(NodeContext& ctx, ApiServer& apiServer) : ctx(ctx), apiServer(apiServer) {}
@@ -13,7 +8,7 @@ void NodeService::registerEndpoints(ApiServer& api) {
// Status endpoint
api.addEndpoint("/api/node/status", HTTP_GET,
[this](AsyncWebServerRequest* request) { handleStatusRequest(request); },
std::vector<ParamSpec>{}, "NodeService");
std::vector<ParamSpec>{});
// Update endpoint with file upload
api.addEndpoint("/api/node/update", HTTP_POST,
@@ -23,45 +18,72 @@ void NodeService::registerEndpoints(ApiServer& api) {
},
std::vector<ParamSpec>{
ParamSpec{String("firmware"), true, String("body"), String("file"), {}, String("")}
}, "NodeService");
});
// Restart endpoint
api.addEndpoint("/api/node/restart", HTTP_POST,
[this](AsyncWebServerRequest* request) { handleRestartRequest(request); },
std::vector<ParamSpec>{}, "NodeService");
std::vector<ParamSpec>{});
// Endpoints endpoint
api.addEndpoint("/api/node/endpoints", HTTP_GET,
[this](AsyncWebServerRequest* request) { handleEndpointsRequest(request); },
std::vector<ParamSpec>{}, "NodeService");
std::vector<ParamSpec>{});
// Generic local event endpoint
api.addEndpoint("/api/node/event", HTTP_POST,
[this](AsyncWebServerRequest* request) {
if (!request->hasParam("event", true) || !request->hasParam("payload", true)) {
request->send(400, "application/json", "{\"error\":\"Missing 'event' or 'payload'\"}");
return;
}
String eventName = request->getParam("event", true)->value();
String payloadStr = request->getParam("payload", true)->value();
std::string ev = eventName.c_str();
ctx.fire(ev, &payloadStr);
request->send(200, "application/json", "{\"ok\":true}");
},
std::vector<ParamSpec>{
ParamSpec{String("event"), true, String("body"), String("string"), {}},
ParamSpec{String("payload"), true, String("body"), String("string"), {}}
});
}
void NodeService::handleStatusRequest(AsyncWebServerRequest* request) {
NodeStatusResponse response;
JsonDocument doc;
doc["freeHeap"] = ESP.getFreeHeap();
doc["chipId"] = ESP.getChipId();
doc["sdkVersion"] = ESP.getSdkVersion();
doc["cpuFreqMHz"] = ESP.getCpuFreqMHz();
doc["flashChipSize"] = ESP.getFlashChipSize();
// Get labels from member list or self
std::map<String, String> labels;
// Include local node labels if present
if (ctx.memberList) {
auto it = ctx.memberList->find(ctx.hostname);
if (it != ctx.memberList->end()) {
labels = it->second.labels;
JsonObject labelsObj = doc["labels"].to<JsonObject>();
for (const auto& kv : it->second.labels) {
labelsObj[kv.first.c_str()] = kv.second;
}
} else if (!ctx.self.labels.empty()) {
labels = ctx.self.labels;
JsonObject labelsObj = doc["labels"].to<JsonObject>();
for (const auto& kv : ctx.self.labels) {
labelsObj[kv.first.c_str()] = kv.second;
}
}
}
response.buildCompleteResponse(labels);
request->send(200, "application/json", response.toJsonString());
String json;
serializeJson(doc, json);
request->send(200, "application/json", json);
}
void NodeService::handleUpdateRequest(AsyncWebServerRequest* request) {
bool success = !Update.hasError();
NodeOperationResponse response;
response.setSuccess(success ? "OK" : "FAIL");
AsyncWebServerResponse* httpResponse = request->beginResponse(200, "application/json", response.toJsonString());
httpResponse->addHeader("Connection", "close");
request->send(httpResponse);
AsyncWebServerResponse* response = request->beginResponse(200, "application/json",
success ? "{\"status\": \"OK\"}" : "{\"status\": \"FAIL\"}");
response->addHeader("Connection", "close");
request->send(response);
request->onDisconnect([this]() {
LOG_INFO("API", "Restart device");
delay(10);
@@ -104,12 +126,10 @@ void NodeService::handleUpdateUpload(AsyncWebServerRequest* request, const Strin
}
void NodeService::handleRestartRequest(AsyncWebServerRequest* request) {
NodeOperationResponse response;
response.setSuccess("restarting");
AsyncWebServerResponse* httpResponse = request->beginResponse(200, "application/json", response.toJsonString());
httpResponse->addHeader("Connection", "close");
request->send(httpResponse);
AsyncWebServerResponse* response = request->beginResponse(200, "application/json",
"{\"status\": \"restarting\"}");
response->addHeader("Connection", "close");
request->send(response);
request->onDisconnect([this]() {
LOG_INFO("API", "Restart device");
delay(10);
@@ -118,7 +138,36 @@ void NodeService::handleRestartRequest(AsyncWebServerRequest* request) {
}
void NodeService::handleEndpointsRequest(AsyncWebServerRequest* request) {
NodeEndpointsResponse response;
response.addEndpoints(apiServer.getEndpoints());
request->send(200, "application/json", response.toJsonString());
JsonDocument doc;
JsonArray endpointsArr = doc["endpoints"].to<JsonArray>();
// Add all registered endpoints from ApiServer
for (const auto& endpoint : apiServer.getEndpoints()) {
JsonObject obj = endpointsArr.add<JsonObject>();
obj["uri"] = endpoint.uri;
obj["method"] = ApiServer::methodToStr(endpoint.method);
if (!endpoint.params.empty()) {
JsonArray paramsArr = obj["params"].to<JsonArray>();
for (const auto& ps : endpoint.params) {
JsonObject p = paramsArr.add<JsonObject>();
p["name"] = ps.name;
p["location"] = ps.location;
p["required"] = ps.required;
p["type"] = ps.type;
if (!ps.values.empty()) {
JsonArray allowed = p["values"].to<JsonArray>();
for (const auto& v : ps.values) {
allowed.add(v);
}
}
if (ps.defaultValue.length() > 0) {
p["default"] = ps.defaultValue;
}
}
}
}
String json;
serializeJson(doc, json);
request->send(200, "application/json", json);
}

View File

@@ -1,17 +1,13 @@
#include "spore/services/TaskService.h"
#include "spore/core/ApiServer.h"
#include "spore/types/TaskResponse.h"
#include <algorithm>
using spore::types::TaskStatusResponse;
using spore::types::TaskControlResponse;
TaskService::TaskService(TaskManager& taskManager) : taskManager(taskManager) {}
void TaskService::registerEndpoints(ApiServer& api) {
api.addEndpoint("/api/tasks/status", HTTP_GET,
[this](AsyncWebServerRequest* request) { handleStatusRequest(request); },
std::vector<ParamSpec>{}, "TaskService");
std::vector<ParamSpec>{});
api.addEndpoint("/api/tasks/control", HTTP_POST,
[this](AsyncWebServerRequest* request) { handleControlRequest(request); },
@@ -32,19 +28,36 @@ void TaskService::registerEndpoints(ApiServer& api) {
{String("enable"), String("disable"), String("start"), String("stop"), String("status")},
String("")
}
}, "TaskService");
});
}
void TaskService::handleStatusRequest(AsyncWebServerRequest* request) {
TaskStatusResponse response;
// Get task statuses using a separate document to avoid reference issues
JsonDocument scratch;
auto taskStatuses = taskManager.getAllTaskStatuses(scratch);
// Build the complete response with the task data
response.buildCompleteResponse(taskStatuses);
request->send(200, "application/json", response.toJsonString());
JsonDocument doc;
JsonObject summaryObj = doc["summary"].to<JsonObject>();
summaryObj["totalTasks"] = taskStatuses.size();
summaryObj["activeTasks"] = std::count_if(taskStatuses.begin(), taskStatuses.end(),
[](const auto& pair) { return pair.second["enabled"]; });
JsonArray tasksArr = doc["tasks"].to<JsonArray>();
for (const auto& taskPair : taskStatuses) {
JsonObject taskObj = tasksArr.add<JsonObject>();
taskObj["name"] = taskPair.first;
taskObj["interval"] = taskPair.second["interval"];
taskObj["enabled"] = taskPair.second["enabled"];
taskObj["running"] = taskPair.second["running"];
taskObj["autoStart"] = taskPair.second["autoStart"];
}
JsonObject systemObj = doc["system"].to<JsonObject>();
systemObj["freeHeap"] = ESP.getFreeHeap();
systemObj["uptime"] = millis();
String json;
serializeJson(doc, json);
request->send(200, "application/json", json);
}
void TaskService::handleControlRequest(AsyncWebServerRequest* request) {
@@ -75,27 +88,50 @@ void TaskService::handleControlRequest(AsyncWebServerRequest* request) {
success = true;
message = "Task status retrieved";
TaskControlResponse response;
response.setResponse(success, message, taskName, action);
response.addTaskDetails(taskName,
taskManager.isTaskEnabled(taskName.c_str()),
taskManager.isTaskRunning(taskName.c_str()),
taskManager.getTaskInterval(taskName.c_str()));
JsonDocument statusDoc;
statusDoc["success"] = success;
statusDoc["message"] = message;
statusDoc["task"] = taskName;
statusDoc["action"] = action;
request->send(200, "application/json", response.toJsonString());
statusDoc["taskDetails"] = JsonObject();
JsonObject taskDetails = statusDoc["taskDetails"];
taskDetails["name"] = taskName;
taskDetails["enabled"] = taskManager.isTaskEnabled(taskName.c_str());
taskDetails["running"] = taskManager.isTaskRunning(taskName.c_str());
taskDetails["interval"] = taskManager.getTaskInterval(taskName.c_str());
taskDetails["system"] = JsonObject();
JsonObject systemInfo = taskDetails["system"];
systemInfo["freeHeap"] = ESP.getFreeHeap();
systemInfo["uptime"] = millis();
String statusJson;
serializeJson(statusDoc, statusJson);
request->send(200, "application/json", statusJson);
return;
} else {
success = false;
message = "Invalid action. Use: enable, disable, start, stop, or status";
}
TaskControlResponse response;
response.setResponse(success, message, taskName, action);
request->send(success ? 200 : 400, "application/json", response.toJsonString());
JsonDocument doc;
doc["success"] = success;
doc["message"] = message;
doc["task"] = taskName;
doc["action"] = action;
String json;
serializeJson(doc, json);
request->send(success ? 200 : 400, "application/json", json);
} else {
TaskControlResponse response;
response.setError("Missing parameters. Required: task, action",
"{\"task\": \"discovery_send\", \"action\": \"status\"}");
request->send(400, "application/json", response.toJsonString());
JsonDocument doc;
doc["success"] = false;
doc["message"] = "Missing parameters. Required: task, action";
doc["example"] = "{\"task\": \"discovery_send\", \"action\": \"status\"}";
String json;
serializeJson(doc, json);
request->send(400, "application/json", json);
}
}

View File

@@ -10,10 +10,11 @@ Config::Config() {
api_server_port = 80;
// Cluster Configuration
discovery_interval_ms = 1000;
heartbeat_interval_ms = 2000;
discovery_interval_ms = 1000; // TODO retire this in favor of heartbeat_interval_ms
cluster_listen_interval_ms = 10;
heartbeat_interval_ms = 5000;
status_update_interval_ms = 1000;
member_info_update_interval_ms = 10000;
member_info_update_interval_ms = 10000; // TODO retire this in favor of heartbeat_interval_ms
print_interval_ms = 5000;
// Node Status Thresholds

1
test/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
node_modules/

View File

@@ -1,11 +1,64 @@
# Test Scripts
This directory is intended for PlatformIO Test Runner and project tests.
This directory contains JavaScript test scripts to interact with the Spore device, primarily for testing cluster event broadcasting.
Unit Testing is a software testing method by which individual units of
source code, sets of one or more MCU program modules together with associated
control data, usage procedures, and operating procedures, are tested to
determine whether they are fit for use. Unit testing finds problems early
in the development cycle.
## Prerequisites
These scripts require [Node.js](https://nodejs.org/) to be installed on your system.
## How to Run
### 1. HTTP Cluster Broadcast Color (`test/http-cluster-broadcast-color.js`)
This script sends HTTP POST requests to the `/api/cluster/event` endpoint on your Spore device. It broadcasts NeoPattern color changes across the cluster every 5 seconds.
**Usage:**
```
node test/http-cluster-broadcast-color.js <device-ip>
```
Example:
```
node test/http-cluster-broadcast-color.js 10.0.1.53
```
This will broadcast `{ event: "api/neopattern/color", data: { color: "#RRGGBB", brightness: 128 } }` every 5 seconds to the cluster via `/api/cluster/event`.
### 2. WS Local Color Setter (`test/ws-color-client.js`)
Connects to the device WebSocket (`/ws`) and sets a solid color locally (non-broadcast) every 5 seconds by firing `api/neopattern/color`.
**Usage:**
```
node test/ws-color-client.js ws://<device-ip>/ws
```
Example:
```
node test/ws-color-client.js ws://10.0.1.53/ws
```
### 3. WS Cluster Broadcast Color (`test/ws-cluster-broadcast-color.js`)
Connects to the device WebSocket (`/ws`) and broadcasts a color change to all peers every 5 seconds by firing `cluster/broadcast` with the proper envelope.
**Usage:**
```
node test/ws-cluster-broadcast-color.js ws://<device-ip>/ws
```
Example:
```
node test/ws-cluster-broadcast-color.js ws://10.0.1.53/ws
```
### 4. WS Cluster Broadcast Rainbow (`test/ws-cluster-broadcast-rainbow.js`)
Broadcasts a smooth rainbow color transition over WebSocket using `cluster/broadcast` and the `api/neopattern/color` event. Update rate defaults to `UPDATE_RATE` in the script (e.g., 100 ms).
**Usage:**
```
node test/ws-cluster-broadcast-rainbow.js ws://<device-ip>/ws
```
Example:
```
node test/ws-cluster-broadcast-rainbow.js ws://10.0.1.53/ws
```
Note: Very fast update intervals (e.g., 10 ms) may saturate links or the device.
More information about PlatformIO Unit Testing:
- https://docs.platformio.org/en/latest/advanced/unit-testing/index.html

View File

@@ -0,0 +1,52 @@
// Simple HTTP client to broadcast a neopattern color change to the cluster
// Usage: node cluster-broadcast-color.js 10.0.1.53
const http = require('http');
const host = process.argv[2] || '127.0.0.1';
const port = 80;
const colors = ['#FF0000', '#00FF00', '#0000FF', '#FFFF00', '#FF00FF', '#00FFFF'];
let idx = 0;
function postClusterEvent(event, payloadObj) {
const payload = encodeURIComponent(JSON.stringify(payloadObj));
const body = `event=${encodeURIComponent(event)}&payload=${payload}`;
const options = {
host,
port,
path: '/api/cluster/event',
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
'Content-Length': Buffer.byteLength(body)
}
};
const req = http.request(options, (res) => {
let data = '';
res.on('data', (chunk) => (data += chunk));
res.on('end', () => {
console.log('Response:', res.statusCode, data);
});
});
req.on('error', (err) => {
console.error('Request error:', err.message);
});
req.write(body);
req.end();
}
console.log(`Broadcasting color changes to http://${host}/api/cluster/event ...`);
setInterval(() => {
const color = colors[idx % colors.length];
idx++;
const payload = { color, brightness: 80 };
console.log('Broadcasting color:', payload);
postClusterEvent('api/neopattern/color', payload);
}, 5000);

33
test/package-lock.json generated Normal file
View File

@@ -0,0 +1,33 @@
{
"name": "test",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"dependencies": {
"ws": "^8.18.3"
}
},
"node_modules/ws": {
"version": "8.18.3",
"resolved": "https://registry.npmjs.org/ws/-/ws-8.18.3.tgz",
"integrity": "sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg==",
"license": "MIT",
"engines": {
"node": ">=10.0.0"
},
"peerDependencies": {
"bufferutil": "^4.0.1",
"utf-8-validate": ">=5.0.2"
},
"peerDependenciesMeta": {
"bufferutil": {
"optional": true
},
"utf-8-validate": {
"optional": true
}
}
}
}
}

10
test/package.json Normal file
View File

@@ -0,0 +1,10 @@
{
"dependencies": {
"ws": "^8.18.3"
},
"scripts": {
"pixelstream:fade-green-blue": "node pixelstream/fade-green-blue.js",
"pixelstream:bouncing-ball": "node pixelstream/bouncing-ball.js",
"pixelstream:rainbow": "node pixelstream/rainbow.js"
}
}

View File

@@ -0,0 +1,80 @@
const dgram = require('dgram');
const host = process.argv[2];
const port = parseInt(process.argv[3] || '4210', 10);
const pixels = parseInt(process.argv[4] || '64', 10);
const intervalMs = parseInt(process.argv[5] || '30', 10);
if (!host) {
console.error('Usage: node bouncing-ball.js <device-ip> [port] [pixels] [interval-ms]');
process.exit(1);
}
const socket = dgram.createSocket('udp4');
const isBroadcast = host === '255.255.255.255' || host.endsWith('.255');
let position = Math.random() * (pixels - 1);
let velocity = randomVelocity();
function randomVelocity() {
const min = 0.15;
const max = 0.4;
const sign = Math.random() < 0.5 ? -1 : 1;
return (min + Math.random() * (max - min)) * sign;
}
function rebound(sign) {
velocity = randomVelocity() * sign;
}
function mix(a, b, t) {
return a + (b - a) * t;
}
function generateFrame() {
const dt = intervalMs / 1000;
position += velocity * dt * 60; // scale velocity to 60 FPS reference
if (position < 0) {
position = -position;
rebound(1);
} else if (position > pixels - 1) {
position = (pixels - 1) - (position - (pixels - 1));
rebound(-1);
}
const activeIndex = Math.max(0, Math.min(pixels - 1, Math.round(position)));
let payload = 'RAW:';
for (let i = 0; i < pixels; i++) {
if (i === activeIndex) {
payload += 'ff8000';
continue;
}
const distance = Math.abs(i - position);
const intensity = Math.max(0, 1 - distance);
const green = Math.round(mix(20, 200, intensity)).toString(16).padStart(2, '0');
const blue = Math.round(mix(40, 255, intensity)).toString(16).padStart(2, '0');
payload += '00' + green + blue;
}
return payload;
}
function sendFrame() {
const payload = generateFrame();
const message = Buffer.from(payload, 'utf8');
socket.send(message, port, host);
}
setInterval(sendFrame, intervalMs);
if (isBroadcast) {
socket.bind(() => {
socket.setBroadcast(true);
});
}
console.log(`Streaming bouncing ball pattern to ${host}:${port} with ${pixels} pixels (interval=${intervalMs}ms)`);

View File

@@ -0,0 +1,55 @@
const dgram = require('dgram');
const host = process.argv[2];
const port = parseInt(process.argv[3] || '4210', 10);
const pixels = parseInt(process.argv[4] || '64', 10);
const speed = parseFloat(process.argv[5] || '0.5'); // cycles per second
if (!host) {
console.error('Usage: node fade-green-blue.js <device-ip> [port] [pixels] [speed-hz]');
process.exit(1);
}
const socket = dgram.createSocket('udp4');
const intervalMs = 50;
let tick = 0;
const isBroadcast = host === '255.255.255.255' || host.endsWith('.255');
function generateFrame() {
const timeSeconds = (tick * intervalMs) / 1000;
const phase = timeSeconds * speed * Math.PI * 2;
const blend = (Math.sin(phase) + 1) * 0.5; // 0..1
const green = Math.round(255 * (1 - blend));
const blue = Math.round(255 * blend);
let payload = 'RAW:';
const gHex = green.toString(16).padStart(2, '0');
const bHex = blue.toString(16).padStart(2, '0');
for (let i = 0; i < pixels; i++) {
payload += '00';
payload += gHex;
payload += bHex;
}
return payload;
}
function sendFrame() {
const payload = generateFrame();
const message = Buffer.from(payload, 'utf8');
socket.send(message, port, host);
tick += 1;
}
setInterval(sendFrame, intervalMs);
if (isBroadcast) {
socket.bind(() => {
socket.setBroadcast(true);
});
}
console.log(`Streaming green/blue fade to ${host}:${port} with ${pixels} pixels (speed=${speed}Hz)`);

View File

@@ -0,0 +1,59 @@
const dgram = require('dgram');
const host = process.argv[2];
const port = parseInt(process.argv[3] || '4210', 10);
const pixels = parseInt(process.argv[4] || '64', 10);
const intervalMs = parseInt(process.argv[5] || '30', 10);
if (!host) {
console.error('Usage: node rainbow.js <device-ip> [port] [pixels] [interval-ms]');
process.exit(1);
}
const socket = dgram.createSocket('udp4');
let offset = 0;
const isBroadcast = host === '255.255.255.255' || host.endsWith('.255');
function wheel(pos) {
pos = 255 - pos;
if (pos < 85) {
return [255 - pos * 3, 0, pos * 3];
}
if (pos < 170) {
pos -= 85;
return [0, pos * 3, 255 - pos * 3];
}
pos -= 170;
return [pos * 3, 255 - pos * 3, 0];
}
function generateFrame() {
let payload = 'RAW:';
for (let i = 0; i < pixels; i++) {
const colorIndex = (i * 256 / pixels + offset) & 255;
const [r, g, b] = wheel(colorIndex);
payload += r.toString(16).padStart(2, '0');
payload += g.toString(16).padStart(2, '0');
payload += b.toString(16).padStart(2, '0');
}
offset = (offset + 1) & 255;
return payload;
}
function sendFrame() {
const payload = generateFrame();
const message = Buffer.from(payload, 'utf8');
socket.send(message, port, host);
}
setInterval(sendFrame, intervalMs);
if (isBroadcast) {
socket.bind(() => {
socket.setBroadcast(true);
});
}
console.log(`Streaming rainbow pattern to ${host}:${port} with ${pixels} pixels (interval=${intervalMs}ms)`);

View File

@@ -0,0 +1,46 @@
// WebSocket client to broadcast neopattern color changes across the cluster
// Usage: node ws-cluster-broadcast-color.js ws://<device-ip>/ws
const WebSocket = require('ws');
const url = process.argv[2] || 'ws://127.0.0.1/ws';
const ws = new WebSocket(url);
const colors = ['#FF0000', '#00FF00', '#0000FF', '#FFFF00', '#FF00FF', '#00FFFF'];
let idx = 0;
ws.on('open', () => {
console.log('Connected to', url);
// Broadcast color change every 5 seconds via cluster/broadcast
setInterval(() => {
const color = colors[idx % colors.length];
idx++;
const payload = { color, brightness: 80 };
const envelope = {
event: 'api/neopattern/color',
data: payload // server will serialize object payloads
};
const msg = { event: 'cluster/broadcast', payload: envelope };
ws.send(JSON.stringify(msg));
console.log('Broadcasted color event', payload);
}, 5000);
});
ws.on('message', (data) => {
try {
const msg = JSON.parse(data.toString());
console.log('Received:', msg);
} catch (e) {
console.log('Received raw:', data.toString());
}
});
ws.on('error', (err) => {
console.error('WebSocket error:', err.message);
});
ws.on('close', () => {
console.log('WebSocket closed');
});

View File

@@ -0,0 +1,71 @@
// WebSocket client to broadcast smooth rainbow color changes across the cluster
// Usage: node ws-cluster-broadcast-rainbow.js ws://<device-ip>/ws
const WebSocket = require('ws');
const url = process.argv[2] || 'ws://127.0.0.1/ws';
const ws = new WebSocket(url);
function hsvToRgb(h, s, v) {
const c = v * s;
const x = c * (1 - Math.abs(((h / 60) % 2) - 1));
const m = v - c;
let r = 0, g = 0, b = 0;
if (h < 60) { r = c; g = x; b = 0; }
else if (h < 120) { r = x; g = c; b = 0; }
else if (h < 180) { r = 0; g = c; b = x; }
else if (h < 240) { r = 0; g = x; b = c; }
else if (h < 300) { r = x; g = 0; b = c; }
else { r = c; g = 0; b = x; }
const R = Math.round((r + m) * 255);
const G = Math.round((g + m) * 255);
const B = Math.round((b + m) * 255);
return { r: R, g: G, b: B };
}
function toHex({ r, g, b }) {
const h = (n) => n.toString(16).padStart(2, '0').toUpperCase();
return `#${h(r)}${h(g)}${h(b)}`;
}
let hue = 0;
const SAT = 1.0; // full saturation
const VAL = 1.0; // full value
const BRIGHTNESS = 80;
const UPDATE_RATE = 100; // ms
let timer = null;
ws.on('open', () => {
console.log('Connected to', url);
// UPDATE_RATE ms updates (10 Hz). Be aware this can saturate slow links.
timer = setInterval(() => {
const rgb = hsvToRgb(hue, SAT, VAL);
const color = toHex(rgb);
const envelope = {
event: 'api/neopattern/color',
data: { color, brightness: BRIGHTNESS }
};
const msg = { event: 'cluster/broadcast', payload: envelope };
try {
ws.send(JSON.stringify(msg));
} catch (_) {}
hue = (hue + 2) % 360; // advance hue (adjust for speed)
}, UPDATE_RATE);
});
ws.on('message', (data) => {
// Optionally throttle logs: comment out for quieter output
// console.log('WS:', data.toString());
});
ws.on('error', (err) => {
console.error('WebSocket error:', err.message);
});
ws.on('close', () => {
if (timer) clearInterval(timer);
console.log('WebSocket closed');
});

48
test/ws-color-client.js Normal file
View File

@@ -0,0 +1,48 @@
// Simple WebSocket client to test streaming API color changes
// Usage: node ws-color-client.js ws://<device-ip>/ws
const WebSocket = require('ws');
const url = process.argv[2] || 'ws://127.0.0.1/ws';
const ws = new WebSocket(url);
const colors = [
'#FF0000', // red
'#00FF00', // green
'#0000FF', // blue
'#FFFF00', // yellow
'#FF00FF', // magenta
'#00FFFF' // cyan
];
let idx = 0;
ws.on('open', () => {
console.log('Connected to', url);
// Send a message every 5 seconds to set solid color
setInterval(() => {
const color = colors[idx % colors.length];
idx++;
const payload = { color, brightness: 80 };
// Send payload as an object (server supports string or object)
const msg = { event: 'api/neopattern/color', payload };
ws.send(JSON.stringify(msg));
console.log('Sent color event', payload);
}, 5000);
});
ws.on('message', (data) => {
try {
const msg = JSON.parse(data.toString());
console.log('Received:', msg);
} catch (e) {
console.log('Received raw:', data.toString());
}
});
ws.on('error', (err) => {
console.error('WebSocket error:', err.message);
});
ws.on('close', () => {
console.log('WebSocket closed');
});