Compare commits

20 Commits

Author SHA1 Message Date
e60093c419 Merge pull request 'feature/base-firmware-size-optimization' (#18) from feature/base-firmware-size-optimization into main
Reviewed-on: #18
2025-11-04 12:18:31 +01:00
633957c95c feat: configurable PixelStream 2025-10-28 21:05:02 +01:00
ad879bfe7b feat: remove CpuUsage calculation 2025-10-27 10:38:56 +01:00
4559e13d7d fix: revert RAW message 2025-10-27 07:48:06 +01:00
682849650d Merge pull request 'feat: new cluster protocol, event naming' (#17) from feature/cluster-protocol-update into main
Reviewed-on: #17
2025-10-26 12:51:41 +01:00
0f003335b3 feat: new cluster protocol, event naming 2025-10-26 12:43:22 +01:00
eab10cffa5 Merge pull request 'refactoring/firmware-optimizations' (#16) from refactoring/firmware-optimizations into main
Reviewed-on: #16
2025-10-21 13:51:03 +02:00
7f40626187 feat: improve local node initalization 2025-10-21 11:19:12 +02:00
e796375a9f feat: memberlist optimization 2025-10-21 10:50:46 +02:00
daae29dd3f refactor: update local node 2025-10-20 21:35:08 +02:00
37a68e26d8 refactor: remove unused and obsolet stuff 2025-10-20 21:21:02 +02:00
7bd3e87271 Merge pull request 'feature/improved-cluster-forming' (#15) from feature/improved-cluster-forming into main
Reviewed-on: #15
2025-10-19 17:47:05 +02:00
0d09c5900c docs: update 2025-10-19 16:55:48 +02:00
407b651b82 feat: change event naming schema 2025-10-19 13:48:13 +02:00
23289d9f09 fix: latency calculation 2025-10-19 13:11:36 +02:00
b6ad479352 feat: calculate latency during heartbeat 2025-10-19 12:59:26 +02:00
3ed44cd00f feat: improve cluster forming; just use heartbeat to form the cluster 2025-10-19 12:50:43 +02:00
ce70830678 fix: neopattern example wdt reset 2025-10-18 13:49:39 +02:00
b404852fc7 feat: GET node config endpoint 2025-10-15 22:36:20 +02:00
7063b1ab16 feat: persistent custom labels 2025-10-15 22:23:00 +02:00
31 changed files with 1439 additions and 802 deletions

322
ctl.sh
View File

@@ -15,6 +15,10 @@ source .env
## ota all <target> - OTA update all nodes in cluster ## ota all <target> - OTA update all nodes in cluster
## cluster members - List cluster members ## cluster members - List cluster members
## node wifi <ssid> <password> [ip] - Configure WiFi on node ## node wifi <ssid> <password> [ip] - Configure WiFi on node
## node label set <key=value> [ip] - Set a label on node
## node label delete <key> [ip] - Delete a label from node
## node config get [ip] - Get node configuration
## node status [ip] - Get node status and information
## monitor - Monitor serial output ## monitor - Monitor serial output
## ##
## Examples: ## Examples:
@@ -22,6 +26,13 @@ source .env
## ./ctl.sh flash d1_mini ## ./ctl.sh flash d1_mini
## ./ctl.sh node wifi "MyNetwork" "MyPassword" ## ./ctl.sh node wifi "MyNetwork" "MyPassword"
## ./ctl.sh node wifi "MyNetwork" "MyPassword" 192.168.1.100 ## ./ctl.sh node wifi "MyNetwork" "MyPassword" 192.168.1.100
## ./ctl.sh node label set "environment=production"
## ./ctl.sh node label set "location=office" 192.168.1.100
## ./ctl.sh node label delete "environment"
## ./ctl.sh node config get
## ./ctl.sh node config get 192.168.1.100
## ./ctl.sh node status
## ./ctl.sh node status 192.168.1.100
function info { function info {
sed -n 's/^##//p' ctl.sh sed -n 's/^##//p' ctl.sh
@@ -161,6 +172,317 @@ function node {
return 1 return 1
fi fi
} }
function label {
function set {
if [ $# -lt 1 ]; then
echo "Usage: $0 node label set <key=value> [node_ip]"
echo " key=value: Label key and value in format 'key=value'"
echo " node_ip: Optional IP address (defaults to API_NODE from .env)"
return 1
fi
local key_value="$1"
local node_ip="${2:-$API_NODE}"
# Parse key=value format
if [[ ! "$key_value" =~ ^[^=]+=.+$ ]]; then
echo "Error: Label must be in format 'key=value'"
echo "Example: environment=production"
return 1
fi
local key="${key_value%%=*}"
local value="${key_value#*=}"
echo "Setting label '$key=$value' on node $node_ip..."
# First get current labels
current_labels=$(curl -s "http://$node_ip/api/node/status" | jq -r '.labels // {}')
# Add/update the new label
updated_labels=$(echo "$current_labels" | jq --arg key "$key" --arg value "$value" '. + {($key): $value}')
# Send updated labels to the node
response=$(curl -s -w "\n%{http_code}" -X POST \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "labels=$updated_labels" \
"http://$node_ip/api/node/config" 2>/dev/null || echo -e "\n000")
# Extract HTTP status code and response body
http_code=$(echo "$response" | tail -n1)
response_body=$(echo "$response" | head -n -1)
# Check if curl succeeded
if [ "$http_code" = "000" ] || [ -z "$response_body" ]; then
echo "Error: Failed to connect to node at $node_ip"
echo "Please check:"
echo " - Node is powered on and connected to network"
echo " - IP address is correct"
echo " - Node is running Spore firmware"
return 1
fi
# Check HTTP status code
if [ "$http_code" != "200" ]; then
echo "Error: HTTP $http_code - Server error"
echo "Response: $response_body"
return 1
fi
# Parse and display the response
status=$(echo "$response_body" | jq -r '.status // "unknown"')
message=$(echo "$response_body" | jq -r '.message // "No message"')
echo "Status: $status"
echo "Message: $message"
# Return appropriate exit code
if [ "$status" = "success" ]; then
echo "Label '$key=$value' set successfully!"
return 0
else
echo "Failed to set label!"
return 1
fi
}
function delete {
if [ $# -lt 1 ]; then
echo "Usage: $0 node label delete <key> [node_ip]"
echo " key: Label key to delete"
echo " node_ip: Optional IP address (defaults to API_NODE from .env)"
return 1
fi
local key="$1"
local node_ip="${2:-$API_NODE}"
echo "Deleting label '$key' from node $node_ip..."
# First get current labels
current_labels=$(curl -s "http://$node_ip/api/node/status" | jq -r '.labels // {}')
# Check if key exists
if [ "$(echo "$current_labels" | jq -r --arg key "$key" 'has($key)')" != "true" ]; then
echo "Warning: Label '$key' does not exist on node"
return 0
fi
# Remove the key
updated_labels=$(echo "$current_labels" | jq --arg key "$key" 'del(.[$key])')
# Send updated labels to the node
response=$(curl -s -w "\n%{http_code}" -X POST \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "labels=$updated_labels" \
"http://$node_ip/api/node/config" 2>/dev/null || echo -e "\n000")
# Extract HTTP status code and response body
http_code=$(echo "$response" | tail -n1)
response_body=$(echo "$response" | head -n -1)
# Check if curl succeeded
if [ "$http_code" = "000" ] || [ -z "$response_body" ]; then
echo "Error: Failed to connect to node at $node_ip"
echo "Please check:"
echo " - Node is powered on and connected to network"
echo " - IP address is correct"
echo " - Node is running Spore firmware"
return 1
fi
# Check HTTP status code
if [ "$http_code" != "200" ]; then
echo "Error: HTTP $http_code - Server error"
echo "Response: $response_body"
return 1
fi
# Parse and display the response
status=$(echo "$response_body" | jq -r '.status // "unknown"')
message=$(echo "$response_body" | jq -r '.message // "No message"')
echo "Status: $status"
echo "Message: $message"
# Return appropriate exit code
if [ "$status" = "success" ]; then
echo "Label '$key' deleted successfully!"
return 0
else
echo "Failed to delete label!"
return 1
fi
}
${@:-info}
}
function config {
function get {
local node_ip="${1:-$API_NODE}"
echo "Getting configuration for node $node_ip..."
# Get node configuration
response=$(curl -s -w "\n%{http_code}" "http://$node_ip/api/node/config" 2>/dev/null || echo -e "\n000")
# Extract HTTP status code and response body
http_code=$(echo "$response" | tail -n1)
response_body=$(echo "$response" | head -n -1)
# Check if curl succeeded
if [ "$http_code" = "000" ] || [ -z "$response_body" ]; then
echo "Error: Failed to connect to node at $node_ip"
echo "Please check:"
echo " - Node is powered on and connected to network"
echo " - IP address is correct"
echo " - Node is running Spore firmware"
return 1
fi
# Check HTTP status code
if [ "$http_code" != "200" ]; then
echo "Error: HTTP $http_code - Server error"
echo "Response: $response_body"
return 1
fi
# Parse and display the response in a nice format
echo ""
echo "=== Node Configuration ==="
echo "Node IP: $node_ip"
echo "Retrieved at: $(date)"
echo ""
# WiFi Configuration
echo "=== WiFi Configuration ==="
echo "SSID: $(echo "$response_body" | jq -r '.wifi.ssid // "N/A"')"
echo "Connect Timeout: $(echo "$response_body" | jq -r '.wifi.connect_timeout_ms // "N/A"') ms"
echo "Retry Delay: $(echo "$response_body" | jq -r '.wifi.retry_delay_ms // "N/A"') ms"
echo "Password: [HIDDEN]"
echo ""
# Network Configuration
echo "=== Network Configuration ==="
echo "UDP Port: $(echo "$response_body" | jq -r '.network.udp_port // "N/A"')"
echo "API Server Port: $(echo "$response_body" | jq -r '.network.api_server_port // "N/A"')"
echo ""
# Cluster Configuration
echo "=== Cluster Configuration ==="
echo "Heartbeat Interval: $(echo "$response_body" | jq -r '.cluster.heartbeat_interval_ms // "N/A"') ms"
echo "Cluster Listen Interval: $(echo "$response_body" | jq -r '.cluster.cluster_listen_interval_ms // "N/A"') ms"
echo "Status Update Interval: $(echo "$response_body" | jq -r '.cluster.status_update_interval_ms // "N/A"') ms"
echo ""
# Node Status Thresholds
echo "=== Node Status Thresholds ==="
echo "Active Threshold: $(echo "$response_body" | jq -r '.thresholds.node_active_threshold_ms // "N/A"') ms"
echo "Inactive Threshold: $(echo "$response_body" | jq -r '.thresholds.node_inactive_threshold_ms // "N/A"') ms"
echo "Dead Threshold: $(echo "$response_body" | jq -r '.thresholds.node_dead_threshold_ms // "N/A"') ms"
echo ""
# System Configuration
echo "=== System Configuration ==="
echo "Restart Delay: $(echo "$response_body" | jq -r '.system.restart_delay_ms // "N/A"') ms"
echo "JSON Doc Size: $(echo "$response_body" | jq -r '.system.json_doc_size // "N/A"') bytes"
echo ""
# Memory Management
echo "=== Memory Management ==="
echo "Low Memory Threshold: $(echo "$response_body" | jq -r '.memory.low_memory_threshold_bytes // "N/A"') bytes"
echo "Critical Memory Threshold: $(echo "$response_body" | jq -r '.memory.critical_memory_threshold_bytes // "N/A"') bytes"
echo "Max Concurrent HTTP Requests: $(echo "$response_body" | jq -r '.memory.max_concurrent_http_requests // "N/A"')"
echo ""
# Custom Labels
labels=$(echo "$response_body" | jq -r '.labels // {}')
if [ "$labels" != "{}" ] && [ "$labels" != "null" ]; then
echo "=== Custom Labels ==="
echo "$labels" | jq -r 'to_entries[] | "\(.key): \(.value)"'
echo ""
else
echo "=== Custom Labels ==="
echo "No custom labels set"
echo ""
fi
# Metadata
echo "=== Metadata ==="
echo "Configuration Version: $(echo "$response_body" | jq -r '.version // "N/A"')"
echo "Retrieved Timestamp: $(echo "$response_body" | jq -r '.retrieved_at // "N/A"')"
echo ""
echo "=== Raw JSON Response ==="
echo "$response_body" | jq '.'
return 0
}
${@:-info}
}
function status {
local node_ip="${1:-$API_NODE}"
echo "Getting status for node $node_ip..."
# Get node status
response=$(curl -s -w "\n%{http_code}" "http://$node_ip/api/node/status" 2>/dev/null || echo -e "\n000")
# Extract HTTP status code and response body
http_code=$(echo "$response" | tail -n1)
response_body=$(echo "$response" | head -n -1)
# Check if curl succeeded
if [ "$http_code" = "000" ] || [ -z "$response_body" ]; then
echo "Error: Failed to connect to node at $node_ip"
echo "Please check:"
echo " - Node is powered on and connected to network"
echo " - IP address is correct"
echo " - Node is running Spore firmware"
return 1
fi
# Check HTTP status code
if [ "$http_code" != "200" ]; then
echo "Error: HTTP $http_code - Server error"
echo "Response: $response_body"
return 1
fi
# Parse and display the response in a nice format
echo ""
echo "=== Node Status ==="
echo "Hostname: $(echo "$response_body" | jq -r '.hostname // "N/A"')"
echo "IP Address: $node_ip"
echo "Free Heap: $(echo "$response_body" | jq -r '.freeHeap // "N/A"') bytes"
echo "Chip ID: $(echo "$response_body" | jq -r '.chipId // "N/A"')"
echo "SDK Version: $(echo "$response_body" | jq -r '.sdkVersion // "N/A"')"
echo "CPU Frequency: $(echo "$response_body" | jq -r '.cpuFreqMHz // "N/A"') MHz"
echo "Flash Size: $(echo "$response_body" | jq -r '.flashChipSize // "N/A"') bytes"
# Display labels if present
labels=$(echo "$response_body" | jq -r '.labels // {}')
if [ "$labels" != "{}" ] && [ "$labels" != "null" ]; then
echo ""
echo "=== Labels ==="
echo "$labels" | jq -r 'to_entries[] | "\(.key): \(.value)"'
else
echo ""
echo "=== Labels ==="
echo "No labels set"
fi
echo ""
echo "=== Raw JSON Response ==="
echo "$response_body" | jq '.'
return 0
}
${@:-info} ${@:-info}
} }

View File

@@ -42,65 +42,79 @@ The cluster uses a UDP-based discovery protocol for automatic node detection:
### Discovery Process ### Discovery Process
1. **Discovery Broadcast**: Nodes periodically send UDP packets on port `udp_port` (default 4210) 1. **Discovery Broadcast**: Nodes periodically send heartbeat messages on port `udp_port` (default 4210)
2. **Response Handling**: Nodes respond with `CLUSTER_RESPONSE:<hostname>` 2. **Response Handling**: Nodes respond with node update information containing their current state
3. **Member Management**: Discovered nodes are added/updated in the cluster 3. **Member Management**: Discovered nodes are added/updated in the cluster with current information
4. **Node Info via UDP**: Heartbeat triggers peers to send `CLUSTER_NODE_INFO:<hostname>:<json>` 4. **Node Synchronization**: Periodic broadcasts ensure all nodes maintain current cluster state
### Protocol Details ### Protocol Details
- **UDP Port**: 4210 (configurable via `Config.udp_port`) - **UDP Port**: 4210 (configurable via `Config.udp_port`)
- **Discovery Message**: `CLUSTER_DISCOVERY` - **Heartbeat Message**: `CLUSTER_HEARTBEAT:hostname`
- **Response Message**: `CLUSTER_RESPONSE` - **Node Update Message**: `NODE_UPDATE:hostname:{json}`
- **Heartbeat Message**: `CLUSTER_HEARTBEAT`
- **Node Info Message**: `CLUSTER_NODE_INFO:<hostname>:<json>`
- **Broadcast Address**: 255.255.255.255 - **Broadcast Address**: 255.255.255.255
- **Discovery Interval**: `Config.discovery_interval_ms` (default 1000 ms)
- **Listen Interval**: `Config.cluster_listen_interval_ms` (default 10 ms) - **Listen Interval**: `Config.cluster_listen_interval_ms` (default 10 ms)
- **Heartbeat Interval**: `Config.heartbeat_interval_ms` (default 5000 ms) - **Heartbeat Interval**: `Config.heartbeat_interval_ms` (default 5000 ms)
### Message Formats ### Message Formats
- **Discovery**: `CLUSTER_DISCOVERY` - **Heartbeat**: `CLUSTER_HEARTBEAT:hostname`
- Sender: any node, broadcast to 255.255.255.255:`udp_port`
- Purpose: announce presence and solicit peer identification
- **Response**: `CLUSTER_RESPONSE:<hostname>`
- Sender: node receiving a discovery; unicast to requester IP
- Purpose: provide hostname so requester can register/update member
- **Heartbeat**: `CLUSTER_HEARTBEAT:<hostname>`
- Sender: each node, broadcast to 255.255.255.255:`udp_port` on interval - Sender: each node, broadcast to 255.255.255.255:`udp_port` on interval
- Purpose: prompt peers to reply with their node info and keep liveness - Purpose: announce presence, prompt peers for node info, and keep liveness
- **Node Info**: `CLUSTER_NODE_INFO:<hostname>:<json>` - **Node Update**: `NODE_UPDATE:hostname:{json}`
- Sender: node receiving a heartbeat; unicast to heartbeat sender IP - Sender: node responding to heartbeat or broadcasting current state
- JSON fields: freeHeap, chipId, sdkVersion, cpuFreqMHz, flashChipSize, optional labels - JSON fields: hostname, ip, uptime, optional labels
- Purpose: provide current node information for cluster synchronization
### Discovery Flow ### Discovery Flow
1. **Sender broadcasts** `CLUSTER_DISCOVERY` 1. **A node broadcasts** `CLUSTER_HEARTBEAT:hostname` to announce its presence
2. **Each receiver responds** with `CLUSTER_RESPONSE:<hostname>` to the sender IP 2. **Each receiver responds** with `NODE_UPDATE:hostname:{json}` containing current node state
3. **Sender registers/updates** the node using hostname and source IP
### Heartbeat Flow
1. **A node broadcasts** `CLUSTER_HEARTBEAT:<hostname>`
2. **Each receiver replies** with `CLUSTER_NODE_INFO:<hostname>:<json>` to the heartbeat sender IP
3. **The sender**: 3. **The sender**:
- Ensures the node exists or creates it with `hostname` and sender IP - Ensures the responding node exists or creates it with current IP and information
- Parses JSON and updates resources, labels, `status = ACTIVE`, `lastSeen = now` - Parses JSON and updates node info, `status = ACTIVE`, `lastSeen = now`
- Sets `latency = now - lastHeartbeatSentAt` (per-node, measured at heartbeat origin) - Calculates `latency = now - lastHeartbeatSentAt` for network performance monitoring
### Node Synchronization
1. **Event-driven broadcasts**: Nodes broadcast `NODE_UPDATE:hostname:{json}` when node information changes
2. **All receivers**: Update their memberlist entry for the broadcasting node
3. **Purpose**: Ensures all nodes maintain current cluster state and configuration
### Sequence Diagram
```mermaid
sequenceDiagram
participant N1 as Node A (esp-node1)
participant N2 as Node B (esp-node2)
Note over N1,N2: Discovery via heartbeat broadcast
N1->>+N2: CLUSTER_HEARTBEAT:esp-node1
Note over N2: Node B responds with its current state
N2->>+N1: NODE_UPDATE:esp-node1:{"hostname":"esp-node2","uptime":12345,"labels":{"role":"sensor"}}
Note over N1: Process NODE_UPDATE response
N1-->>N1: Update memberlist for Node B
N1-->>N1: Set Node B status = ACTIVE
N1-->>N1: Calculate latency for Node B
Note over N1,N2: Event-driven node synchronization
N1->>+N2: NODE_UPDATE:esp-node1:{"hostname":"esp-node1","uptime":12346,"labels":{"role":"controller"}}
Note over N2: Update memberlist with latest information
N2-->>N2: Update Node A info, maintain ACTIVE status
```
### Listener Behavior ### Listener Behavior
The `cluster_listen` task parses one UDP packet per run and dispatches by prefix to: The `cluster_listen` task parses one UDP packet per run and dispatches by prefix to:
- **Discovery** → send `CLUSTER_RESPONSE` - **Heartbeat** → add/update responding node and send `NODE_UPDATE` response
- **Heartbeat** → send `CLUSTER_NODE_INFO` JSON - **Node Update** → update node information and trigger memberlist logging
- **Response** → add/update node using provided hostname and source IP
- **Node Info** → update resources/status/labels and record latency
### Timing and Intervals ### Timing and Intervals
- **UDP Port**: `Config.udp_port` (default 4210) - **UDP Port**: `Config.udp_port` (default 4210)
- **Discovery Interval**: `Config.discovery_interval_ms` (default 1000 ms)
- **Listen Interval**: `Config.cluster_listen_interval_ms` (default 10 ms) - **Listen Interval**: `Config.cluster_listen_interval_ms` (default 10 ms)
- **Heartbeat Interval**: `Config.heartbeat_interval_ms` (default 5000 ms) - **Heartbeat Interval**: `Config.heartbeat_interval_ms` (default 5000 ms)
@@ -120,12 +134,9 @@ The system runs several background tasks at different intervals:
| Task | Interval (default) | Purpose | | Task | Interval (default) | Purpose |
|------|--------------------|---------| |------|--------------------|---------|
| `cluster_discovery` | 1000 ms | Send UDP discovery packets | | `cluster_listen` | 10 ms | Listen for heartbeat/node-info messages |
| `cluster_listen` | 10 ms | Listen for discovery/heartbeat/node-info |
| `status_update` | 1000 ms | Update node status categories, purge dead | | `status_update` | 1000 ms | Update node status categories, purge dead |
| `heartbeat` | 5000 ms | Broadcast heartbeat and update local resources | | `heartbeat` | 5000 ms | Broadcast heartbeat and update local resources |
| `cluster_update_members_info` | 10000 ms | Reserved; no-op (info via UDP) |
| `print_members` | 5000 ms | Log current member list |
### Task Management Features ### Task Management Features
@@ -142,12 +153,12 @@ The `NodeContext` provides an event-driven architecture for system-wide communic
```cpp ```cpp
// Subscribe to events // Subscribe to events
ctx.on("node_discovered", [](void* data) { ctx.on("node/discovered", [](void* data) {
NodeInfo* node = static_cast<NodeInfo*>(data); NodeInfo* node = static_cast<NodeInfo*>(data);
// Handle new node discovery // Handle new node discovery
}); });
ctx.on("cluster_updated", [](void* data) { ctx.on("cluster/updated", [](void* data) {
// Handle cluster membership changes // Handle cluster membership changes
}); });
``` ```
@@ -156,13 +167,13 @@ ctx.on("cluster_updated", [](void* data) {
```cpp ```cpp
// Publish events // Publish events
ctx.fire("node_discovered", &newNode); ctx.fire("node/discovered", &newNode);
ctx.fire("cluster_updated", &clusterData); ctx.fire("cluster/updated", &clusterData);
``` ```
### Available Events ### Available Events
- **`node_discovered`**: New node added or local node refreshed - **`node/discovered`**: New node added or local node refreshed
## Resource Monitoring ## Resource Monitoring

View File

@@ -2,7 +2,7 @@
## Overview ## Overview
SPORE implements a comprehensive persistent configuration system that manages device settings across reboots and provides runtime reconfiguration capabilities. The system uses LittleFS for persistent storage and provides both programmatic and HTTP API access to configuration parameters. SPORE implements a persistent configuration system that manages device settings across reboots and provides runtime reconfiguration capabilities. The system uses LittleFS for persistent storage and provides both programmatic and HTTP API access to configuration parameters.
## Configuration Architecture ## Configuration Architecture
@@ -93,12 +93,9 @@ The configuration is stored as a JSON file with the following structure:
"api_server_port": 80 "api_server_port": 80
}, },
"cluster": { "cluster": {
"discovery_interval_ms": 1000,
"heartbeat_interval_ms": 5000, "heartbeat_interval_ms": 5000,
"cluster_listen_interval_ms": 10, "cluster_listen_interval_ms": 10,
"status_update_interval_ms": 1000, "status_update_interval_ms": 1000
"member_info_update_interval_ms": 10000,
"print_interval_ms": 5000
}, },
"thresholds": { "thresholds": {
"node_active_threshold_ms": 10000, "node_active_threshold_ms": 10000,

View File

@@ -4,7 +4,6 @@
NeoPattern::NeoPattern(uint16_t pixels, uint8_t pin, uint8_t type, void (*callback)(int)) NeoPattern::NeoPattern(uint16_t pixels, uint8_t pin, uint8_t type, void (*callback)(int))
: Adafruit_NeoPixel(pixels, pin, type) : Adafruit_NeoPixel(pixels, pin, type)
{ {
frameBuffer = (uint8_t *)malloc(768);
OnComplete = callback; OnComplete = callback;
TotalSteps = numPixels(); TotalSteps = numPixels();
begin(); begin();
@@ -13,40 +12,21 @@ NeoPattern::NeoPattern(uint16_t pixels, uint8_t pin, uint8_t type, void (*callba
NeoPattern::NeoPattern(uint16_t pixels, uint8_t pin, uint8_t type) NeoPattern::NeoPattern(uint16_t pixels, uint8_t pin, uint8_t type)
: Adafruit_NeoPixel(pixels, pin, type) : Adafruit_NeoPixel(pixels, pin, type)
{ {
frameBuffer = (uint8_t *)malloc(768);
TotalSteps = numPixels(); TotalSteps = numPixels();
begin(); begin();
} }
NeoPattern::~NeoPattern() { NeoPattern::~NeoPattern() {
if (frameBuffer) { // No frameBuffer to clean up
free(frameBuffer);
}
} }
void NeoPattern::handleStream(uint8_t *data, size_t len) // Removed unused handleStream and drawFrameBuffer functions
{
//const uint16_t *data16 = (uint16_t *)data;
bufferSize = len;
memcpy(frameBuffer, data, len);
}
void NeoPattern::drawFrameBuffer(int w, uint8_t *frame, int length)
{
for (int i = 0; i < length; i++)
{
uint8_t r = frame[i];
uint8_t g = frame[i + 1];
uint8_t b = frame[i + 2];
setPixelColor(i, r, g, b);
}
}
void NeoPattern::onCompleteDefault(int pixels) void NeoPattern::onCompleteDefault(int pixels)
{ {
//Serial.println("onCompleteDefault"); //Serial.println("onCompleteDefault");
// FIXME no specific code // FIXME no specific code
if (ActivePattern == THEATER_CHASE) if (ActivePattern == THEATER_CHASE || ActivePattern == RAINBOW_CYCLE)
{ {
return; return;
} }
@@ -127,7 +107,12 @@ void NeoPattern::RainbowCycleUpdate()
setPixelColor(i, Wheel(((i * 256 / numPixels()) + Index) & 255)); setPixelColor(i, Wheel(((i * 256 / numPixels()) + Index) & 255));
} }
show(); show();
Increment(); // RainbowCycle is continuous, just increment Index
Index++;
if (Index >= 255)
{
Index = 0;
}
} }
// Initialize for a Theater Chase // Initialize for a Theater Chase

View File

@@ -52,17 +52,12 @@ class NeoPattern : public Adafruit_NeoPixel
// Callback on completion of pattern // Callback on completion of pattern
void (*OnComplete)(int); void (*OnComplete)(int);
uint8_t *frameBuffer;
int bufferSize = 0;
// Constructor - calls base-class constructor to initialize strip // Constructor - calls base-class constructor to initialize strip
NeoPattern(uint16_t pixels, uint8_t pin, uint8_t type, void (*callback)(int)); NeoPattern(uint16_t pixels, uint8_t pin, uint8_t type, void (*callback)(int));
NeoPattern(uint16_t pixels, uint8_t pin, uint8_t type); NeoPattern(uint16_t pixels, uint8_t pin, uint8_t type);
~NeoPattern(); ~NeoPattern();
// Stream handling // Stream handling functions removed
void handleStream(uint8_t *data, size_t len);
void drawFrameBuffer(int w, uint8_t *frame, int length);
// Pattern completion // Pattern completion
void onCompleteDefault(int pixels); void onCompleteDefault(int pixels);

View File

@@ -195,13 +195,13 @@ void NeoPatternService::registerEventHandlers() {
JsonDocument doc; JsonDocument doc;
DeserializationError err = deserializeJson(doc, *jsonStr); DeserializationError err = deserializeJson(doc, *jsonStr);
if (err) { if (err) {
LOG_WARN("NeoPattern", String("Failed to parse CLUSTER_EVENT data: ") + err.c_str()); LOG_WARN("NeoPattern", String("Failed to parse cluster/event data: ") + err.c_str());
return; return;
} }
JsonObject obj = doc.as<JsonObject>(); JsonObject obj = doc.as<JsonObject>();
bool applied = applyControlParams(obj); bool applied = applyControlParams(obj);
if (applied) { if (applied) {
LOG_INFO("NeoPattern", "Applied control from CLUSTER_EVENT"); LOG_INFO("NeoPattern", "Applied control from cluster/event");
} }
}); });
@@ -340,8 +340,30 @@ void NeoPatternService::setPattern(NeoPatternType pattern) {
neoPattern->ActivePattern = static_cast<::pattern>(pattern); neoPattern->ActivePattern = static_cast<::pattern>(pattern);
resetStateForPattern(pattern); resetStateForPattern(pattern);
// Initialize the pattern using the registry // Set up pattern-specific parameters
patternRegistry.initializePattern(static_cast<uint8_t>(pattern)); switch (pattern) {
case NeoPatternType::RAINBOW_CYCLE:
neoPattern->RainbowCycle(updateIntervalMs, static_cast<::direction>(direction));
break;
case NeoPatternType::THEATER_CHASE:
neoPattern->TheaterChase(currentState.color, currentState.color2, updateIntervalMs, static_cast<::direction>(direction));
break;
case NeoPatternType::COLOR_WIPE:
neoPattern->ColorWipe(currentState.color, updateIntervalMs, static_cast<::direction>(direction));
break;
case NeoPatternType::SCANNER:
neoPattern->Scanner(currentState.color, updateIntervalMs);
break;
case NeoPatternType::FADE:
neoPattern->Fade(currentState.color, currentState.color2, currentState.totalSteps, updateIntervalMs, static_cast<::direction>(direction));
break;
case NeoPatternType::FIRE:
// Fire pattern doesn't need setup
break;
case NeoPatternType::NONE:
// None pattern doesn't need setup
break;
}
} }
void NeoPatternService::setPatternByName(const String& name) { void NeoPatternService::setPatternByName(const String& name) {
@@ -425,7 +447,7 @@ void NeoPatternService::registerPatterns() {
"rainbow_cycle", "rainbow_cycle",
static_cast<uint8_t>(NeoPatternType::RAINBOW_CYCLE), static_cast<uint8_t>(NeoPatternType::RAINBOW_CYCLE),
"Rainbow cycle pattern", "Rainbow cycle pattern",
[this]() { neoPattern->RainbowCycle(updateIntervalMs, static_cast<::direction>(direction)); }, nullptr, // No initializer needed, state is set up in setPattern
[this]() { updateRainbowCycle(); }, [this]() { updateRainbowCycle(); },
false, // doesn't require color2 false, // doesn't require color2
true // supports direction true // supports direction
@@ -435,7 +457,7 @@ void NeoPatternService::registerPatterns() {
"theater_chase", "theater_chase",
static_cast<uint8_t>(NeoPatternType::THEATER_CHASE), static_cast<uint8_t>(NeoPatternType::THEATER_CHASE),
"Theater chase pattern", "Theater chase pattern",
[this]() { neoPattern->TheaterChase(currentState.color, currentState.color2, updateIntervalMs, static_cast<::direction>(direction)); }, nullptr, // No initializer needed, state is set up in setPattern
[this]() { updateTheaterChase(); }, [this]() { updateTheaterChase(); },
true, // requires color2 true, // requires color2
true // supports direction true // supports direction
@@ -445,7 +467,7 @@ void NeoPatternService::registerPatterns() {
"color_wipe", "color_wipe",
static_cast<uint8_t>(NeoPatternType::COLOR_WIPE), static_cast<uint8_t>(NeoPatternType::COLOR_WIPE),
"Color wipe pattern", "Color wipe pattern",
[this]() { neoPattern->ColorWipe(currentState.color, updateIntervalMs, static_cast<::direction>(direction)); }, nullptr, // No initializer needed, state is set up in setPattern
[this]() { updateColorWipe(); }, [this]() { updateColorWipe(); },
false, // doesn't require color2 false, // doesn't require color2
true // supports direction true // supports direction
@@ -455,7 +477,7 @@ void NeoPatternService::registerPatterns() {
"scanner", "scanner",
static_cast<uint8_t>(NeoPatternType::SCANNER), static_cast<uint8_t>(NeoPatternType::SCANNER),
"Scanner pattern", "Scanner pattern",
[this]() { neoPattern->Scanner(currentState.color, updateIntervalMs); }, nullptr, // No initializer needed, state is set up in setPattern
[this]() { updateScanner(); }, [this]() { updateScanner(); },
false, // doesn't require color2 false, // doesn't require color2
false // doesn't support direction false // doesn't support direction
@@ -465,7 +487,7 @@ void NeoPatternService::registerPatterns() {
"fade", "fade",
static_cast<uint8_t>(NeoPatternType::FADE), static_cast<uint8_t>(NeoPatternType::FADE),
"Fade pattern", "Fade pattern",
[this]() { neoPattern->Fade(currentState.color, currentState.color2, currentState.totalSteps, updateIntervalMs, static_cast<::direction>(direction)); }, nullptr, // No initializer needed, state is set up in setPattern
[this]() { updateFade(); }, [this]() { updateFade(); },
true, // requires color2 true, // requires color2
true // supports direction true // supports direction
@@ -475,7 +497,7 @@ void NeoPatternService::registerPatterns() {
"fire", "fire",
static_cast<uint8_t>(NeoPatternType::FIRE), static_cast<uint8_t>(NeoPatternType::FIRE),
"Fire effect pattern", "Fire effect pattern",
[this]() { neoPattern->Fire(50, 120); }, nullptr, // No initializer needed, state is set up in setPattern
[this]() { updateFire(); }, [this]() { updateFire(); },
false, // doesn't require color2 false, // doesn't require color2
false // doesn't support direction false // doesn't support direction
@@ -499,7 +521,7 @@ void NeoPatternService::resetStateForPattern(NeoPatternType pattern) {
neoPattern->Index = 0; neoPattern->Index = 0;
neoPattern->Direction = static_cast<::direction>(direction); neoPattern->Direction = static_cast<::direction>(direction);
neoPattern->completed = 0; neoPattern->completed = 0;
lastUpdateMs = 0; // Don't reset lastUpdateMs to 0, keep the current timing
} }
uint32_t NeoPatternService::parseColor(const String& colorStr) const { uint32_t NeoPatternService::parseColor(const String& colorStr) const {
@@ -538,9 +560,9 @@ String NeoPatternService::getPatternDescription(const String& name) const {
void NeoPatternService::update() { void NeoPatternService::update() {
if (!initialized) return; if (!initialized) return;
//unsigned long now = millis(); unsigned long now = millis();
//if (now - lastUpdateMs < updateIntervalMs) return; if (now - lastUpdateMs < updateIntervalMs) return;
//lastUpdateMs = now; lastUpdateMs = now;
// Use pattern registry to execute the current pattern // Use pattern registry to execute the current pattern
patternRegistry.executePattern(static_cast<uint8_t>(activePattern)); patternRegistry.executePattern(static_cast<uint8_t>(activePattern));

View File

@@ -0,0 +1,202 @@
#include "PixelStreamService.h"
#include "spore/util/Logging.h"
#include <Adafruit_NeoPixel.h>
PixelStreamService::PixelStreamService(NodeContext& ctx, ApiServer& apiServer, PixelStreamController* controller)
: ctx(ctx), apiServer(apiServer), controller(controller) {}
void PixelStreamService::registerEndpoints(ApiServer& api) {
// Config endpoint for setting pixelstream configuration
api.registerEndpoint("/api/pixelstream/config", HTTP_PUT,
[this](AsyncWebServerRequest* request) { handleConfigRequest(request); },
std::vector<ParamSpec>{
ParamSpec{String("pin"), false, String("body"), String("number"), {}, String("")},
ParamSpec{String("pixel_count"), false, String("body"), String("number"), {}, String("")},
ParamSpec{String("brightness"), false, String("body"), String("number"), {}, String("")},
ParamSpec{String("matrix_width"), false, String("body"), String("number"), {}, String("")},
ParamSpec{String("matrix_serpentine"), false, String("body"), String("boolean"), {}, String("")},
ParamSpec{String("pixel_type"), false, String("body"), String("number"), {}, String("")}
});
// Config endpoint for getting pixelstream configuration
api.registerEndpoint("/api/pixelstream/config", HTTP_GET,
[this](AsyncWebServerRequest* request) { handleGetConfigRequest(request); },
std::vector<ParamSpec>{});
}
void PixelStreamService::registerTasks(TaskManager& taskManager) {
// PixelStreamService doesn't register any tasks itself
}
PixelStreamConfig PixelStreamService::loadConfig() {
// Initialize with proper defaults
PixelStreamConfig config;
config.pin = 2;
config.pixelCount = 16;
config.brightness = 80;
config.matrixWidth = 16;
config.matrixSerpentine = false;
config.pixelType = NEO_GRB + NEO_KHZ800;
if (!LittleFS.begin()) {
LOG_WARN("PixelStream", "Failed to initialize LittleFS, using defaults");
return config;
}
if (!LittleFS.exists(CONFIG_FILE())) {
LOG_INFO("PixelStream", "No pixelstream config file found, using defaults");
// Save defaults
saveConfig(config);
return config;
}
File file = LittleFS.open(CONFIG_FILE(), "r");
if (!file) {
LOG_ERROR("PixelStream", "Failed to open config file for reading");
return config;
}
JsonDocument doc;
DeserializationError error = deserializeJson(doc, file);
file.close();
if (error) {
LOG_ERROR("PixelStream", "Failed to parse config file: " + String(error.c_str()));
return config;
}
if (doc["pin"].is<uint8_t>()) config.pin = doc["pin"].as<uint8_t>();
if (doc["pixel_count"].is<uint16_t>()) config.pixelCount = doc["pixel_count"].as<uint16_t>();
if (doc["brightness"].is<uint8_t>()) config.brightness = doc["brightness"].as<uint8_t>();
if (doc["matrix_width"].is<uint16_t>()) config.matrixWidth = doc["matrix_width"].as<uint16_t>();
if (doc["matrix_serpentine"].is<bool>()) config.matrixSerpentine = doc["matrix_serpentine"].as<bool>();
if (doc["pixel_type"].is<uint8_t>()) config.pixelType = static_cast<neoPixelType>(doc["pixel_type"].as<uint8_t>());
LOG_INFO("PixelStream", "Configuration loaded from " + String(CONFIG_FILE()));
return config;
}
bool PixelStreamService::saveConfig(const PixelStreamConfig& config) {
if (!LittleFS.begin()) {
LOG_ERROR("PixelStream", "LittleFS not initialized, cannot save config");
return false;
}
File file = LittleFS.open(CONFIG_FILE(), "w");
if (!file) {
LOG_ERROR("PixelStream", "Failed to open config file for writing");
return false;
}
JsonDocument doc;
doc["pin"] = config.pin;
doc["pixel_count"] = config.pixelCount;
doc["brightness"] = config.brightness;
doc["matrix_width"] = config.matrixWidth;
doc["matrix_serpentine"] = config.matrixSerpentine;
doc["pixel_type"] = static_cast<uint8_t>(config.pixelType);
size_t bytesWritten = serializeJson(doc, file);
file.close();
if (bytesWritten > 0) {
LOG_INFO("PixelStream", "Configuration saved to " + String(CONFIG_FILE()) + " (" + String(bytesWritten) + " bytes)");
return true;
} else {
LOG_ERROR("PixelStream", "Failed to write configuration to file");
return false;
}
}
void PixelStreamService::handleConfigRequest(AsyncWebServerRequest* request) {
// Load current config from file
PixelStreamConfig config = loadConfig();
bool updated = false;
// Handle individual form parameters
if (request->hasParam("pin", true)) {
String pinStr = request->getParam("pin", true)->value();
if (pinStr.length() > 0) {
int pinValue = pinStr.toInt();
if (pinValue >= 0 && pinValue <= 255) {
config.pin = static_cast<uint8_t>(pinValue);
updated = true;
}
}
}
if (request->hasParam("pixel_count", true)) {
String countStr = request->getParam("pixel_count", true)->value();
if (countStr.length() > 0) {
int countValue = countStr.toInt();
if (countValue > 0 && countValue <= 65535) {
config.pixelCount = static_cast<uint16_t>(countValue);
updated = true;
}
}
}
if (request->hasParam("brightness", true)) {
String brightnessStr = request->getParam("brightness", true)->value();
if (brightnessStr.length() > 0) {
int brightnessValue = brightnessStr.toInt();
if (brightnessValue >= 0 && brightnessValue <= 255) {
config.brightness = static_cast<uint8_t>(brightnessValue);
updated = true;
}
}
}
if (request->hasParam("matrix_width", true)) {
String widthStr = request->getParam("matrix_width", true)->value();
if (widthStr.length() > 0) {
int widthValue = widthStr.toInt();
if (widthValue > 0 && widthValue <= 65535) {
config.matrixWidth = static_cast<uint16_t>(widthValue);
updated = true;
}
}
}
if (request->hasParam("matrix_serpentine", true)) {
String serpentineStr = request->getParam("matrix_serpentine", true)->value();
config.matrixSerpentine = (serpentineStr.equalsIgnoreCase("true") || serpentineStr == "1");
updated = true;
}
if (request->hasParam("pixel_type", true)) {
String typeStr = request->getParam("pixel_type", true)->value();
if (typeStr.length() > 0) {
int typeValue = typeStr.toInt();
config.pixelType = static_cast<neoPixelType>(typeValue);
updated = true;
}
}
if (!updated) {
request->send(400, "application/json", "{\"error\":\"No valid configuration fields provided\"}");
return;
}
// Save config to file
if (saveConfig(config)) {
LOG_INFO("PixelStreamService", "Configuration updated and saved to pixelstream.json");
request->send(200, "application/json", "{\"status\":\"success\",\"message\":\"Configuration updated and saved\"}");
} else {
LOG_ERROR("PixelStreamService", "Failed to save configuration to file");
request->send(500, "application/json", "{\"error\":\"Failed to save configuration\"}");
}
}
void PixelStreamService::handleGetConfigRequest(AsyncWebServerRequest* request) {
PixelStreamConfig config = loadConfig();
JsonDocument doc;
doc["pin"] = config.pin;
doc["pixel_count"] = config.pixelCount;
doc["brightness"] = config.brightness;
doc["matrix_width"] = config.matrixWidth;
doc["matrix_serpentine"] = config.matrixSerpentine;
doc["pixel_type"] = static_cast<uint8_t>(config.pixelType);
String json;
serializeJson(doc, json);
request->send(200, "application/json", json);
}

View File

@@ -0,0 +1,33 @@
#pragma once
#include "spore/Service.h"
#include "spore/core/NodeContext.h"
#include "PixelStreamController.h"
#include <ArduinoJson.h>
#include <LittleFS.h>
#include "spore/util/Logging.h"
// PixelStreamConfig is defined in PixelStreamController.h
class PixelStreamService : public Service {
public:
PixelStreamService(NodeContext& ctx, ApiServer& apiServer, PixelStreamController* controller);
void registerEndpoints(ApiServer& api) override;
void registerTasks(TaskManager& taskManager) override;
const char* getName() const override { return "PixelStream"; }
// Config management
PixelStreamConfig loadConfig();
bool saveConfig(const PixelStreamConfig& config);
void setController(PixelStreamController* ctrl) { controller = ctrl; }
private:
NodeContext& ctx;
ApiServer& apiServer;
PixelStreamController* controller;
void handleConfigRequest(AsyncWebServerRequest* request);
void handleGetConfigRequest(AsyncWebServerRequest* request);
static const char* CONFIG_FILE() { return "/pixelstream.json"; }
};

View File

@@ -2,7 +2,11 @@
#include "spore/Spore.h" #include "spore/Spore.h"
#include "spore/util/Logging.h" #include "spore/util/Logging.h"
#include "PixelStreamController.h" #include "PixelStreamController.h"
#include "PixelStreamService.h"
#include <Adafruit_NeoPixel.h>
// Defaults are now loaded from config.json on LittleFS
// Can still be overridden with preprocessor defines if needed
#ifndef PIXEL_PIN #ifndef PIXEL_PIN
#define PIXEL_PIN 2 #define PIXEL_PIN 2
#endif #endif
@@ -34,22 +38,28 @@ Spore spore({
}); });
PixelStreamController* controller = nullptr; PixelStreamController* controller = nullptr;
PixelStreamService* service = nullptr;
void setup() { void setup() {
spore.setup(); spore.setup();
PixelStreamConfig config{ // Create service first (need it to load config)
static_cast<uint8_t>(PIXEL_PIN), service = new PixelStreamService(spore.getContext(), spore.getApiServer(), nullptr);
static_cast<uint16_t>(PIXEL_COUNT),
static_cast<uint8_t>(PIXEL_BRIGHTNESS),
static_cast<uint16_t>(PIXEL_MATRIX_WIDTH),
static_cast<bool>(PIXEL_MATRIX_SERPENTINE),
static_cast<neoPixelType>(PIXEL_TYPE)
};
// Load pixelstream config from LittleFS (pixelstream.json) or use defaults
PixelStreamConfig config = service->loadConfig();
// Create controller with loaded config
controller = new PixelStreamController(spore.getContext(), config); controller = new PixelStreamController(spore.getContext(), config);
controller->begin(); controller->begin();
// Update service with the actual controller
service->setController(controller);
// Register service
spore.registerService(service);
// Start the API server
spore.begin(); spore.begin();
} }

View File

@@ -11,7 +11,6 @@
#include "core/TaskManager.h" #include "core/TaskManager.h"
#include "Service.h" #include "Service.h"
#include "util/Logging.h" #include "util/Logging.h"
#include "util/CpuUsage.h"
class Spore { class Spore {
public: public:
@@ -35,12 +34,6 @@ public:
ClusterManager& getCluster() { return cluster; } ClusterManager& getCluster() { return cluster; }
ApiServer& getApiServer() { return apiServer; } ApiServer& getApiServer() { return apiServer; }
// CPU usage monitoring
CpuUsage& getCpuUsage() { return cpuUsage; }
float getCurrentCpuUsage() const { return cpuUsage.getCpuUsage(); }
float getAverageCpuUsage() const { return cpuUsage.getAverageCpuUsage(); }
private: private:
void initializeCore(); void initializeCore();
void registerCoreServices(); void registerCoreServices();
@@ -51,7 +44,6 @@ private:
TaskManager taskManager; TaskManager taskManager;
ClusterManager cluster; ClusterManager cluster;
ApiServer apiServer; ApiServer apiServer;
CpuUsage cpuUsage;
std::vector<std::shared_ptr<Service>> services; std::vector<std::shared_ptr<Service>> services;
bool initialized; bool initialized;

View File

@@ -14,17 +14,16 @@ class ClusterManager {
public: public:
ClusterManager(NodeContext& ctx, TaskManager& taskMgr); ClusterManager(NodeContext& ctx, TaskManager& taskMgr);
void registerTasks(); void registerTasks();
void sendDiscovery();
void listen(); void listen();
void addOrUpdateNode(const String& nodeHost, IPAddress nodeIP); void addOrUpdateNode(const String& nodeHost, IPAddress nodeIP);
void updateAllNodeStatuses(); void updateAllNodeStatuses();
void removeDeadNodes(); void removeDeadNodes();
void printMemberList(); void printMemberList();
const std::map<String, NodeInfo>& getMemberList() const { return *ctx.memberList; } size_t getMemberCount() const { return ctx.memberList->getMemberCount(); }
void fetchNodeInfo(const IPAddress& ip); void updateLocalNodeResources(NodeInfo& node);
void updateLocalNodeResources();
void heartbeatTaskCallback(); void heartbeatTaskCallback();
void updateAllMembersInfoTaskCallback(); void updateAllMembersInfoTaskCallback();
void broadcastNodeUpdate();
private: private:
NodeContext& ctx; NodeContext& ctx;
TaskManager& taskManager; TaskManager& taskManager;
@@ -35,18 +34,15 @@ private:
}; };
void initMessageHandlers(); void initMessageHandlers();
void handleIncomingMessage(const char* incoming); void handleIncomingMessage(const char* incoming);
static bool isDiscoveryMsg(const char* msg);
static bool isHeartbeatMsg(const char* msg); static bool isHeartbeatMsg(const char* msg);
static bool isResponseMsg(const char* msg); static bool isNodeUpdateMsg(const char* msg);
static bool isNodeInfoMsg(const char* msg);
static bool isClusterEventMsg(const char* msg); static bool isClusterEventMsg(const char* msg);
static bool isRawMsg(const char* msg); static bool isRawMsg(const char* msg);
void onDiscovery(const char* msg);
void onHeartbeat(const char* msg); void onHeartbeat(const char* msg);
void onResponse(const char* msg); void onNodeUpdate(const char* msg);
void onNodeInfo(const char* msg);
void onClusterEvent(const char* msg); void onClusterEvent(const char* msg);
void onRawMessage(const char* msg); void onRawMessage(const char* msg);
void sendNodeInfo(const String& hostname, const IPAddress& targetIP);
unsigned long lastHeartbeatSentAt = 0; unsigned long lastHeartbeatSentAt = 0;
std::vector<MessageHandler> messageHandlers; std::vector<MessageHandler> messageHandlers;
}; };

View File

@@ -0,0 +1,133 @@
#pragma once
#include <Arduino.h>
#include <map>
#include <string>
#include <optional>
#include <functional>
#include "spore/types/NodeInfo.h"
/**
* @brief Manages the list of cluster members.
*
* The Memberlist class maintains a collection of cluster members, where each member
* is identified by its IP address and associated with a NodeInfo object. It provides
* methods to add, update, and remove members, as well as handle node status changes
* (stale and dead nodes).
*/
class Memberlist {
public:
/**
* @brief Default constructor.
*/
Memberlist();
/**
* @brief Destructor.
*/
~Memberlist();
/**
* @brief Adds or updates a member in the list.
*
* If the member already exists, updates its information. Otherwise, adds a new member.
* @param ip The IP address of the member (as string).
* @param node The NodeInfo object containing member details.
* @return True if the member was added or updated, false otherwise.
*/
bool addOrUpdateMember(const std::string& ip, const NodeInfo& node);
/**
* @brief Adds a new member to the list.
*
* @param ip The IP address of the member (as string).
* @param node The NodeInfo object containing member details.
* @return True if the member was added, false if it already exists.
*/
bool addMember(const std::string& ip, const NodeInfo& node);
/**
* @brief Updates an existing member in the list.
*
* @param ip The IP address of the member (as string).
* @param node The updated NodeInfo object.
* @return True if the member was updated, false if it doesn't exist.
*/
bool updateMember(const std::string& ip, const NodeInfo& node);
/**
* @brief Removes a member from the list.
*
* @param ip The IP address of the member to remove (as string).
* @return True if the member was removed, false if it doesn't exist.
*/
bool removeMember(const std::string& ip);
/**
* @brief Retrieves a member by IP address.
*
* @param ip The IP address of the member (as string).
* @return Optional containing the NodeInfo if found, or std::nullopt if not found.
*/
std::optional<NodeInfo> getMember(const std::string& ip) const;
/**
* @brief Iterates over all members and calls the provided callback for each.
*
* @param callback Function to call for each member. Receives (ip, node) as parameters.
*/
void forEachMember(std::function<void(const std::string&, const NodeInfo&)> callback) const;
/**
* @brief Iterates over all members and calls the provided callback for each.
*
* @param callback Function to call for each member. Receives (ip, node) as parameters.
* If callback returns false, iteration stops.
* @return True if all members were processed, false if iteration was stopped early.
*/
bool forEachMemberUntil(std::function<bool(const std::string&, const NodeInfo&)> callback) const;
/**
* @brief Gets the number of members in the list.
*
* @return The number of members.
*/
size_t getMemberCount() const;
/**
* @brief Updates the status of all members based on current time and thresholds.
*
* Marks nodes as stale or dead based on their last seen time.
* @param currentTime The current time in milliseconds.
* @param staleThresholdMs Threshold for marking a node as stale (milliseconds).
* @param deadThresholdMs Threshold for marking a node as dead (milliseconds).
* @param onStatusChange Optional callback fired when a node's status changes.
*/
void updateAllNodeStatuses(unsigned long currentTime,
unsigned long staleThresholdMs,
unsigned long deadThresholdMs,
std::function<void(const std::string&, NodeInfo::Status, NodeInfo::Status)> onStatusChange = nullptr);
/**
* @brief Removes all dead members from the list.
*
* @return The number of members removed.
*/
size_t removeDeadMembers();
/**
* @brief Checks if a member exists in the list.
*
* @param ip The IP address of the member (as string).
* @return True if the member exists, false otherwise.
*/
bool hasMember(const std::string& ip) const;
/**
* @brief Clears all members from the list.
*/
void clear();
private:
std::map<std::string, NodeInfo> m_members; ///< Internal map holding the members.
};

View File

@@ -2,12 +2,14 @@
#include <WiFiUdp.h> #include <WiFiUdp.h>
#include <map> #include <map>
#include "spore/types/NodeInfo.h"
#include <functional> #include <functional>
#include <string> #include <string>
#include <initializer_list> #include <initializer_list>
#include <memory>
#include "spore/types/NodeInfo.h"
#include "spore/types/Config.h" #include "spore/types/Config.h"
#include "spore/types/ApiTypes.h" #include "spore/types/ApiTypes.h"
#include "spore/core/Memberlist.h"
class NodeContext { class NodeContext {
public: public:
@@ -18,8 +20,9 @@ public:
String hostname; String hostname;
IPAddress localIP; IPAddress localIP;
NodeInfo self; NodeInfo self;
std::map<String, NodeInfo>* memberList; std::unique_ptr<Memberlist> memberList;
::Config config; ::Config config;
std::map<String, String> constructorLabels; // Labels passed to constructor (not persisted)
using EventCallback = std::function<void(void*)>; using EventCallback = std::function<void(void*)>;
std::map<std::string, std::vector<EventCallback>> eventRegistry; std::map<std::string, std::vector<EventCallback>> eventRegistry;
@@ -29,4 +32,5 @@ public:
void on(const std::string& event, EventCallback cb); void on(const std::string& event, EventCallback cb);
void fire(const std::string& event, void* data); void fire(const std::string& event, void* data);
void onAny(AnyEventCallback cb); void onAny(AnyEventCallback cb);
void rebuildLabels(); // Rebuild self.labels from constructorLabels + config.labels
}; };

View File

@@ -5,11 +5,9 @@
// Cluster protocol and API constants // Cluster protocol and API constants
namespace ClusterProtocol { namespace ClusterProtocol {
constexpr const char* DISCOVERY_MSG = "CLUSTER_DISCOVERY"; constexpr const char* HEARTBEAT_MSG = "cluster/heartbeat";
constexpr const char* RESPONSE_MSG = "CLUSTER_RESPONSE"; constexpr const char* NODE_UPDATE_MSG = "node/update";
constexpr const char* HEARTBEAT_MSG = "CLUSTER_HEARTBEAT"; constexpr const char* CLUSTER_EVENT_MSG = "cluster/event";
constexpr const char* NODE_INFO_MSG = "CLUSTER_NODE_INFO";
constexpr const char* CLUSTER_EVENT_MSG = "CLUSTER_EVENT";
constexpr const char* RAW_MSG = "RAW"; constexpr const char* RAW_MSG = "RAW";
constexpr uint16_t UDP_PORT = 4210; constexpr uint16_t UDP_PORT = 4210;
// Increased buffer to accommodate larger RAW pixel streams and node info JSON over UDP // Increased buffer to accommodate larger RAW pixel streams and node info JSON over UDP
@@ -18,12 +16,9 @@ namespace ClusterProtocol {
} }
namespace TaskIntervals { namespace TaskIntervals {
constexpr unsigned long SEND_DISCOVERY = 1000;
constexpr unsigned long LISTEN_FOR_DISCOVERY = 100;
constexpr unsigned long UPDATE_STATUS = 1000; constexpr unsigned long UPDATE_STATUS = 1000;
constexpr unsigned long PRINT_MEMBER_LIST = 5000; constexpr unsigned long PRINT_MEMBER_LIST = 5000;
constexpr unsigned long HEARTBEAT = 2000; constexpr unsigned long HEARTBEAT = 2000;
constexpr unsigned long UPDATE_ALL_MEMBERS_INFO = 10000;
} }
constexpr unsigned long NODE_ACTIVE_THRESHOLD = 10000; constexpr unsigned long NODE_ACTIVE_THRESHOLD = 10000;

View File

@@ -1,11 +1,10 @@
#pragma once #pragma once
#include "spore/Service.h" #include "spore/Service.h"
#include "spore/util/CpuUsage.h"
#include <functional> #include <functional>
class MonitoringService : public Service { class MonitoringService : public Service {
public: public:
MonitoringService(CpuUsage& cpuUsage); MonitoringService();
void registerEndpoints(ApiServer& api) override; void registerEndpoints(ApiServer& api) override;
void registerTasks(TaskManager& taskManager) override; void registerTasks(TaskManager& taskManager) override;
const char* getName() const override { return "Monitoring"; } const char* getName() const override { return "Monitoring"; }
@@ -15,17 +14,12 @@ public:
// CPU information // CPU information
float currentCpuUsage; float currentCpuUsage;
float averageCpuUsage; float averageCpuUsage;
float maxCpuUsage;
float minCpuUsage;
unsigned long measurementCount; unsigned long measurementCount;
bool isMeasuring; bool isMeasuring;
// Memory information // Memory information
size_t freeHeap; size_t freeHeap;
size_t totalHeap; size_t totalHeap;
size_t minFreeHeap;
size_t maxAllocHeap;
size_t heapFragmentation;
// Filesystem information // Filesystem information
size_t totalBytes; size_t totalBytes;
@@ -45,8 +39,5 @@ private:
void handleResourcesRequest(AsyncWebServerRequest* request); void handleResourcesRequest(AsyncWebServerRequest* request);
// Helper methods // Helper methods
size_t calculateHeapFragmentation() const;
void getFilesystemInfo(size_t& totalBytes, size_t& usedBytes) const; void getFilesystemInfo(size_t& totalBytes, size_t& usedBytes) const;
CpuUsage& cpuUsage;
}; };

View File

@@ -20,4 +20,6 @@ private:
void handleUpdateUpload(AsyncWebServerRequest* request, const String& filename, size_t index, uint8_t* data, size_t len, bool final); void handleUpdateUpload(AsyncWebServerRequest* request, const String& filename, size_t index, uint8_t* data, size_t len, bool final);
void handleRestartRequest(AsyncWebServerRequest* request); void handleRestartRequest(AsyncWebServerRequest* request);
void handleEndpointsRequest(AsyncWebServerRequest* request); void handleEndpointsRequest(AsyncWebServerRequest* request);
void handleConfigRequest(AsyncWebServerRequest* request);
void handleGetConfigRequest(AsyncWebServerRequest* request);
}; };

View File

@@ -3,6 +3,7 @@
#include <Arduino.h> #include <Arduino.h>
#include <LittleFS.h> #include <LittleFS.h>
#include <ArduinoJson.h> #include <ArduinoJson.h>
#include <map>
class Config { class Config {
public: public:
@@ -11,11 +12,10 @@ public:
static constexpr const char* DEFAULT_WIFI_PASSWORD = "th3r31sn0sp00n"; static constexpr const char* DEFAULT_WIFI_PASSWORD = "th3r31sn0sp00n";
static constexpr uint16_t DEFAULT_UDP_PORT = 4210; static constexpr uint16_t DEFAULT_UDP_PORT = 4210;
static constexpr uint16_t DEFAULT_API_SERVER_PORT = 80; static constexpr uint16_t DEFAULT_API_SERVER_PORT = 80;
static constexpr unsigned long DEFAULT_DISCOVERY_INTERVAL_MS = 1000;
static constexpr unsigned long DEFAULT_CLUSTER_LISTEN_INTERVAL_MS = 10; static constexpr unsigned long DEFAULT_CLUSTER_LISTEN_INTERVAL_MS = 10;
static constexpr unsigned long DEFAULT_HEARTBEAT_INTERVAL_MS = 5000; static constexpr unsigned long DEFAULT_HEARTBEAT_INTERVAL_MS = 5000;
static constexpr unsigned long DEFAULT_STATUS_UPDATE_INTERVAL_MS = 1000; static constexpr unsigned long DEFAULT_STATUS_UPDATE_INTERVAL_MS = 1000;
static constexpr unsigned long DEFAULT_MEMBER_INFO_UPDATE_INTERVAL_MS = 10000; static constexpr unsigned long DEFAULT_NODE_UPDATE_BROADCAST_INTERVAL_MS = 5000;
static constexpr unsigned long DEFAULT_PRINT_INTERVAL_MS = 5000; static constexpr unsigned long DEFAULT_PRINT_INTERVAL_MS = 5000;
static constexpr unsigned long DEFAULT_NODE_ACTIVE_THRESHOLD_MS = 10000; static constexpr unsigned long DEFAULT_NODE_ACTIVE_THRESHOLD_MS = 10000;
static constexpr unsigned long DEFAULT_NODE_INACTIVE_THRESHOLD_MS = 60000; static constexpr unsigned long DEFAULT_NODE_INACTIVE_THRESHOLD_MS = 60000;
@@ -37,12 +37,9 @@ public:
uint16_t api_server_port; uint16_t api_server_port;
// Cluster Configuration // Cluster Configuration
unsigned long discovery_interval_ms;
unsigned long heartbeat_interval_ms; unsigned long heartbeat_interval_ms;
unsigned long cluster_listen_interval_ms; unsigned long cluster_listen_interval_ms;
unsigned long status_update_interval_ms; unsigned long status_update_interval_ms;
unsigned long member_info_update_interval_ms;
unsigned long print_interval_ms;
// Node Status Thresholds // Node Status Thresholds
unsigned long node_active_threshold_ms; unsigned long node_active_threshold_ms;
@@ -62,6 +59,9 @@ public:
uint32_t critical_memory_threshold_bytes; uint32_t critical_memory_threshold_bytes;
size_t max_concurrent_http_requests; size_t max_concurrent_http_requests;
// Custom Labels
std::map<String, String> labels;
// Constructor // Constructor
Config(); Config();

View File

@@ -9,6 +9,7 @@ struct NodeInfo {
String hostname; String hostname;
IPAddress ip; IPAddress ip;
unsigned long lastSeen; unsigned long lastSeen;
unsigned long uptime = 0; // milliseconds since node started
enum Status { ACTIVE, INACTIVE, DEAD } status; enum Status { ACTIVE, INACTIVE, DEAD } status;
struct Resources { struct Resources {
uint32_t freeHeap = 0; uint32_t freeHeap = 0;
@@ -17,7 +18,7 @@ struct NodeInfo {
uint32_t cpuFreqMHz = 0; uint32_t cpuFreqMHz = 0;
uint32_t flashChipSize = 0; uint32_t flashChipSize = 0;
} resources; } resources;
unsigned long latency = 0; // ms from heartbeat broadcast to NODE_INFO receipt unsigned long latency = 0; // ms from heartbeat broadcast to NODE_UPDATE receipt
std::vector<EndpointInfo> endpoints; // List of registered endpoints std::vector<EndpointInfo> endpoints; // List of registered endpoints
std::map<String, String> labels; // Arbitrary node labels (key -> value) std::map<String, String> labels; // Arbitrary node labels (key -> value)
}; };

View File

@@ -1,118 +0,0 @@
#pragma once
#include <Arduino.h>
/**
* @brief CPU usage measurement utility for ESP32/ESP8266
*
* This class provides methods to measure CPU usage by tracking idle time
* and calculating the percentage of time the CPU is busy vs idle.
*/
class CpuUsage {
public:
/**
* @brief Construct a new CpuUsage object
*/
CpuUsage();
/**
* @brief Destructor
*/
~CpuUsage() = default;
/**
* @brief Initialize the CPU usage measurement
* Call this once during setup
*/
void begin();
/**
* @brief Start measuring CPU usage for the current cycle
* Call this at the beginning of your main loop
*/
void startMeasurement();
/**
* @brief End measuring CPU usage for the current cycle
* Call this at the end of your main loop
*/
void endMeasurement();
/**
* @brief Get the current CPU usage percentage
* @return float CPU usage percentage (0.0 to 100.0)
*/
float getCpuUsage() const;
/**
* @brief Get the average CPU usage over the measurement window
* @return float Average CPU usage percentage (0.0 to 100.0)
*/
float getAverageCpuUsage() const;
/**
* @brief Get the maximum CPU usage recorded
* @return float Maximum CPU usage percentage (0.0 to 100.0)
*/
float getMaxCpuUsage() const;
/**
* @brief Get the minimum CPU usage recorded
* @return float Minimum CPU usage percentage (0.0 to 100.0)
*/
float getMinCpuUsage() const;
/**
* @brief Reset all CPU usage statistics
*/
void reset();
/**
* @brief Check if measurement is currently active
* @return true if measurement is active, false otherwise
*/
bool isMeasuring() const;
/**
* @brief Get the number of measurements taken
* @return unsigned long Number of measurements
*/
unsigned long getMeasurementCount() const;
private:
// Measurement state
bool _initialized;
bool _measuring;
unsigned long _measurementCount;
// Timing variables
unsigned long _cycleStartTime;
unsigned long _idleStartTime;
unsigned long _totalIdleTime;
unsigned long _totalCycleTime;
// Statistics
float _currentCpuUsage;
float _averageCpuUsage;
float _maxCpuUsage;
float _minCpuUsage;
unsigned long _totalCpuTime;
// Rolling average window
static constexpr size_t ROLLING_WINDOW_SIZE = 10;
float _rollingWindow[ROLLING_WINDOW_SIZE];
size_t _rollingIndex;
bool _rollingWindowFull;
/**
* @brief Update rolling average calculation
* @param value New value to add to rolling average
*/
void updateRollingAverage(float value);
/**
* @brief Update min/max statistics
* @param value New value to check against min/max
*/
void updateMinMax(float value);
};

View File

@@ -18,6 +18,15 @@ monitor_speed = 115200
lib_deps = lib_deps =
esp32async/ESPAsyncWebServer@^3.8.0 esp32async/ESPAsyncWebServer@^3.8.0
bblanchon/ArduinoJson@^7.4.2 bblanchon/ArduinoJson@^7.4.2
build_flags =
-Os ; Optimize for size
-ffunction-sections ; Place each function in its own section
-fdata-sections ; Place data in separate sections
-Wl,--gc-sections ; Remove unused sections at link time
-DNDEBUG ; Disable debug assertions
-DVTABLES_IN_FLASH ; Move virtual tables to flash
-fno-exceptions ; Disable C++ exceptions
-fno-rtti ; Disable runtime type information
[env:base] [env:base]
platform = platformio/espressif8266@^4.2.1 platform = platformio/espressif8266@^4.2.1
@@ -31,6 +40,7 @@ board_build.filesystem = littlefs
; note: somehow partition table is not working, so we need to use the ldscript ; note: somehow partition table is not working, so we need to use the ldscript
board_build.ldscript = eagle.flash.1m64.ld ; 64KB -> FS Size board_build.ldscript = eagle.flash.1m64.ld ; 64KB -> FS Size
lib_deps = ${common.lib_deps} lib_deps = ${common.lib_deps}
build_flags = ${common.build_flags}
build_src_filter = build_src_filter =
+<examples/base/*.cpp> +<examples/base/*.cpp>
+<src/spore/*.cpp> +<src/spore/*.cpp>
@@ -51,6 +61,7 @@ board_build.flash_mode = dio ; D1 Mini uses DIO on 4 Mbit flash
board_build.flash_size = 4M board_build.flash_size = 4M
board_build.ldscript = eagle.flash.4m1m.ld board_build.ldscript = eagle.flash.4m1m.ld
lib_deps = ${common.lib_deps} lib_deps = ${common.lib_deps}
build_flags = ${common.build_flags}
build_src_filter = build_src_filter =
+<examples/base/*.cpp> +<examples/base/*.cpp>
+<src/spore/*.cpp> +<src/spore/*.cpp>
@@ -71,6 +82,7 @@ board_build.flash_mode = dout
board_build.ldscript = eagle.flash.1m64.ld board_build.ldscript = eagle.flash.1m64.ld
lib_deps = ${common.lib_deps} lib_deps = ${common.lib_deps}
;data_dir = examples/relay/data ;data_dir = examples/relay/data
build_flags = ${common.build_flags}
build_src_filter = build_src_filter =
+<examples/relay/*.cpp> +<examples/relay/*.cpp>
+<src/spore/*.cpp> +<src/spore/*.cpp>
@@ -91,7 +103,7 @@ board_build.flash_mode = dout
board_build.ldscript = eagle.flash.1m64.ld board_build.ldscript = eagle.flash.1m64.ld
lib_deps = ${common.lib_deps} lib_deps = ${common.lib_deps}
adafruit/Adafruit NeoPixel@^1.15.1 adafruit/Adafruit NeoPixel@^1.15.1
build_flags = -DLED_STRIP_PIN=2 build_flags = -DLED_STRIP_PIN=2 ;${common.build_flags}
build_src_filter = build_src_filter =
+<examples/neopattern/*.cpp> +<examples/neopattern/*.cpp>
+<src/spore/*.cpp> +<src/spore/*.cpp>
@@ -112,7 +124,7 @@ board_build.flash_mode = dout
board_build.ldscript = eagle.flash.1m64.ld board_build.ldscript = eagle.flash.1m64.ld
lib_deps = ${common.lib_deps} lib_deps = ${common.lib_deps}
adafruit/Adafruit NeoPixel@^1.15.1 adafruit/Adafruit NeoPixel@^1.15.1
build_flags = build_flags = ${common.build_flags}
build_src_filter = build_src_filter =
+<examples/pixelstream/*.cpp> +<examples/pixelstream/*.cpp>
+<src/spore/*.cpp> +<src/spore/*.cpp>
@@ -133,7 +145,7 @@ board_build.flash_mode = dout
board_build.ldscript = eagle.flash.4m1m.ld board_build.ldscript = eagle.flash.4m1m.ld
lib_deps = ${common.lib_deps} lib_deps = ${common.lib_deps}
adafruit/Adafruit NeoPixel@^1.15.1 adafruit/Adafruit NeoPixel@^1.15.1
build_flags = -DPIXEL_PIN=TX -DPIXEL_COUNT=256 -DMATRIX_WIDTH=16 build_flags = -DPIXEL_PIN=TX -DPIXEL_COUNT=256 -DMATRIX_WIDTH=16 ${common.build_flags}
build_src_filter = build_src_filter =
+<examples/pixelstream/*.cpp> +<examples/pixelstream/*.cpp>
+<src/spore/*.cpp> +<src/spore/*.cpp>
@@ -156,6 +168,7 @@ board_build.ldscript = eagle.flash.4m1m.ld
lib_deps = ${common.lib_deps} lib_deps = ${common.lib_deps}
adafruit/Adafruit NeoPixel@^1.15.1 adafruit/Adafruit NeoPixel@^1.15.1
dfrobot/DFRobotDFPlayerMini@^1.0.6 dfrobot/DFRobotDFPlayerMini@^1.0.6
build_flags = ${common.build_flags}
build_src_filter = build_src_filter =
+<examples/multimatrix/*.cpp> +<examples/multimatrix/*.cpp>
+<examples/pixelstream/PixelStreamController.cpp> +<examples/pixelstream/PixelStreamController.cpp>

View File

@@ -9,13 +9,19 @@
Spore::Spore() : ctx(), network(ctx), taskManager(ctx), cluster(ctx, taskManager), Spore::Spore() : ctx(), network(ctx), taskManager(ctx), cluster(ctx, taskManager),
apiServer(ctx, taskManager, ctx.config.api_server_port), apiServer(ctx, taskManager, ctx.config.api_server_port),
cpuUsage(), initialized(false), apiServerStarted(false) { initialized(false), apiServerStarted(false) {
// Rebuild labels from constructor + config labels
ctx.rebuildLabels();
} }
Spore::Spore(std::initializer_list<std::pair<String, String>> initialLabels) Spore::Spore(std::initializer_list<std::pair<String, String>> initialLabels)
: ctx(initialLabels), network(ctx), taskManager(ctx), cluster(ctx, taskManager), : ctx(initialLabels), network(ctx), taskManager(ctx), cluster(ctx, taskManager),
apiServer(ctx, taskManager, ctx.config.api_server_port), apiServer(ctx, taskManager, ctx.config.api_server_port),
cpuUsage(), initialized(false), apiServerStarted(false) { initialized(false), apiServerStarted(false) {
// Rebuild labels from constructor + config labels (config takes precedence)
ctx.rebuildLabels();
} }
Spore::~Spore() { Spore::~Spore() {
@@ -34,9 +40,6 @@ void Spore::setup() {
// Initialize core components // Initialize core components
initializeCore(); initializeCore();
// Initialize CPU usage monitoring
cpuUsage.begin();
// Register core services // Register core services
registerCoreServices(); registerCoreServices();
@@ -72,15 +75,9 @@ void Spore::loop() {
return; return;
} }
// Start CPU usage measurement
cpuUsage.startMeasurement();
// Execute main tasks // Execute main tasks
taskManager.execute(); taskManager.execute();
// End CPU usage measurement before yield
cpuUsage.endMeasurement();
// Yield to allow other tasks to run // Yield to allow other tasks to run
yield(); yield();
} }
@@ -135,7 +132,7 @@ void Spore::registerCoreServices() {
auto clusterService = std::make_shared<ClusterService>(ctx); auto clusterService = std::make_shared<ClusterService>(ctx);
auto taskService = std::make_shared<TaskService>(taskManager); auto taskService = std::make_shared<TaskService>(taskManager);
auto staticFileService = std::make_shared<StaticFileService>(ctx, apiServer); auto staticFileService = std::make_shared<StaticFileService>(ctx, apiServer);
auto monitoringService = std::make_shared<MonitoringService>(cpuUsage); auto monitoringService = std::make_shared<MonitoringService>();
// Add to services list // Add to services list
services.push_back(nodeService); services.push_back(nodeService);

View File

@@ -23,11 +23,12 @@ void ApiServer::registerEndpoint(const String& uri, int method,
endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true}); endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true});
// Update cluster if needed // Update cluster if needed
if (ctx.memberList && !ctx.memberList->empty()) { String localIPStr = ctx.localIP.toString();
auto it = ctx.memberList->find(ctx.hostname); auto member = ctx.memberList->getMember(localIPStr.c_str());
if (it != ctx.memberList->end()) { if (member) {
it->second.endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true}); NodeInfo updatedNode = *member;
} updatedNode.endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true});
ctx.memberList->updateMember(localIPStr.c_str(), updatedNode);
} }
} }

View File

@@ -1,14 +1,15 @@
#include "spore/core/ClusterManager.h" #include "spore/core/ClusterManager.h"
#include "spore/internal/Globals.h" #include "spore/internal/Globals.h"
#include "spore/util/Logging.h" #include "spore/util/Logging.h"
#include "spore/types/NodeInfo.h"
ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx), taskManager(taskMgr) { ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx), taskManager(taskMgr) {
// Register callback for node_discovered event // Register callback for node/discovered event - this fires when network is ready
ctx.on("node_discovered", [this](void* data) { ctx.on("node/discovered", [this](void* data) {
NodeInfo* node = static_cast<NodeInfo*>(data); NodeInfo* node = static_cast<NodeInfo*>(data);
this->addOrUpdateNode(node->hostname, node->ip); this->addOrUpdateNode(node->hostname, node->ip);
}); });
// Centralized broadcast handler: services fire 'cluster/broadcast' with CLUSTER_EVENT JSON payload // Centralized broadcast handler: services fire 'cluster/broadcast' with cluster/event JSON payload
ctx.on("cluster/broadcast", [this](void* data) { ctx.on("cluster/broadcast", [this](void* data) {
String* jsonStr = static_cast<String*>(data); String* jsonStr = static_cast<String*>(data);
if (!jsonStr) { if (!jsonStr) {
@@ -19,33 +20,36 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx
IPAddress ip = WiFi.localIP(); IPAddress ip = WiFi.localIP();
IPAddress mask = WiFi.subnetMask(); IPAddress mask = WiFi.subnetMask();
IPAddress bcast(ip[0] | ~mask[0], ip[1] | ~mask[1], ip[2] | ~mask[2], ip[3] | ~mask[3]); IPAddress bcast(ip[0] | ~mask[0], ip[1] | ~mask[1], ip[2] | ~mask[2], ip[3] | ~mask[3]);
LOG_DEBUG("Cluster", String("Broadcasting CLUSTER_EVENT to ") + bcast.toString() + " len=" + String(jsonStr->length())); LOG_DEBUG("Cluster", String("Broadcasting cluster/event to ") + bcast.toString() + " len=" + String(jsonStr->length()));
this->ctx.udp->beginPacket(bcast, this->ctx.config.udp_port); this->ctx.udp->beginPacket(bcast, this->ctx.config.udp_port);
String msg = String(ClusterProtocol::CLUSTER_EVENT_MSG) + ":" + *jsonStr; String msg = String(ClusterProtocol::CLUSTER_EVENT_MSG) + ":" + *jsonStr;
this->ctx.udp->write(msg.c_str()); this->ctx.udp->write(msg.c_str());
this->ctx.udp->endPacket(); this->ctx.udp->endPacket();
}); });
// Handler for node update broadcasts: services fire 'cluster/node/update' when their node info changes
ctx.on("cluster/node/update", [this](void* data) {
// Trigger immediate NODE_UPDATE broadcast when node info changes
broadcastNodeUpdate();
});
// Handler for memberlist changes: print memberlist when it changes
ctx.on("cluster/memberlist/changed", [this](void* data) {
printMemberList();
});
// Register tasks // Register tasks
registerTasks(); registerTasks();
initMessageHandlers(); initMessageHandlers();
} }
void ClusterManager::registerTasks() { void ClusterManager::registerTasks() {
taskManager.registerTask("cluster_discovery", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); });
taskManager.registerTask("cluster_listen", ctx.config.cluster_listen_interval_ms, [this]() { listen(); }); taskManager.registerTask("cluster_listen", ctx.config.cluster_listen_interval_ms, [this]() { listen(); });
taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); }); taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); });
taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); });
taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); }); taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); });
taskManager.registerTask("cluster_update_members_info", ctx.config.member_info_update_interval_ms, [this]() { updateAllMembersInfoTaskCallback(); });
LOG_INFO("ClusterManager", "Registered all cluster tasks"); LOG_INFO("ClusterManager", "Registered all cluster tasks");
} }
void ClusterManager::sendDiscovery() { // Discovery functionality removed - using heartbeat-only approach
//LOG_DEBUG(ctx, "Cluster", "Sending discovery packet...");
ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
ctx.udp->write(ClusterProtocol::DISCOVERY_MSG);
ctx.udp->endPacket();
}
void ClusterManager::listen() { void ClusterManager::listen() {
int packetSize = ctx.udp->parsePacket(); int packetSize = ctx.udp->parsePacket();
@@ -69,10 +73,8 @@ void ClusterManager::listen() {
void ClusterManager::initMessageHandlers() { void ClusterManager::initMessageHandlers() {
messageHandlers.clear(); messageHandlers.clear();
messageHandlers.push_back({ &ClusterManager::isRawMsg, [this](const char* msg){ this->onRawMessage(msg); }, "RAW" }); messageHandlers.push_back({ &ClusterManager::isRawMsg, [this](const char* msg){ this->onRawMessage(msg); }, "RAW" });
messageHandlers.push_back({ &ClusterManager::isDiscoveryMsg, [this](const char* msg){ this->onDiscovery(msg); }, "DISCOVERY" });
messageHandlers.push_back({ &ClusterManager::isHeartbeatMsg, [this](const char* msg){ this->onHeartbeat(msg); }, "HEARTBEAT" }); messageHandlers.push_back({ &ClusterManager::isHeartbeatMsg, [this](const char* msg){ this->onHeartbeat(msg); }, "HEARTBEAT" });
messageHandlers.push_back({ &ClusterManager::isResponseMsg, [this](const char* msg){ this->onResponse(msg); }, "RESPONSE" }); messageHandlers.push_back({ &ClusterManager::isNodeUpdateMsg, [this](const char* msg){ this->onNodeUpdate(msg); }, "NODE_UPDATE" });
messageHandlers.push_back({ &ClusterManager::isNodeInfoMsg, [this](const char* msg){ this->onNodeInfo(msg); }, "NODE_INFO" });
messageHandlers.push_back({ &ClusterManager::isClusterEventMsg, [this](const char* msg){ this->onClusterEvent(msg); }, "CLUSTER_EVENT" }); messageHandlers.push_back({ &ClusterManager::isClusterEventMsg, [this](const char* msg){ this->onClusterEvent(msg); }, "CLUSTER_EVENT" });
} }
@@ -94,20 +96,12 @@ void ClusterManager::handleIncomingMessage(const char* incoming) {
LOG_DEBUG("Cluster", String("Unknown cluster message: ") + head); LOG_DEBUG("Cluster", String("Unknown cluster message: ") + head);
} }
bool ClusterManager::isDiscoveryMsg(const char* msg) {
return strcmp(msg, ClusterProtocol::DISCOVERY_MSG) == 0;
}
bool ClusterManager::isHeartbeatMsg(const char* msg) { bool ClusterManager::isHeartbeatMsg(const char* msg) {
return strncmp(msg, ClusterProtocol::HEARTBEAT_MSG, strlen(ClusterProtocol::HEARTBEAT_MSG)) == 0; return strncmp(msg, ClusterProtocol::HEARTBEAT_MSG, strlen(ClusterProtocol::HEARTBEAT_MSG)) == 0;
} }
bool ClusterManager::isResponseMsg(const char* msg) { bool ClusterManager::isNodeUpdateMsg(const char* msg) {
return strncmp(msg, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0; return strncmp(msg, ClusterProtocol::NODE_UPDATE_MSG, strlen(ClusterProtocol::NODE_UPDATE_MSG)) == 0;
}
bool ClusterManager::isNodeInfoMsg(const char* msg) {
return strncmp(msg, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0;
} }
bool ClusterManager::isClusterEventMsg(const char* msg) { bool ClusterManager::isClusterEventMsg(const char* msg) {
@@ -123,101 +117,186 @@ bool ClusterManager::isRawMsg(const char* msg) {
return msg[prefixLen] == ':'; return msg[prefixLen] == ':';
} }
void ClusterManager::onDiscovery(const char* /*msg*/) { // Discovery functionality removed - using heartbeat-only approach
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname; void ClusterManager::onHeartbeat(const char* msg) {
ctx.udp->write(response.c_str()); // Extract hostname from heartbeat message: "cluster/heartbeat:hostname"
ctx.udp->endPacket(); const char* colon = strchr(msg, ':');
if (!colon) {
LOG_WARN("Cluster", "Invalid heartbeat message format");
return;
} }
void ClusterManager::onHeartbeat(const char* /*msg*/) { String hostname = String(colon + 1);
IPAddress senderIP = ctx.udp->remoteIP();
// Update memberlist with the heartbeat
addOrUpdateNode(hostname, senderIP);
// Respond with minimal node info (hostname, ip, uptime, labels)
sendNodeInfo(hostname, senderIP);
}
void ClusterManager::onNodeUpdate(const char* msg) {
// Message format: "node/update:hostname:{json}"
const char* firstColon = strchr(msg, ':');
if (!firstColon) {
LOG_WARN("Cluster", "Invalid NODE_UPDATE message format");
return;
}
const char* secondColon = strchr(firstColon + 1, ':');
if (!secondColon) {
LOG_WARN("Cluster", "Invalid NODE_UPDATE message format");
return;
}
String hostnamePart = String(firstColon + 1);
String hostname = hostnamePart.substring(0, secondColon - firstColon - 1);
const char* jsonCStr = secondColon + 1;
JsonDocument doc;
DeserializationError err = deserializeJson(doc, jsonCStr);
if (err) {
LOG_WARN("Cluster", String("Failed to parse NODE_UPDATE JSON from ") + ctx.udp->remoteIP().toString());
return;
}
// The NODE_UPDATE contains info about the target node (hostname from message)
// but is sent FROM the responding node (ctx.udp->remoteIP())
// We need to find the responding node in the memberlist, not the target node
IPAddress respondingNodeIP = ctx.udp->remoteIP();
String respondingIPStr = respondingNodeIP.toString();
// Find the responding node by IP address
auto respondingMember = ctx.memberList->getMember(respondingIPStr.c_str());
if (!respondingMember) {
LOG_WARN("Cluster", String("Received NODE_UPDATE from unknown node: ") + respondingNodeIP.toString());
return;
}
// Calculate latency only if we recently sent a heartbeat (within last 1 second)
unsigned long latency = 0;
unsigned long now = millis();
if (lastHeartbeatSentAt != 0 && (now - lastHeartbeatSentAt) < 1000) { // 1 second window
latency = now - lastHeartbeatSentAt;
lastHeartbeatSentAt = 0; // Reset for next calculation
}
// Create updated node info
NodeInfo updatedNode = *respondingMember;
bool hostnameChanged = false;
bool labelsChanged = false;
// Update hostname if provided
if (doc["hostname"].is<const char*>()) {
String newHostname = doc["hostname"].as<const char*>();
if (updatedNode.hostname != newHostname) {
updatedNode.hostname = newHostname;
hostnameChanged = true;
}
}
// Update uptime if provided
if (doc["uptime"].is<unsigned long>()) {
updatedNode.uptime = doc["uptime"];
}
// Update labels if provided
if (doc["labels"].is<JsonObject>()) {
JsonObject labelsObj = doc["labels"].as<JsonObject>();
std::map<String, String> newLabels;
for (JsonPair kvp : labelsObj) {
const char* key = kvp.key().c_str();
const char* value = labelsObj[kvp.key()];
newLabels[key] = String(value);
}
// Compare with existing labels
if (newLabels != updatedNode.labels) {
labelsChanged = true;
updatedNode.labels = newLabels;
}
}
// Update timing and status
updatedNode.lastSeen = now;
updatedNode.status = NodeInfo::ACTIVE;
// Update latency if we calculated it (preserve existing value if not)
if (latency > 0) {
updatedNode.latency = latency;
}
// Persist the updated node info to the memberlist
ctx.memberList->updateMember(respondingIPStr.c_str(), updatedNode);
// Check if any fields changed that require broadcasting
bool nodeInfoChanged = hostnameChanged || labelsChanged;
if (nodeInfoChanged) {
// Fire cluster/node/update event to trigger broadcast
ctx.fire("cluster/node/update", nullptr);
}
LOG_DEBUG("Cluster", String("Updated responding node ") + updatedNode.hostname + " @ " + respondingNodeIP.toString() +
" | hostname: " + (hostnameChanged ? "changed" : "unchanged") +
" | labels: " + (labelsChanged ? "changed" : "unchanged") +
" | latency: " + (latency > 0 ? String(latency) + "ms" : "not calculated"));
}
void ClusterManager::sendNodeInfo(const String& targetHostname, const IPAddress& targetIP) {
JsonDocument doc; JsonDocument doc;
if (ctx.memberList) { // Get our node info for the response (we're the responding node)
auto it = ctx.memberList->find(ctx.hostname); String localIPStr = ctx.localIP.toString();
if (it != ctx.memberList->end()) { auto member = ctx.memberList->getMember(localIPStr.c_str());
if (member) {
const NodeInfo& node = *member;
// Response contains info about ourselves (the responding node)
doc["hostname"] = node.hostname;
doc["ip"] = node.ip.toString();
doc["uptime"] = node.uptime;
// Add labels if present
if (!node.labels.empty()) {
JsonObject labelsObj = doc["labels"].to<JsonObject>(); JsonObject labelsObj = doc["labels"].to<JsonObject>();
for (const auto& kv : it->second.labels) { for (const auto& kv : node.labels) {
labelsObj[kv.first.c_str()] = kv.second;
}
} else if (!ctx.self.labels.empty()) {
JsonObject labelsObj = doc["labels"].to<JsonObject>();
for (const auto& kv : ctx.self.labels) {
labelsObj[kv.first.c_str()] = kv.second; labelsObj[kv.first.c_str()] = kv.second;
} }
} }
} else {
// Fallback to basic info if not in memberlist
doc["hostname"] = ctx.hostname;
doc["ip"] = ctx.localIP.toString();
doc["uptime"] = millis();
} }
String json; String json;
serializeJson(doc, json); serializeJson(doc, json);
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port); // Send NODE_UPDATE:targetHostname:{json about responding node}
String msg = String(ClusterProtocol::NODE_INFO_MSG) + ":" + ctx.hostname + ":" + json; ctx.udp->beginPacket(targetIP, ctx.config.udp_port);
String msg = String(ClusterProtocol::NODE_UPDATE_MSG) + ":" + targetHostname + ":" + json;
ctx.udp->write(msg.c_str()); ctx.udp->write(msg.c_str());
ctx.udp->endPacket(); ctx.udp->endPacket();
}
void ClusterManager::onResponse(const char* msg) { LOG_DEBUG("Cluster", String("Sent NODE_UPDATE response to ") + targetHostname + " @ " + targetIP.toString());
char* hostPtr = const_cast<char*>(msg) + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
String nodeHost = String(hostPtr);
addOrUpdateNode(nodeHost, ctx.udp->remoteIP());
}
void ClusterManager::onNodeInfo(const char* msg) {
char* p = const_cast<char*>(msg) + strlen(ClusterProtocol::NODE_INFO_MSG) + 1;
char* hostEnd = strchr(p, ':');
if (hostEnd) {
*hostEnd = '\0';
const char* hostCStr = p;
const char* jsonCStr = hostEnd + 1;
String nodeHost = String(hostCStr);
IPAddress senderIP = ctx.udp->remoteIP();
addOrUpdateNode(nodeHost, senderIP);
JsonDocument doc;
DeserializationError err = deserializeJson(doc, jsonCStr);
if (!err) {
auto& memberList = *ctx.memberList;
auto it = memberList.find(nodeHost);
if (it != memberList.end()) {
NodeInfo& node = it->second;
node.status = NodeInfo::ACTIVE;
unsigned long now = millis();
node.lastSeen = now;
if (lastHeartbeatSentAt != 0) {
node.latency = now - lastHeartbeatSentAt;
}
node.labels.clear();
if (doc["labels"].is<JsonObject>()) {
JsonObject labelsObj = doc["labels"].as<JsonObject>();
for (JsonPair kvp : labelsObj) {
const char* key = kvp.key().c_str();
const char* value = labelsObj[kvp.key()];
node.labels[key] = value;
}
}
}
} else {
LOG_WARN("Cluster", String("Failed to parse NODE_INFO JSON from ") + senderIP.toString());
}
}
} }
void ClusterManager::onClusterEvent(const char* msg) { void ClusterManager::onClusterEvent(const char* msg) {
// Message format: CLUSTER_EVENT:{"event":"...","data":"<json string>"} // Message format: cluster/event:{"event":"...","data":"<json string>"}
const char* jsonStart = msg + strlen(ClusterProtocol::CLUSTER_EVENT_MSG) + 1; // skip prefix and ':' const char* jsonStart = msg + strlen(ClusterProtocol::CLUSTER_EVENT_MSG) + 1; // skip prefix and ':'
if (*jsonStart == '\0') { if (*jsonStart == '\0') {
LOG_DEBUG("Cluster", "CLUSTER_EVENT received with empty payload"); LOG_DEBUG("Cluster", "cluster/event received with empty payload");
return; return;
} }
LOG_DEBUG("Cluster", String("CLUSTER_EVENT raw from ") + ctx.udp->remoteIP().toString() + " len=" + String(strlen(jsonStart))); LOG_DEBUG("Cluster", String("cluster/event raw from ") + ctx.udp->remoteIP().toString() + " len=" + String(strlen(jsonStart)));
JsonDocument doc; JsonDocument doc;
DeserializationError err = deserializeJson(doc, jsonStart); DeserializationError err = deserializeJson(doc, jsonStart);
if (err) { if (err) {
LOG_ERROR("Cluster", String("Failed to parse CLUSTER_EVENT JSON from ") + ctx.udp->remoteIP().toString()); LOG_ERROR("Cluster", String("Failed to parse cluster/event JSON from ") + ctx.udp->remoteIP().toString());
return; return;
} }
// Robust extraction of event and data // Robust extraction of event and data
@@ -241,7 +320,7 @@ void ClusterManager::onClusterEvent(const char* msg) {
if (eventStr.length() == 0 || data.length() == 0) { if (eventStr.length() == 0 || data.length() == 0) {
String dbg; String dbg;
serializeJson(doc, dbg); serializeJson(doc, dbg);
LOG_WARN("Cluster", String("CLUSTER_EVENT missing 'event' or 'data' | payload=") + dbg); LOG_WARN("Cluster", String("cluster/event missing 'event' or 'data' | payload=") + dbg);
return; return;
} }
@@ -268,154 +347,67 @@ void ClusterManager::onRawMessage(const char* msg) {
} }
void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) { void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
auto& memberList = *ctx.memberList; bool memberlistChanged = false;
String ipStr = nodeIP.toString();
// O(1) lookup instead of O(n) search // Check if member exists
auto it = memberList.find(nodeHost); auto existingMember = ctx.memberList->getMember(ipStr.c_str());
if (it != memberList.end()) { if (existingMember) {
// Update existing node // Update existing node - preserve all existing field values
it->second.ip = nodeIP; NodeInfo updatedNode = *existingMember;
it->second.lastSeen = millis(); if (updatedNode.ip != nodeIP) {
//fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task updatedNode.ip = nodeIP;
return; memberlistChanged = true;
} }
updatedNode.lastSeen = millis();
ctx.memberList->updateMember(ipStr.c_str(), updatedNode);
} else {
// Add new node // Add new node
NodeInfo newNode; NodeInfo newNode;
newNode.hostname = nodeHost; newNode.hostname = nodeHost;
newNode.ip = nodeIP; newNode.ip = nodeIP;
newNode.lastSeen = millis(); newNode.lastSeen = millis();
updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms); updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
memberList[nodeHost] = newNode;
// Initialize static resources if this is the local node being added for the first time
if (nodeIP == ctx.localIP && nodeHost == ctx.hostname) {
newNode.resources.chipId = ESP.getChipId();
newNode.resources.sdkVersion = String(ESP.getSdkVersion());
newNode.resources.cpuFreqMHz = ESP.getCpuFreqMHz();
newNode.resources.flashChipSize = ESP.getFlashChipSize();
LOG_DEBUG("Cluster", "Initialized static resources for local node");
}
ctx.memberList->addMember(ipStr.c_str(), newNode);
memberlistChanged = true;
LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " | Status: " + statusToStr(newNode.status) + " | last update: 0"); LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " | Status: " + statusToStr(newNode.status) + " | last update: 0");
//fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
} }
// unused http client to fetch complete node info // Fire event if memberlist changed
void ClusterManager::fetchNodeInfo(const IPAddress& ip) { if (memberlistChanged) {
if(ip == ctx.localIP) { ctx.fire("cluster/memberlist/changed", nullptr);
LOG_DEBUG("Cluster", "Skipping fetch for local node");
return;
}
unsigned long requestStart = millis();
HTTPClient http;
WiFiClient client;
String url = "http://" + ip.toString() + ClusterProtocol::API_NODE_STATUS;
// Use RAII pattern to ensure http.end() is always called
bool httpInitialized = false;
bool success = false;
httpInitialized = http.begin(client, url);
if (!httpInitialized) {
LOG_ERROR("Cluster", "Failed to initialize HTTP client for " + ip.toString());
return;
}
// Set timeout to prevent hanging
http.setTimeout(5000); // 5 second timeout
int httpCode = http.GET();
unsigned long requestEnd = millis();
unsigned long requestDuration = requestEnd - requestStart;
if (httpCode == 200) {
String payload = http.getString();
// Use stack-allocated JsonDocument with proper cleanup
JsonDocument doc;
DeserializationError err = deserializeJson(doc, payload);
if (!err) {
auto& memberList = *ctx.memberList;
// Still need to iterate since we're searching by IP, not hostname
for (auto& pair : memberList) {
NodeInfo& node = pair.second;
if (node.ip == ip) {
// Update resources efficiently
node.resources.freeHeap = doc["freeHeap"];
node.resources.chipId = doc["chipId"];
node.resources.sdkVersion = (const char*)doc["sdkVersion"];
node.resources.cpuFreqMHz = doc["cpuFreqMHz"];
node.resources.flashChipSize = doc["flashChipSize"];
node.status = NodeInfo::ACTIVE;
node.latency = requestDuration;
node.lastSeen = millis();
// Clear and rebuild endpoints efficiently
node.endpoints.clear();
node.endpoints.reserve(10); // Pre-allocate to avoid reallocations
if (doc["api"].is<JsonArray>()) {
JsonArray apiArr = doc["api"].as<JsonArray>();
for (JsonObject apiObj : apiArr) {
// Use const char* to avoid String copies
const char* uri = apiObj["uri"];
int method = apiObj["method"];
// Create basic EndpointInfo without params for cluster nodes
EndpointInfo endpoint;
endpoint.uri = uri; // String assignment is more efficient than construction
endpoint.method = method;
endpoint.isLocal = false;
endpoint.serviceName = "remote";
node.endpoints.push_back(std::move(endpoint));
}
}
// Parse labels efficiently
node.labels.clear();
if (doc["labels"].is<JsonObject>()) {
JsonObject labelsObj = doc["labels"].as<JsonObject>();
for (JsonPair kvp : labelsObj) {
// Use const char* to avoid String copies
const char* key = kvp.key().c_str();
const char* value = labelsObj[kvp.key()];
node.labels[key] = value;
}
}
LOG_DEBUG("Cluster", "Fetched info for node: " + node.hostname + " @ " + ip.toString());
success = true;
break;
}
}
} else {
LOG_ERROR("Cluster", "JSON parse error for node @ " + ip.toString() + ": " + String(err.c_str()));
}
} else {
LOG_ERROR("Cluster", "Failed to fetch info for node @ " + ip.toString() + ", HTTP code: " + String(httpCode));
}
// Always ensure HTTP client is properly closed
if (httpInitialized) {
http.end();
}
// Log success/failure for debugging
if (!success) {
LOG_DEBUG("Cluster", "Failed to update node info for " + ip.toString());
} }
} }
void ClusterManager::heartbeatTaskCallback() { void ClusterManager::heartbeatTaskCallback() {
auto& memberList = *ctx.memberList; // Update local node resources and lastSeen since we're actively sending heartbeats
auto it = memberList.find(ctx.hostname); String localIPStr = ctx.localIP.toString();
if (it != memberList.end()) { auto member = ctx.memberList->getMember(localIPStr.c_str());
NodeInfo& node = it->second; if (member) {
node.lastSeen = millis(); NodeInfo node = *member;
node.status = NodeInfo::ACTIVE; updateLocalNodeResources(node);
updateLocalNodeResources(); node.lastSeen = millis(); // Update lastSeen since we're actively participating
addOrUpdateNode(ctx.hostname, ctx.localIP); ctx.memberList->updateMember(localIPStr.c_str(), node);
} }
// Broadcast heartbeat so peers can respond with their node info // Broadcast heartbeat - peers will respond with NODE_UPDATE
lastHeartbeatSentAt = millis(); lastHeartbeatSentAt = millis();
ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port); ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
String hb = String(ClusterProtocol::HEARTBEAT_MSG) + ":" + ctx.hostname; String hb = String(ClusterProtocol::HEARTBEAT_MSG) + ":" + ctx.hostname;
ctx.udp->write(hb.c_str()); ctx.udp->write(hb.c_str());
ctx.udp->endPacket(); ctx.udp->endPacket();
LOG_DEBUG("Cluster", String("Sent heartbeat: ") + ctx.hostname);
} }
void ClusterManager::updateAllMembersInfoTaskCallback() { void ClusterManager::updateAllMembersInfoTaskCallback() {
@@ -423,55 +415,74 @@ void ClusterManager::updateAllMembersInfoTaskCallback() {
// No-op to reduce network and memory usage // No-op to reduce network and memory usage
} }
void ClusterManager::updateAllNodeStatuses() { void ClusterManager::broadcastNodeUpdate() {
auto& memberList = *ctx.memberList; // Broadcast our current node info as NODE_UPDATE to all cluster members
unsigned long now = millis(); String localIPStr = ctx.localIP.toString();
for (auto& pair : memberList) { auto member = ctx.memberList->getMember(localIPStr.c_str());
NodeInfo& node = pair.second; if (!member) {
updateNodeStatus(node, now, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms); return;
} }
const NodeInfo& node = *member;
JsonDocument doc;
doc["hostname"] = node.hostname;
doc["uptime"] = node.uptime;
// Add labels if present
if (!node.labels.empty()) {
JsonObject labelsObj = doc["labels"].to<JsonObject>();
for (const auto& kv : node.labels) {
labelsObj[kv.first.c_str()] = kv.second;
}
}
String json;
serializeJson(doc, json);
// Broadcast to all cluster members
ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
String msg = String(ClusterProtocol::NODE_UPDATE_MSG) + ":" + ctx.hostname + ":" + json;
ctx.udp->write(msg.c_str());
ctx.udp->endPacket();
LOG_DEBUG("Cluster", String("Broadcasted NODE_UPDATE for ") + ctx.hostname);
}
void ClusterManager::updateAllNodeStatuses() {
unsigned long now = millis();
ctx.memberList->updateAllNodeStatuses(now, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
} }
void ClusterManager::removeDeadNodes() { void ClusterManager::removeDeadNodes() {
auto& memberList = *ctx.memberList; size_t removedCount = ctx.memberList->removeDeadMembers();
unsigned long now = millis(); if (removedCount > 0) {
LOG_INFO("Cluster", String("Removed ") + removedCount + " dead nodes");
// Use iterator to safely remove elements from map ctx.fire("cluster/memberlist/changed", nullptr);
for (auto it = memberList.begin(); it != memberList.end(); ) {
unsigned long diff = now - it->second.lastSeen;
if (it->second.status == NodeInfo::DEAD && diff > ctx.config.node_dead_threshold_ms) {
LOG_INFO("Cluster", "Removing node: " + it->second.hostname);
it = memberList.erase(it);
} else {
++it;
}
} }
} }
void ClusterManager::printMemberList() { void ClusterManager::printMemberList() {
auto& memberList = *ctx.memberList; size_t count = ctx.memberList->getMemberCount();
if (memberList.empty()) { if (count == 0) {
LOG_INFO("Cluster", "Member List: empty"); LOG_INFO("Cluster", "Member List: empty");
return; return;
} }
LOG_INFO("Cluster", "Member List:"); LOG_INFO("Cluster", "Member List:");
for (const auto& pair : memberList) { ctx.memberList->forEachMember([](const std::string& ip, const NodeInfo& node) {
const NodeInfo& node = pair.second;
LOG_INFO("Cluster", " " + node.hostname + " @ " + node.ip.toString() + " | Status: " + statusToStr(node.status) + " | last seen: " + String(millis() - node.lastSeen)); LOG_INFO("Cluster", " " + node.hostname + " @ " + node.ip.toString() + " | Status: " + statusToStr(node.status) + " | last seen: " + String(millis() - node.lastSeen));
} });
} }
void ClusterManager::updateLocalNodeResources() { void ClusterManager::updateLocalNodeResources(NodeInfo& node) {
auto& memberList = *ctx.memberList; // Update node status and timing
auto it = memberList.find(ctx.hostname); node.lastSeen = millis();
if (it != memberList.end()) { node.status = NodeInfo::ACTIVE;
NodeInfo& node = it->second; node.uptime = millis();
// Update dynamic resources (always updated)
uint32_t freeHeap = ESP.getFreeHeap(); uint32_t freeHeap = ESP.getFreeHeap();
node.resources.freeHeap = freeHeap; node.resources.freeHeap = freeHeap;
node.resources.chipId = ESP.getChipId();
node.resources.sdkVersion = String(ESP.getSdkVersion());
node.resources.cpuFreqMHz = ESP.getCpuFreqMHz();
node.resources.flashChipSize = ESP.getFlashChipSize();
// Log memory warnings if heap is getting low // Log memory warnings if heap is getting low
if (freeHeap < ctx.config.low_memory_threshold_bytes) { if (freeHeap < ctx.config.low_memory_threshold_bytes) {
@@ -480,4 +491,3 @@ void ClusterManager::updateLocalNodeResources() {
LOG_ERROR("Cluster", "Critical memory warning: " + String(freeHeap) + " bytes free"); LOG_ERROR("Cluster", "Critical memory warning: " + String(freeHeap) + " bytes free");
} }
} }
}

View File

@@ -0,0 +1,114 @@
#include "spore/core/Memberlist.h"
#include <algorithm>
Memberlist::Memberlist() = default;
Memberlist::~Memberlist() = default;
bool Memberlist::addOrUpdateMember(const std::string& ip, const NodeInfo& node) {
auto it = m_members.find(ip);
if (it != m_members.end()) {
// Update existing member
it->second = node;
it->second.lastSeen = millis(); // Update last seen time
return true;
} else {
// Add new member
NodeInfo newNode = node;
newNode.lastSeen = millis();
m_members[ip] = newNode;
return true;
}
}
bool Memberlist::addMember(const std::string& ip, const NodeInfo& node) {
if (m_members.find(ip) != m_members.end()) {
return false; // Member already exists
}
NodeInfo newNode = node;
newNode.lastSeen = millis();
m_members[ip] = newNode;
return true;
}
bool Memberlist::updateMember(const std::string& ip, const NodeInfo& node) {
auto it = m_members.find(ip);
if (it == m_members.end()) {
return false; // Member doesn't exist
}
it->second = node;
it->second.lastSeen = millis(); // Update last seen time
return true;
}
bool Memberlist::removeMember(const std::string& ip) {
auto it = m_members.find(ip);
if (it == m_members.end()) {
return false; // Member doesn't exist
}
m_members.erase(it);
return true;
}
std::optional<NodeInfo> Memberlist::getMember(const std::string& ip) const {
auto it = m_members.find(ip);
if (it != m_members.end()) {
return it->second;
}
return std::nullopt;
}
void Memberlist::forEachMember(std::function<void(const std::string&, const NodeInfo&)> callback) const {
for (const auto& pair : m_members) {
callback(pair.first, pair.second);
}
}
bool Memberlist::forEachMemberUntil(std::function<bool(const std::string&, const NodeInfo&)> callback) const {
for (const auto& pair : m_members) {
if (!callback(pair.first, pair.second)) {
return false;
}
}
return true;
}
size_t Memberlist::getMemberCount() const {
return m_members.size();
}
void Memberlist::updateAllNodeStatuses(unsigned long currentTime,
unsigned long staleThresholdMs,
unsigned long deadThresholdMs,
std::function<void(const std::string&, NodeInfo::Status, NodeInfo::Status)> onStatusChange) {
for (auto& [ip, node] : m_members) {
NodeInfo::Status oldStatus = node.status;
updateNodeStatus(node, currentTime, staleThresholdMs, deadThresholdMs);
if (oldStatus != node.status && onStatusChange) {
onStatusChange(ip, oldStatus, node.status);
}
}
}
size_t Memberlist::removeDeadMembers() {
size_t removedCount = 0;
auto it = m_members.begin();
while (it != m_members.end()) {
if (it->second.status == NodeInfo::Status::DEAD) {
it = m_members.erase(it);
++removedCount;
} else {
++it;
}
}
return removedCount;
}
bool Memberlist::hasMember(const std::string& ip) const {
return m_members.find(ip) != m_members.end();
}
void Memberlist::clear() {
m_members.clear();
}

View File

@@ -119,14 +119,9 @@ void NetworkManager::setupWiFi() {
ctx.self.status = NodeInfo::ACTIVE; ctx.self.status = NodeInfo::ACTIVE;
// Ensure member list has an entry for this node // Ensure member list has an entry for this node
auto &memberList = *ctx.memberList; String localIPStr = ctx.localIP.toString();
auto existing = memberList.find(ctx.hostname); ctx.memberList->addOrUpdateMember(localIPStr.c_str(), ctx.self);
if (existing == memberList.end()) {
memberList[ctx.hostname] = ctx.self;
} else {
existing->second = ctx.self;
}
// Notify listeners that the node is (re)discovered // Notify listeners that the node is (re)discovered
ctx.fire("node_discovered", &ctx.self); ctx.fire("node/discovered", &ctx.self);
} }

View File

@@ -2,7 +2,7 @@
NodeContext::NodeContext() { NodeContext::NodeContext() {
udp = new WiFiUDP(); udp = new WiFiUDP();
memberList = new std::map<String, NodeInfo>(); memberList = std::make_unique<Memberlist>();
hostname = ""; hostname = "";
self.hostname = ""; self.hostname = "";
self.ip = IPAddress(); self.ip = IPAddress();
@@ -12,13 +12,14 @@ NodeContext::NodeContext() {
NodeContext::NodeContext(std::initializer_list<std::pair<String, String>> initialLabels) : NodeContext() { NodeContext::NodeContext(std::initializer_list<std::pair<String, String>> initialLabels) : NodeContext() {
for (const auto& kv : initialLabels) { for (const auto& kv : initialLabels) {
constructorLabels[kv.first] = kv.second;
self.labels[kv.first] = kv.second; self.labels[kv.first] = kv.second;
} }
} }
NodeContext::~NodeContext() { NodeContext::~NodeContext() {
delete udp; delete udp;
delete memberList; // memberList is a unique_ptr, so no need to delete manually
} }
void NodeContext::on(const std::string& event, EventCallback cb) { void NodeContext::on(const std::string& event, EventCallback cb) {
@@ -37,3 +38,18 @@ void NodeContext::fire(const std::string& event, void* data) {
void NodeContext::onAny(AnyEventCallback cb) { void NodeContext::onAny(AnyEventCallback cb) {
anyEventSubscribers.push_back(cb); anyEventSubscribers.push_back(cb);
} }
void NodeContext::rebuildLabels() {
// Clear current labels
self.labels.clear();
// Add constructor labels first
for (const auto& kv : constructorLabels) {
self.labels[kv.first] = kv.second;
}
// Add config labels (these override constructor labels if same key)
for (const auto& kv : config.labels) {
self.labels[kv.first] = kv.second;
}
}

View File

@@ -40,8 +40,7 @@ void ClusterService::handleMembersRequest(AsyncWebServerRequest* request) {
JsonDocument doc; JsonDocument doc;
JsonArray arr = doc["members"].to<JsonArray>(); JsonArray arr = doc["members"].to<JsonArray>();
for (const auto& pair : *ctx.memberList) { ctx.memberList->forEachMember([&arr](const std::string& ip, const NodeInfo& node) {
const NodeInfo& node = pair.second;
JsonObject obj = arr.add<JsonObject>(); JsonObject obj = arr.add<JsonObject>();
obj["hostname"] = node.hostname; obj["hostname"] = node.hostname;
obj["ip"] = node.ip.toString(); obj["ip"] = node.ip.toString();
@@ -56,7 +55,7 @@ void ClusterService::handleMembersRequest(AsyncWebServerRequest* request) {
labelsObj[kv.first.c_str()] = kv.second; labelsObj[kv.first.c_str()] = kv.second;
} }
} }
} });
String json; String json;
serializeJson(doc, json); serializeJson(doc, json);

View File

@@ -5,8 +5,7 @@
#include <FS.h> #include <FS.h>
#include <LittleFS.h> #include <LittleFS.h>
MonitoringService::MonitoringService(CpuUsage& cpuUsage) MonitoringService::MonitoringService() {
: cpuUsage(cpuUsage) {
} }
void MonitoringService::registerEndpoints(ApiServer& api) { void MonitoringService::registerEndpoints(ApiServer& api) {
@@ -22,20 +21,15 @@ void MonitoringService::registerTasks(TaskManager& taskManager) {
MonitoringService::SystemResources MonitoringService::getSystemResources() const { MonitoringService::SystemResources MonitoringService::getSystemResources() const {
SystemResources resources; SystemResources resources;
// CPU information // CPU information - sending fixed value of 100
resources.currentCpuUsage = cpuUsage.getCpuUsage(); resources.currentCpuUsage = 100.0f;
resources.averageCpuUsage = cpuUsage.getAverageCpuUsage(); resources.averageCpuUsage = 100.0f;
resources.maxCpuUsage = cpuUsage.getMaxCpuUsage(); resources.measurementCount = 0;
resources.minCpuUsage = cpuUsage.getMinCpuUsage(); resources.isMeasuring = false;
resources.measurementCount = cpuUsage.getMeasurementCount();
resources.isMeasuring = cpuUsage.isMeasuring();
// Memory information - ESP8266 compatible // Memory information - ESP8266 compatible
resources.freeHeap = ESP.getFreeHeap(); resources.freeHeap = ESP.getFreeHeap();
resources.totalHeap = 81920; // ESP8266 has ~80KB RAM resources.totalHeap = 81920; // ESP8266 has ~80KB RAM
resources.minFreeHeap = 0; // Not available on ESP8266
resources.maxAllocHeap = 0; // Not available on ESP8266
resources.heapFragmentation = calculateHeapFragmentation();
// Filesystem information // Filesystem information
getFilesystemInfo(resources.totalBytes, resources.usedBytes); getFilesystemInfo(resources.totalBytes, resources.usedBytes);
@@ -59,8 +53,6 @@ void MonitoringService::handleResourcesRequest(AsyncWebServerRequest* request) {
JsonObject cpu = doc["cpu"].to<JsonObject>(); JsonObject cpu = doc["cpu"].to<JsonObject>();
cpu["current_usage"] = resources.currentCpuUsage; cpu["current_usage"] = resources.currentCpuUsage;
cpu["average_usage"] = resources.averageCpuUsage; cpu["average_usage"] = resources.averageCpuUsage;
cpu["max_usage"] = resources.maxCpuUsage;
cpu["min_usage"] = resources.minCpuUsage;
cpu["measurement_count"] = resources.measurementCount; cpu["measurement_count"] = resources.measurementCount;
cpu["is_measuring"] = resources.isMeasuring; cpu["is_measuring"] = resources.isMeasuring;
@@ -68,9 +60,6 @@ void MonitoringService::handleResourcesRequest(AsyncWebServerRequest* request) {
JsonObject memory = doc["memory"].to<JsonObject>(); JsonObject memory = doc["memory"].to<JsonObject>();
memory["free_heap"] = resources.freeHeap; memory["free_heap"] = resources.freeHeap;
memory["total_heap"] = resources.totalHeap; memory["total_heap"] = resources.totalHeap;
memory["min_free_heap"] = resources.minFreeHeap;
memory["max_alloc_heap"] = resources.maxAllocHeap;
memory["heap_fragmentation"] = resources.heapFragmentation;
memory["heap_usage_percent"] = resources.totalHeap > 0 ? memory["heap_usage_percent"] = resources.totalHeap > 0 ?
(float)(resources.totalHeap - resources.freeHeap) / (float)resources.totalHeap * 100.0f : 0.0f; (float)(resources.totalHeap - resources.freeHeap) / (float)resources.totalHeap * 100.0f : 0.0f;
@@ -94,15 +83,6 @@ void MonitoringService::handleResourcesRequest(AsyncWebServerRequest* request) {
request->send(200, "application/json", json); request->send(200, "application/json", json);
} }
size_t MonitoringService::calculateHeapFragmentation() const {
size_t freeHeap = ESP.getFreeHeap();
size_t maxAllocHeap = 0; // Not available on ESP8266
if (maxAllocHeap == 0) return 0;
// Calculate fragmentation as percentage of free heap that can't be allocated in one block
return (freeHeap - maxAllocHeap) * 100 / freeHeap;
}
void MonitoringService::getFilesystemInfo(size_t& totalBytes, size_t& usedBytes) const { void MonitoringService::getFilesystemInfo(size_t& totalBytes, size_t& usedBytes) const {
totalBytes = 0; totalBytes = 0;

View File

@@ -30,6 +30,18 @@ void NodeService::registerEndpoints(ApiServer& api) {
[this](AsyncWebServerRequest* request) { handleEndpointsRequest(request); }, [this](AsyncWebServerRequest* request) { handleEndpointsRequest(request); },
std::vector<ParamSpec>{}); std::vector<ParamSpec>{});
// Config endpoint for setting labels
api.registerEndpoint("/api/node/config", HTTP_POST,
[this](AsyncWebServerRequest* request) { handleConfigRequest(request); },
std::vector<ParamSpec>{
ParamSpec{String("labels"), true, String("body"), String("json"), {}, String("")}
});
// Config endpoint for getting node configuration (without WiFi password)
api.registerEndpoint("/api/node/config", HTTP_GET,
[this](AsyncWebServerRequest* request) { handleGetConfigRequest(request); },
std::vector<ParamSpec>{});
// Generic local event endpoint // Generic local event endpoint
api.registerEndpoint("/api/node/event", HTTP_POST, api.registerEndpoint("/api/node/event", HTTP_POST,
[this](AsyncWebServerRequest* request) { [this](AsyncWebServerRequest* request) {
@@ -62,11 +74,10 @@ void NodeService::handleStatusRequest(AsyncWebServerRequest* request) {
doc["flashChipSize"] = ESP.getFlashChipSize(); doc["flashChipSize"] = ESP.getFlashChipSize();
// Include local node labels if present // Include local node labels if present
if (ctx.memberList) { auto member = ctx.memberList->getMember(ctx.hostname.c_str());
auto it = ctx.memberList->find(ctx.hostname); if (member) {
if (it != ctx.memberList->end()) {
JsonObject labelsObj = doc["labels"].to<JsonObject>(); JsonObject labelsObj = doc["labels"].to<JsonObject>();
for (const auto& kv : it->second.labels) { for (const auto& kv : member->labels) {
labelsObj[kv.first.c_str()] = kv.second; labelsObj[kv.first.c_str()] = kv.second;
} }
} else if (!ctx.self.labels.empty()) { } else if (!ctx.self.labels.empty()) {
@@ -75,7 +86,6 @@ void NodeService::handleStatusRequest(AsyncWebServerRequest* request) {
labelsObj[kv.first.c_str()] = kv.second; labelsObj[kv.first.c_str()] = kv.second;
} }
} }
}
String json; String json;
serializeJson(doc, json); serializeJson(doc, json);
@@ -104,7 +114,7 @@ void NodeService::handleUpdateUpload(AsyncWebServerRequest* request, const Strin
LOG_ERROR("OTA", "Update failed: not enough space"); LOG_ERROR("OTA", "Update failed: not enough space");
Update.printError(Serial); Update.printError(Serial);
AsyncWebServerResponse* response = request->beginResponse(500, "application/json", AsyncWebServerResponse* response = request->beginResponse(500, "application/json",
"{\"status\": \"FAIL\"}"); "{\"status\": \"FAIL\", \"message\": \"Update failed: not enough space\"}");
response->addHeader("Connection", "close"); response->addHeader("Connection", "close");
request->send(response); request->send(response);
return; return;
@@ -175,3 +185,108 @@ void NodeService::handleEndpointsRequest(AsyncWebServerRequest* request) {
serializeJson(doc, json); serializeJson(doc, json);
request->send(200, "application/json", json); request->send(200, "application/json", json);
} }
void NodeService::handleConfigRequest(AsyncWebServerRequest* request) {
if (!request->hasParam("labels", true)) {
request->send(400, "application/json", "{\"error\":\"Missing 'labels' parameter\"}");
return;
}
String labelsJson = request->getParam("labels", true)->value();
// Parse the JSON
JsonDocument doc;
DeserializationError error = deserializeJson(doc, labelsJson);
if (error) {
request->send(400, "application/json", "{\"error\":\"Invalid JSON format: " + String(error.c_str()) + "\"}");
return;
}
// Update config labels
ctx.config.labels.clear();
if (doc.is<JsonObject>()) {
JsonObject labelsObj = doc.as<JsonObject>();
for (JsonPair kv : labelsObj) {
ctx.config.labels[kv.key().c_str()] = kv.value().as<String>();
}
}
// Rebuild self.labels from constructor + config labels
ctx.rebuildLabels();
// Update the member list entry for the local node if it exists
String localIPStr = ctx.localIP.toString();
auto member = ctx.memberList->getMember(localIPStr.c_str());
if (member) {
// Update the labels in the member list entry
NodeInfo updatedNode = *member;
updatedNode.labels.clear();
for (const auto& kv : ctx.self.labels) {
updatedNode.labels[kv.first] = kv.second;
}
ctx.memberList->updateMember(localIPStr.c_str(), updatedNode);
}
// Save config to file
if (ctx.config.saveToFile()) {
LOG_INFO("NodeService", "Labels updated and saved to config");
request->send(200, "application/json", "{\"status\":\"success\",\"message\":\"Labels updated and saved\"}");
} else {
LOG_ERROR("NodeService", "Failed to save labels to config file");
request->send(500, "application/json", "{\"error\":\"Failed to save configuration\"}");
}
}
void NodeService::handleGetConfigRequest(AsyncWebServerRequest* request) {
JsonDocument doc;
// WiFi Configuration (excluding password for security)
JsonObject wifiObj = doc["wifi"].to<JsonObject>();
wifiObj["ssid"] = ctx.config.wifi_ssid;
wifiObj["connect_timeout_ms"] = ctx.config.wifi_connect_timeout_ms;
wifiObj["retry_delay_ms"] = ctx.config.wifi_retry_delay_ms;
// Network Configuration
JsonObject networkObj = doc["network"].to<JsonObject>();
networkObj["udp_port"] = ctx.config.udp_port;
networkObj["api_server_port"] = ctx.config.api_server_port;
// Cluster Configuration
JsonObject clusterObj = doc["cluster"].to<JsonObject>();
clusterObj["heartbeat_interval_ms"] = ctx.config.heartbeat_interval_ms;
clusterObj["cluster_listen_interval_ms"] = ctx.config.cluster_listen_interval_ms;
clusterObj["status_update_interval_ms"] = ctx.config.status_update_interval_ms;
// Node Status Thresholds
JsonObject thresholdsObj = doc["thresholds"].to<JsonObject>();
thresholdsObj["node_active_threshold_ms"] = ctx.config.node_active_threshold_ms;
thresholdsObj["node_inactive_threshold_ms"] = ctx.config.node_inactive_threshold_ms;
thresholdsObj["node_dead_threshold_ms"] = ctx.config.node_dead_threshold_ms;
// System Configuration
JsonObject systemObj = doc["system"].to<JsonObject>();
systemObj["restart_delay_ms"] = ctx.config.restart_delay_ms;
systemObj["json_doc_size"] = ctx.config.json_doc_size;
// Memory Management
JsonObject memoryObj = doc["memory"].to<JsonObject>();
memoryObj["low_memory_threshold_bytes"] = ctx.config.low_memory_threshold_bytes;
memoryObj["critical_memory_threshold_bytes"] = ctx.config.critical_memory_threshold_bytes;
memoryObj["max_concurrent_http_requests"] = ctx.config.max_concurrent_http_requests;
// Custom Labels
if (!ctx.config.labels.empty()) {
JsonObject labelsObj = doc["labels"].to<JsonObject>();
for (const auto& kv : ctx.config.labels) {
labelsObj[kv.first.c_str()] = kv.second;
}
}
// Add metadata
doc["version"] = "1.0";
doc["retrieved_at"] = millis();
String json;
serializeJson(doc, json);
request->send(200, "application/json", json);
}

View File

@@ -32,12 +32,9 @@ void Config::setDefaults() {
api_server_port = DEFAULT_API_SERVER_PORT; api_server_port = DEFAULT_API_SERVER_PORT;
// Cluster Configuration // Cluster Configuration
discovery_interval_ms = DEFAULT_DISCOVERY_INTERVAL_MS; // TODO retire this in favor of heartbeat_interval_ms
cluster_listen_interval_ms = DEFAULT_CLUSTER_LISTEN_INTERVAL_MS; cluster_listen_interval_ms = DEFAULT_CLUSTER_LISTEN_INTERVAL_MS;
heartbeat_interval_ms = DEFAULT_HEARTBEAT_INTERVAL_MS; heartbeat_interval_ms = DEFAULT_HEARTBEAT_INTERVAL_MS;
status_update_interval_ms = DEFAULT_STATUS_UPDATE_INTERVAL_MS; status_update_interval_ms = DEFAULT_STATUS_UPDATE_INTERVAL_MS;
member_info_update_interval_ms = DEFAULT_MEMBER_INFO_UPDATE_INTERVAL_MS; // TODO retire this in favor of heartbeat_interval_ms
print_interval_ms = DEFAULT_PRINT_INTERVAL_MS;
// Node Status Thresholds // Node Status Thresholds
node_active_threshold_ms = DEFAULT_NODE_ACTIVE_THRESHOLD_MS; node_active_threshold_ms = DEFAULT_NODE_ACTIVE_THRESHOLD_MS;
@@ -56,6 +53,9 @@ void Config::setDefaults() {
low_memory_threshold_bytes = DEFAULT_LOW_MEMORY_THRESHOLD_BYTES; // 10KB low_memory_threshold_bytes = DEFAULT_LOW_MEMORY_THRESHOLD_BYTES; // 10KB
critical_memory_threshold_bytes = DEFAULT_CRITICAL_MEMORY_THRESHOLD_BYTES; // 5KB critical_memory_threshold_bytes = DEFAULT_CRITICAL_MEMORY_THRESHOLD_BYTES; // 5KB
max_concurrent_http_requests = DEFAULT_MAX_CONCURRENT_HTTP_REQUESTS; max_concurrent_http_requests = DEFAULT_MAX_CONCURRENT_HTTP_REQUESTS;
// Custom Labels - start empty by default
labels.clear();
} }
bool Config::saveToFile(const String& filename) { bool Config::saveToFile(const String& filename) {
@@ -83,12 +83,9 @@ bool Config::saveToFile(const String& filename) {
doc["network"]["api_server_port"] = api_server_port; doc["network"]["api_server_port"] = api_server_port;
// Cluster Configuration // Cluster Configuration
doc["cluster"]["discovery_interval_ms"] = discovery_interval_ms;
doc["cluster"]["heartbeat_interval_ms"] = heartbeat_interval_ms; doc["cluster"]["heartbeat_interval_ms"] = heartbeat_interval_ms;
doc["cluster"]["cluster_listen_interval_ms"] = cluster_listen_interval_ms; doc["cluster"]["cluster_listen_interval_ms"] = cluster_listen_interval_ms;
doc["cluster"]["status_update_interval_ms"] = status_update_interval_ms; doc["cluster"]["status_update_interval_ms"] = status_update_interval_ms;
doc["cluster"]["member_info_update_interval_ms"] = member_info_update_interval_ms;
doc["cluster"]["print_interval_ms"] = print_interval_ms;
// Node Status Thresholds // Node Status Thresholds
doc["thresholds"]["node_active_threshold_ms"] = node_active_threshold_ms; doc["thresholds"]["node_active_threshold_ms"] = node_active_threshold_ms;
@@ -104,6 +101,12 @@ bool Config::saveToFile(const String& filename) {
doc["memory"]["critical_memory_threshold_bytes"] = critical_memory_threshold_bytes; doc["memory"]["critical_memory_threshold_bytes"] = critical_memory_threshold_bytes;
doc["memory"]["max_concurrent_http_requests"] = max_concurrent_http_requests; doc["memory"]["max_concurrent_http_requests"] = max_concurrent_http_requests;
// Custom Labels
JsonObject labelsObj = doc["labels"].to<JsonObject>();
for (const auto& kv : labels) {
labelsObj[kv.first] = kv.second;
}
// Add metadata // Add metadata
doc["_meta"]["version"] = "1.0"; doc["_meta"]["version"] = "1.0";
doc["_meta"]["saved_at"] = millis(); doc["_meta"]["saved_at"] = millis();
@@ -157,12 +160,9 @@ bool Config::loadFromFile(const String& filename) {
api_server_port = doc["network"]["api_server_port"] | DEFAULT_API_SERVER_PORT; api_server_port = doc["network"]["api_server_port"] | DEFAULT_API_SERVER_PORT;
// Load Cluster Configuration with defaults // Load Cluster Configuration with defaults
discovery_interval_ms = doc["cluster"]["discovery_interval_ms"] | DEFAULT_DISCOVERY_INTERVAL_MS;
heartbeat_interval_ms = doc["cluster"]["heartbeat_interval_ms"] | DEFAULT_HEARTBEAT_INTERVAL_MS; heartbeat_interval_ms = doc["cluster"]["heartbeat_interval_ms"] | DEFAULT_HEARTBEAT_INTERVAL_MS;
cluster_listen_interval_ms = doc["cluster"]["cluster_listen_interval_ms"] | DEFAULT_CLUSTER_LISTEN_INTERVAL_MS; cluster_listen_interval_ms = doc["cluster"]["cluster_listen_interval_ms"] | DEFAULT_CLUSTER_LISTEN_INTERVAL_MS;
status_update_interval_ms = doc["cluster"]["status_update_interval_ms"] | DEFAULT_STATUS_UPDATE_INTERVAL_MS; status_update_interval_ms = doc["cluster"]["status_update_interval_ms"] | DEFAULT_STATUS_UPDATE_INTERVAL_MS;
member_info_update_interval_ms = doc["cluster"]["member_info_update_interval_ms"] | DEFAULT_MEMBER_INFO_UPDATE_INTERVAL_MS;
print_interval_ms = doc["cluster"]["print_interval_ms"] | DEFAULT_PRINT_INTERVAL_MS;
// Load Node Status Thresholds with defaults // Load Node Status Thresholds with defaults
node_active_threshold_ms = doc["thresholds"]["node_active_threshold_ms"] | DEFAULT_NODE_ACTIVE_THRESHOLD_MS; node_active_threshold_ms = doc["thresholds"]["node_active_threshold_ms"] | DEFAULT_NODE_ACTIVE_THRESHOLD_MS;
@@ -178,6 +178,15 @@ bool Config::loadFromFile(const String& filename) {
critical_memory_threshold_bytes = doc["memory"]["critical_memory_threshold_bytes"] | DEFAULT_CRITICAL_MEMORY_THRESHOLD_BYTES; critical_memory_threshold_bytes = doc["memory"]["critical_memory_threshold_bytes"] | DEFAULT_CRITICAL_MEMORY_THRESHOLD_BYTES;
max_concurrent_http_requests = doc["memory"]["max_concurrent_http_requests"] | DEFAULT_MAX_CONCURRENT_HTTP_REQUESTS; max_concurrent_http_requests = doc["memory"]["max_concurrent_http_requests"] | DEFAULT_MAX_CONCURRENT_HTTP_REQUESTS;
// Load Custom Labels
labels.clear();
if (doc["labels"].is<JsonObject>()) {
JsonObject labelsObj = doc["labels"].as<JsonObject>();
for (JsonPair kv : labelsObj) {
labels[kv.key().c_str()] = kv.value().as<String>();
}
}
LOG_DEBUG("Config", "Loaded WiFi SSID: " + wifi_ssid); LOG_DEBUG("Config", "Loaded WiFi SSID: " + wifi_ssid);
LOG_DEBUG("Config", "Config file version: " + String(doc["_meta"]["version"] | "unknown")); LOG_DEBUG("Config", "Config file version: " + String(doc["_meta"]["version"] | "unknown"));

View File

@@ -1,185 +0,0 @@
#include "spore/util/CpuUsage.h"
CpuUsage::CpuUsage()
: _initialized(false)
, _measuring(false)
, _measurementCount(0)
, _cycleStartTime(0)
, _idleStartTime(0)
, _totalIdleTime(0)
, _totalCycleTime(0)
, _currentCpuUsage(0.0f)
, _averageCpuUsage(0.0f)
, _maxCpuUsage(0.0f)
, _minCpuUsage(100.0f)
, _totalCpuTime(0)
, _rollingIndex(0)
, _rollingWindowFull(false) {
// Initialize rolling window
for (size_t i = 0; i < ROLLING_WINDOW_SIZE; ++i) {
_rollingWindow[i] = 0.0f;
}
}
void CpuUsage::begin() {
if (_initialized) {
return;
}
_initialized = true;
_measurementCount = 0;
_totalIdleTime = 0;
_totalCycleTime = 0;
_totalCpuTime = 0;
_currentCpuUsage = 0.0f;
_averageCpuUsage = 0.0f;
_maxCpuUsage = 0.0f;
_minCpuUsage = 100.0f;
_rollingIndex = 0;
_rollingWindowFull = false;
// Initialize rolling window
for (size_t i = 0; i < ROLLING_WINDOW_SIZE; ++i) {
_rollingWindow[i] = 0.0f;
}
}
void CpuUsage::startMeasurement() {
if (!_initialized) {
return;
}
if (_measuring) {
// If already measuring, end the previous measurement first
endMeasurement();
}
_measuring = true;
_cycleStartTime = millis();
_idleStartTime = millis();
}
void CpuUsage::endMeasurement() {
if (!_initialized || !_measuring) {
return;
}
unsigned long cycleEndTime = millis();
unsigned long cycleDuration = cycleEndTime - _cycleStartTime;
// Calculate idle time (time spent in yield() calls)
unsigned long idleTime = cycleEndTime - _idleStartTime;
// Calculate CPU usage
if (cycleDuration > 0) {
_currentCpuUsage = ((float)(cycleDuration - idleTime) / (float)cycleDuration) * 100.0f;
// Clamp to valid range
if (_currentCpuUsage < 0.0f) {
_currentCpuUsage = 0.0f;
} else if (_currentCpuUsage > 100.0f) {
_currentCpuUsage = 100.0f;
}
// Update statistics
_totalCycleTime += cycleDuration;
_totalIdleTime += idleTime;
_totalCpuTime += (cycleDuration - idleTime);
_measurementCount++;
// Update rolling average
updateRollingAverage(_currentCpuUsage);
// Update min/max
updateMinMax(_currentCpuUsage);
// Calculate overall average
if (_measurementCount > 0) {
_averageCpuUsage = ((float)_totalCpuTime / (float)_totalCycleTime) * 100.0f;
}
}
_measuring = false;
}
float CpuUsage::getCpuUsage() const {
return _currentCpuUsage;
}
float CpuUsage::getAverageCpuUsage() const {
if (_rollingWindowFull) {
return _averageCpuUsage;
} else if (_measurementCount > 0) {
// Calculate average from rolling window
float sum = 0.0f;
for (size_t i = 0; i < _rollingIndex; ++i) {
sum += _rollingWindow[i];
}
return sum / (float)_rollingIndex;
}
return 0.0f;
}
float CpuUsage::getMaxCpuUsage() const {
return _maxCpuUsage;
}
float CpuUsage::getMinCpuUsage() const {
return _minCpuUsage;
}
void CpuUsage::reset() {
_measurementCount = 0;
_totalIdleTime = 0;
_totalCycleTime = 0;
_totalCpuTime = 0;
_currentCpuUsage = 0.0f;
_averageCpuUsage = 0.0f;
_maxCpuUsage = 0.0f;
_minCpuUsage = 100.0f;
_rollingIndex = 0;
_rollingWindowFull = false;
// Reset rolling window
for (size_t i = 0; i < ROLLING_WINDOW_SIZE; ++i) {
_rollingWindow[i] = 0.0f;
}
}
bool CpuUsage::isMeasuring() const {
return _measuring;
}
unsigned long CpuUsage::getMeasurementCount() const {
return _measurementCount;
}
void CpuUsage::updateRollingAverage(float value) {
_rollingWindow[_rollingIndex] = value;
_rollingIndex++;
if (_rollingIndex >= ROLLING_WINDOW_SIZE) {
_rollingIndex = 0;
_rollingWindowFull = true;
}
// Calculate rolling average
float sum = 0.0f;
size_t count = _rollingWindowFull ? ROLLING_WINDOW_SIZE : _rollingIndex;
for (size_t i = 0; i < count; ++i) {
sum += _rollingWindow[i];
}
_averageCpuUsage = sum / (float)count;
}
void CpuUsage::updateMinMax(float value) {
if (value > _maxCpuUsage) {
_maxCpuUsage = value;
}
if (value < _minCpuUsage) {
_minCpuUsage = value;
}
}