Compare commits

17 Commits

Author SHA1 Message Date
e60093c419 Merge pull request 'feature/base-firmware-size-optimization' (#18) from feature/base-firmware-size-optimization into main
Reviewed-on: #18
2025-11-04 12:18:31 +01:00
633957c95c feat: configurable PixelStream 2025-10-28 21:05:02 +01:00
ad879bfe7b feat: remove CpuUsage calculation 2025-10-27 10:38:56 +01:00
4559e13d7d fix: revert RAW message 2025-10-27 07:48:06 +01:00
682849650d Merge pull request 'feat: new cluster protocol, event naming' (#17) from feature/cluster-protocol-update into main
Reviewed-on: #17
2025-10-26 12:51:41 +01:00
0f003335b3 feat: new cluster protocol, event naming 2025-10-26 12:43:22 +01:00
eab10cffa5 Merge pull request 'refactoring/firmware-optimizations' (#16) from refactoring/firmware-optimizations into main
Reviewed-on: #16
2025-10-21 13:51:03 +02:00
7f40626187 feat: improve local node initalization 2025-10-21 11:19:12 +02:00
e796375a9f feat: memberlist optimization 2025-10-21 10:50:46 +02:00
daae29dd3f refactor: update local node 2025-10-20 21:35:08 +02:00
37a68e26d8 refactor: remove unused and obsolet stuff 2025-10-20 21:21:02 +02:00
7bd3e87271 Merge pull request 'feature/improved-cluster-forming' (#15) from feature/improved-cluster-forming into main
Reviewed-on: #15
2025-10-19 17:47:05 +02:00
0d09c5900c docs: update 2025-10-19 16:55:48 +02:00
407b651b82 feat: change event naming schema 2025-10-19 13:48:13 +02:00
23289d9f09 fix: latency calculation 2025-10-19 13:11:36 +02:00
b6ad479352 feat: calculate latency during heartbeat 2025-10-19 12:59:26 +02:00
3ed44cd00f feat: improve cluster forming; just use heartbeat to form the cluster 2025-10-19 12:50:43 +02:00
28 changed files with 914 additions and 772 deletions

3
ctl.sh
View File

@@ -372,12 +372,9 @@ function node {
# Cluster Configuration # Cluster Configuration
echo "=== Cluster Configuration ===" echo "=== Cluster Configuration ==="
echo "Discovery Interval: $(echo "$response_body" | jq -r '.cluster.discovery_interval_ms // "N/A"') ms"
echo "Heartbeat Interval: $(echo "$response_body" | jq -r '.cluster.heartbeat_interval_ms // "N/A"') ms" echo "Heartbeat Interval: $(echo "$response_body" | jq -r '.cluster.heartbeat_interval_ms // "N/A"') ms"
echo "Cluster Listen Interval: $(echo "$response_body" | jq -r '.cluster.cluster_listen_interval_ms // "N/A"') ms" echo "Cluster Listen Interval: $(echo "$response_body" | jq -r '.cluster.cluster_listen_interval_ms // "N/A"') ms"
echo "Status Update Interval: $(echo "$response_body" | jq -r '.cluster.status_update_interval_ms // "N/A"') ms" echo "Status Update Interval: $(echo "$response_body" | jq -r '.cluster.status_update_interval_ms // "N/A"') ms"
echo "Member Info Update Interval: $(echo "$response_body" | jq -r '.cluster.member_info_update_interval_ms // "N/A"') ms"
echo "Print Interval: $(echo "$response_body" | jq -r '.cluster.print_interval_ms // "N/A"') ms"
echo "" echo ""
# Node Status Thresholds # Node Status Thresholds

View File

@@ -42,65 +42,79 @@ The cluster uses a UDP-based discovery protocol for automatic node detection:
### Discovery Process ### Discovery Process
1. **Discovery Broadcast**: Nodes periodically send UDP packets on port `udp_port` (default 4210) 1. **Discovery Broadcast**: Nodes periodically send heartbeat messages on port `udp_port` (default 4210)
2. **Response Handling**: Nodes respond with `CLUSTER_RESPONSE:<hostname>` 2. **Response Handling**: Nodes respond with node update information containing their current state
3. **Member Management**: Discovered nodes are added/updated in the cluster 3. **Member Management**: Discovered nodes are added/updated in the cluster with current information
4. **Node Info via UDP**: Heartbeat triggers peers to send `CLUSTER_NODE_INFO:<hostname>:<json>` 4. **Node Synchronization**: Periodic broadcasts ensure all nodes maintain current cluster state
### Protocol Details ### Protocol Details
- **UDP Port**: 4210 (configurable via `Config.udp_port`) - **UDP Port**: 4210 (configurable via `Config.udp_port`)
- **Discovery Message**: `CLUSTER_DISCOVERY` - **Heartbeat Message**: `CLUSTER_HEARTBEAT:hostname`
- **Response Message**: `CLUSTER_RESPONSE` - **Node Update Message**: `NODE_UPDATE:hostname:{json}`
- **Heartbeat Message**: `CLUSTER_HEARTBEAT`
- **Node Info Message**: `CLUSTER_NODE_INFO:<hostname>:<json>`
- **Broadcast Address**: 255.255.255.255 - **Broadcast Address**: 255.255.255.255
- **Discovery Interval**: `Config.discovery_interval_ms` (default 1000 ms)
- **Listen Interval**: `Config.cluster_listen_interval_ms` (default 10 ms) - **Listen Interval**: `Config.cluster_listen_interval_ms` (default 10 ms)
- **Heartbeat Interval**: `Config.heartbeat_interval_ms` (default 5000 ms) - **Heartbeat Interval**: `Config.heartbeat_interval_ms` (default 5000 ms)
### Message Formats ### Message Formats
- **Discovery**: `CLUSTER_DISCOVERY` - **Heartbeat**: `CLUSTER_HEARTBEAT:hostname`
- Sender: any node, broadcast to 255.255.255.255:`udp_port`
- Purpose: announce presence and solicit peer identification
- **Response**: `CLUSTER_RESPONSE:<hostname>`
- Sender: node receiving a discovery; unicast to requester IP
- Purpose: provide hostname so requester can register/update member
- **Heartbeat**: `CLUSTER_HEARTBEAT:<hostname>`
- Sender: each node, broadcast to 255.255.255.255:`udp_port` on interval - Sender: each node, broadcast to 255.255.255.255:`udp_port` on interval
- Purpose: prompt peers to reply with their node info and keep liveness - Purpose: announce presence, prompt peers for node info, and keep liveness
- **Node Info**: `CLUSTER_NODE_INFO:<hostname>:<json>` - **Node Update**: `NODE_UPDATE:hostname:{json}`
- Sender: node receiving a heartbeat; unicast to heartbeat sender IP - Sender: node responding to heartbeat or broadcasting current state
- JSON fields: freeHeap, chipId, sdkVersion, cpuFreqMHz, flashChipSize, optional labels - JSON fields: hostname, ip, uptime, optional labels
- Purpose: provide current node information for cluster synchronization
### Discovery Flow ### Discovery Flow
1. **Sender broadcasts** `CLUSTER_DISCOVERY` 1. **A node broadcasts** `CLUSTER_HEARTBEAT:hostname` to announce its presence
2. **Each receiver responds** with `CLUSTER_RESPONSE:<hostname>` to the sender IP 2. **Each receiver responds** with `NODE_UPDATE:hostname:{json}` containing current node state
3. **Sender registers/updates** the node using hostname and source IP
### Heartbeat Flow
1. **A node broadcasts** `CLUSTER_HEARTBEAT:<hostname>`
2. **Each receiver replies** with `CLUSTER_NODE_INFO:<hostname>:<json>` to the heartbeat sender IP
3. **The sender**: 3. **The sender**:
- Ensures the node exists or creates it with `hostname` and sender IP - Ensures the responding node exists or creates it with current IP and information
- Parses JSON and updates resources, labels, `status = ACTIVE`, `lastSeen = now` - Parses JSON and updates node info, `status = ACTIVE`, `lastSeen = now`
- Sets `latency = now - lastHeartbeatSentAt` (per-node, measured at heartbeat origin) - Calculates `latency = now - lastHeartbeatSentAt` for network performance monitoring
### Node Synchronization
1. **Event-driven broadcasts**: Nodes broadcast `NODE_UPDATE:hostname:{json}` when node information changes
2. **All receivers**: Update their memberlist entry for the broadcasting node
3. **Purpose**: Ensures all nodes maintain current cluster state and configuration
### Sequence Diagram
```mermaid
sequenceDiagram
participant N1 as Node A (esp-node1)
participant N2 as Node B (esp-node2)
Note over N1,N2: Discovery via heartbeat broadcast
N1->>+N2: CLUSTER_HEARTBEAT:esp-node1
Note over N2: Node B responds with its current state
N2->>+N1: NODE_UPDATE:esp-node1:{"hostname":"esp-node2","uptime":12345,"labels":{"role":"sensor"}}
Note over N1: Process NODE_UPDATE response
N1-->>N1: Update memberlist for Node B
N1-->>N1: Set Node B status = ACTIVE
N1-->>N1: Calculate latency for Node B
Note over N1,N2: Event-driven node synchronization
N1->>+N2: NODE_UPDATE:esp-node1:{"hostname":"esp-node1","uptime":12346,"labels":{"role":"controller"}}
Note over N2: Update memberlist with latest information
N2-->>N2: Update Node A info, maintain ACTIVE status
```
### Listener Behavior ### Listener Behavior
The `cluster_listen` task parses one UDP packet per run and dispatches by prefix to: The `cluster_listen` task parses one UDP packet per run and dispatches by prefix to:
- **Discovery** → send `CLUSTER_RESPONSE` - **Heartbeat** → add/update responding node and send `NODE_UPDATE` response
- **Heartbeat** → send `CLUSTER_NODE_INFO` JSON - **Node Update** → update node information and trigger memberlist logging
- **Response** → add/update node using provided hostname and source IP
- **Node Info** → update resources/status/labels and record latency
### Timing and Intervals ### Timing and Intervals
- **UDP Port**: `Config.udp_port` (default 4210) - **UDP Port**: `Config.udp_port` (default 4210)
- **Discovery Interval**: `Config.discovery_interval_ms` (default 1000 ms)
- **Listen Interval**: `Config.cluster_listen_interval_ms` (default 10 ms) - **Listen Interval**: `Config.cluster_listen_interval_ms` (default 10 ms)
- **Heartbeat Interval**: `Config.heartbeat_interval_ms` (default 5000 ms) - **Heartbeat Interval**: `Config.heartbeat_interval_ms` (default 5000 ms)
@@ -120,12 +134,9 @@ The system runs several background tasks at different intervals:
| Task | Interval (default) | Purpose | | Task | Interval (default) | Purpose |
|------|--------------------|---------| |------|--------------------|---------|
| `cluster_discovery` | 1000 ms | Send UDP discovery packets | | `cluster_listen` | 10 ms | Listen for heartbeat/node-info messages |
| `cluster_listen` | 10 ms | Listen for discovery/heartbeat/node-info |
| `status_update` | 1000 ms | Update node status categories, purge dead | | `status_update` | 1000 ms | Update node status categories, purge dead |
| `heartbeat` | 5000 ms | Broadcast heartbeat and update local resources | | `heartbeat` | 5000 ms | Broadcast heartbeat and update local resources |
| `cluster_update_members_info` | 10000 ms | Reserved; no-op (info via UDP) |
| `print_members` | 5000 ms | Log current member list |
### Task Management Features ### Task Management Features
@@ -142,12 +153,12 @@ The `NodeContext` provides an event-driven architecture for system-wide communic
```cpp ```cpp
// Subscribe to events // Subscribe to events
ctx.on("node_discovered", [](void* data) { ctx.on("node/discovered", [](void* data) {
NodeInfo* node = static_cast<NodeInfo*>(data); NodeInfo* node = static_cast<NodeInfo*>(data);
// Handle new node discovery // Handle new node discovery
}); });
ctx.on("cluster_updated", [](void* data) { ctx.on("cluster/updated", [](void* data) {
// Handle cluster membership changes // Handle cluster membership changes
}); });
``` ```
@@ -156,13 +167,13 @@ ctx.on("cluster_updated", [](void* data) {
```cpp ```cpp
// Publish events // Publish events
ctx.fire("node_discovered", &newNode); ctx.fire("node/discovered", &newNode);
ctx.fire("cluster_updated", &clusterData); ctx.fire("cluster/updated", &clusterData);
``` ```
### Available Events ### Available Events
- **`node_discovered`**: New node added or local node refreshed - **`node/discovered`**: New node added or local node refreshed
## Resource Monitoring ## Resource Monitoring

View File

@@ -93,12 +93,9 @@ The configuration is stored as a JSON file with the following structure:
"api_server_port": 80 "api_server_port": 80
}, },
"cluster": { "cluster": {
"discovery_interval_ms": 1000,
"heartbeat_interval_ms": 5000, "heartbeat_interval_ms": 5000,
"cluster_listen_interval_ms": 10, "cluster_listen_interval_ms": 10,
"status_update_interval_ms": 1000, "status_update_interval_ms": 1000
"member_info_update_interval_ms": 10000,
"print_interval_ms": 5000
}, },
"thresholds": { "thresholds": {
"node_active_threshold_ms": 10000, "node_active_threshold_ms": 10000,

View File

@@ -195,13 +195,13 @@ void NeoPatternService::registerEventHandlers() {
JsonDocument doc; JsonDocument doc;
DeserializationError err = deserializeJson(doc, *jsonStr); DeserializationError err = deserializeJson(doc, *jsonStr);
if (err) { if (err) {
LOG_WARN("NeoPattern", String("Failed to parse CLUSTER_EVENT data: ") + err.c_str()); LOG_WARN("NeoPattern", String("Failed to parse cluster/event data: ") + err.c_str());
return; return;
} }
JsonObject obj = doc.as<JsonObject>(); JsonObject obj = doc.as<JsonObject>();
bool applied = applyControlParams(obj); bool applied = applyControlParams(obj);
if (applied) { if (applied) {
LOG_INFO("NeoPattern", "Applied control from CLUSTER_EVENT"); LOG_INFO("NeoPattern", "Applied control from cluster/event");
} }
}); });

View File

@@ -0,0 +1,202 @@
#include "PixelStreamService.h"
#include "spore/util/Logging.h"
#include <Adafruit_NeoPixel.h>
PixelStreamService::PixelStreamService(NodeContext& ctx, ApiServer& apiServer, PixelStreamController* controller)
: ctx(ctx), apiServer(apiServer), controller(controller) {}
void PixelStreamService::registerEndpoints(ApiServer& api) {
// Config endpoint for setting pixelstream configuration
api.registerEndpoint("/api/pixelstream/config", HTTP_PUT,
[this](AsyncWebServerRequest* request) { handleConfigRequest(request); },
std::vector<ParamSpec>{
ParamSpec{String("pin"), false, String("body"), String("number"), {}, String("")},
ParamSpec{String("pixel_count"), false, String("body"), String("number"), {}, String("")},
ParamSpec{String("brightness"), false, String("body"), String("number"), {}, String("")},
ParamSpec{String("matrix_width"), false, String("body"), String("number"), {}, String("")},
ParamSpec{String("matrix_serpentine"), false, String("body"), String("boolean"), {}, String("")},
ParamSpec{String("pixel_type"), false, String("body"), String("number"), {}, String("")}
});
// Config endpoint for getting pixelstream configuration
api.registerEndpoint("/api/pixelstream/config", HTTP_GET,
[this](AsyncWebServerRequest* request) { handleGetConfigRequest(request); },
std::vector<ParamSpec>{});
}
void PixelStreamService::registerTasks(TaskManager& taskManager) {
// PixelStreamService doesn't register any tasks itself
}
PixelStreamConfig PixelStreamService::loadConfig() {
// Initialize with proper defaults
PixelStreamConfig config;
config.pin = 2;
config.pixelCount = 16;
config.brightness = 80;
config.matrixWidth = 16;
config.matrixSerpentine = false;
config.pixelType = NEO_GRB + NEO_KHZ800;
if (!LittleFS.begin()) {
LOG_WARN("PixelStream", "Failed to initialize LittleFS, using defaults");
return config;
}
if (!LittleFS.exists(CONFIG_FILE())) {
LOG_INFO("PixelStream", "No pixelstream config file found, using defaults");
// Save defaults
saveConfig(config);
return config;
}
File file = LittleFS.open(CONFIG_FILE(), "r");
if (!file) {
LOG_ERROR("PixelStream", "Failed to open config file for reading");
return config;
}
JsonDocument doc;
DeserializationError error = deserializeJson(doc, file);
file.close();
if (error) {
LOG_ERROR("PixelStream", "Failed to parse config file: " + String(error.c_str()));
return config;
}
if (doc["pin"].is<uint8_t>()) config.pin = doc["pin"].as<uint8_t>();
if (doc["pixel_count"].is<uint16_t>()) config.pixelCount = doc["pixel_count"].as<uint16_t>();
if (doc["brightness"].is<uint8_t>()) config.brightness = doc["brightness"].as<uint8_t>();
if (doc["matrix_width"].is<uint16_t>()) config.matrixWidth = doc["matrix_width"].as<uint16_t>();
if (doc["matrix_serpentine"].is<bool>()) config.matrixSerpentine = doc["matrix_serpentine"].as<bool>();
if (doc["pixel_type"].is<uint8_t>()) config.pixelType = static_cast<neoPixelType>(doc["pixel_type"].as<uint8_t>());
LOG_INFO("PixelStream", "Configuration loaded from " + String(CONFIG_FILE()));
return config;
}
bool PixelStreamService::saveConfig(const PixelStreamConfig& config) {
if (!LittleFS.begin()) {
LOG_ERROR("PixelStream", "LittleFS not initialized, cannot save config");
return false;
}
File file = LittleFS.open(CONFIG_FILE(), "w");
if (!file) {
LOG_ERROR("PixelStream", "Failed to open config file for writing");
return false;
}
JsonDocument doc;
doc["pin"] = config.pin;
doc["pixel_count"] = config.pixelCount;
doc["brightness"] = config.brightness;
doc["matrix_width"] = config.matrixWidth;
doc["matrix_serpentine"] = config.matrixSerpentine;
doc["pixel_type"] = static_cast<uint8_t>(config.pixelType);
size_t bytesWritten = serializeJson(doc, file);
file.close();
if (bytesWritten > 0) {
LOG_INFO("PixelStream", "Configuration saved to " + String(CONFIG_FILE()) + " (" + String(bytesWritten) + " bytes)");
return true;
} else {
LOG_ERROR("PixelStream", "Failed to write configuration to file");
return false;
}
}
void PixelStreamService::handleConfigRequest(AsyncWebServerRequest* request) {
// Load current config from file
PixelStreamConfig config = loadConfig();
bool updated = false;
// Handle individual form parameters
if (request->hasParam("pin", true)) {
String pinStr = request->getParam("pin", true)->value();
if (pinStr.length() > 0) {
int pinValue = pinStr.toInt();
if (pinValue >= 0 && pinValue <= 255) {
config.pin = static_cast<uint8_t>(pinValue);
updated = true;
}
}
}
if (request->hasParam("pixel_count", true)) {
String countStr = request->getParam("pixel_count", true)->value();
if (countStr.length() > 0) {
int countValue = countStr.toInt();
if (countValue > 0 && countValue <= 65535) {
config.pixelCount = static_cast<uint16_t>(countValue);
updated = true;
}
}
}
if (request->hasParam("brightness", true)) {
String brightnessStr = request->getParam("brightness", true)->value();
if (brightnessStr.length() > 0) {
int brightnessValue = brightnessStr.toInt();
if (brightnessValue >= 0 && brightnessValue <= 255) {
config.brightness = static_cast<uint8_t>(brightnessValue);
updated = true;
}
}
}
if (request->hasParam("matrix_width", true)) {
String widthStr = request->getParam("matrix_width", true)->value();
if (widthStr.length() > 0) {
int widthValue = widthStr.toInt();
if (widthValue > 0 && widthValue <= 65535) {
config.matrixWidth = static_cast<uint16_t>(widthValue);
updated = true;
}
}
}
if (request->hasParam("matrix_serpentine", true)) {
String serpentineStr = request->getParam("matrix_serpentine", true)->value();
config.matrixSerpentine = (serpentineStr.equalsIgnoreCase("true") || serpentineStr == "1");
updated = true;
}
if (request->hasParam("pixel_type", true)) {
String typeStr = request->getParam("pixel_type", true)->value();
if (typeStr.length() > 0) {
int typeValue = typeStr.toInt();
config.pixelType = static_cast<neoPixelType>(typeValue);
updated = true;
}
}
if (!updated) {
request->send(400, "application/json", "{\"error\":\"No valid configuration fields provided\"}");
return;
}
// Save config to file
if (saveConfig(config)) {
LOG_INFO("PixelStreamService", "Configuration updated and saved to pixelstream.json");
request->send(200, "application/json", "{\"status\":\"success\",\"message\":\"Configuration updated and saved\"}");
} else {
LOG_ERROR("PixelStreamService", "Failed to save configuration to file");
request->send(500, "application/json", "{\"error\":\"Failed to save configuration\"}");
}
}
void PixelStreamService::handleGetConfigRequest(AsyncWebServerRequest* request) {
PixelStreamConfig config = loadConfig();
JsonDocument doc;
doc["pin"] = config.pin;
doc["pixel_count"] = config.pixelCount;
doc["brightness"] = config.brightness;
doc["matrix_width"] = config.matrixWidth;
doc["matrix_serpentine"] = config.matrixSerpentine;
doc["pixel_type"] = static_cast<uint8_t>(config.pixelType);
String json;
serializeJson(doc, json);
request->send(200, "application/json", json);
}

View File

@@ -0,0 +1,33 @@
#pragma once
#include "spore/Service.h"
#include "spore/core/NodeContext.h"
#include "PixelStreamController.h"
#include <ArduinoJson.h>
#include <LittleFS.h>
#include "spore/util/Logging.h"
// PixelStreamConfig is defined in PixelStreamController.h
class PixelStreamService : public Service {
public:
PixelStreamService(NodeContext& ctx, ApiServer& apiServer, PixelStreamController* controller);
void registerEndpoints(ApiServer& api) override;
void registerTasks(TaskManager& taskManager) override;
const char* getName() const override { return "PixelStream"; }
// Config management
PixelStreamConfig loadConfig();
bool saveConfig(const PixelStreamConfig& config);
void setController(PixelStreamController* ctrl) { controller = ctrl; }
private:
NodeContext& ctx;
ApiServer& apiServer;
PixelStreamController* controller;
void handleConfigRequest(AsyncWebServerRequest* request);
void handleGetConfigRequest(AsyncWebServerRequest* request);
static const char* CONFIG_FILE() { return "/pixelstream.json"; }
};

View File

@@ -2,7 +2,11 @@
#include "spore/Spore.h" #include "spore/Spore.h"
#include "spore/util/Logging.h" #include "spore/util/Logging.h"
#include "PixelStreamController.h" #include "PixelStreamController.h"
#include "PixelStreamService.h"
#include <Adafruit_NeoPixel.h>
// Defaults are now loaded from config.json on LittleFS
// Can still be overridden with preprocessor defines if needed
#ifndef PIXEL_PIN #ifndef PIXEL_PIN
#define PIXEL_PIN 2 #define PIXEL_PIN 2
#endif #endif
@@ -34,22 +38,28 @@ Spore spore({
}); });
PixelStreamController* controller = nullptr; PixelStreamController* controller = nullptr;
PixelStreamService* service = nullptr;
void setup() { void setup() {
spore.setup(); spore.setup();
PixelStreamConfig config{ // Create service first (need it to load config)
static_cast<uint8_t>(PIXEL_PIN), service = new PixelStreamService(spore.getContext(), spore.getApiServer(), nullptr);
static_cast<uint16_t>(PIXEL_COUNT),
static_cast<uint8_t>(PIXEL_BRIGHTNESS),
static_cast<uint16_t>(PIXEL_MATRIX_WIDTH),
static_cast<bool>(PIXEL_MATRIX_SERPENTINE),
static_cast<neoPixelType>(PIXEL_TYPE)
};
// Load pixelstream config from LittleFS (pixelstream.json) or use defaults
PixelStreamConfig config = service->loadConfig();
// Create controller with loaded config
controller = new PixelStreamController(spore.getContext(), config); controller = new PixelStreamController(spore.getContext(), config);
controller->begin(); controller->begin();
// Update service with the actual controller
service->setController(controller);
// Register service
spore.registerService(service);
// Start the API server
spore.begin(); spore.begin();
} }

View File

@@ -11,7 +11,6 @@
#include "core/TaskManager.h" #include "core/TaskManager.h"
#include "Service.h" #include "Service.h"
#include "util/Logging.h" #include "util/Logging.h"
#include "util/CpuUsage.h"
class Spore { class Spore {
public: public:
@@ -35,12 +34,6 @@ public:
ClusterManager& getCluster() { return cluster; } ClusterManager& getCluster() { return cluster; }
ApiServer& getApiServer() { return apiServer; } ApiServer& getApiServer() { return apiServer; }
// CPU usage monitoring
CpuUsage& getCpuUsage() { return cpuUsage; }
float getCurrentCpuUsage() const { return cpuUsage.getCpuUsage(); }
float getAverageCpuUsage() const { return cpuUsage.getAverageCpuUsage(); }
private: private:
void initializeCore(); void initializeCore();
void registerCoreServices(); void registerCoreServices();
@@ -51,7 +44,6 @@ private:
TaskManager taskManager; TaskManager taskManager;
ClusterManager cluster; ClusterManager cluster;
ApiServer apiServer; ApiServer apiServer;
CpuUsage cpuUsage;
std::vector<std::shared_ptr<Service>> services; std::vector<std::shared_ptr<Service>> services;
bool initialized; bool initialized;

View File

@@ -14,17 +14,16 @@ class ClusterManager {
public: public:
ClusterManager(NodeContext& ctx, TaskManager& taskMgr); ClusterManager(NodeContext& ctx, TaskManager& taskMgr);
void registerTasks(); void registerTasks();
void sendDiscovery();
void listen(); void listen();
void addOrUpdateNode(const String& nodeHost, IPAddress nodeIP); void addOrUpdateNode(const String& nodeHost, IPAddress nodeIP);
void updateAllNodeStatuses(); void updateAllNodeStatuses();
void removeDeadNodes(); void removeDeadNodes();
void printMemberList(); void printMemberList();
const std::map<String, NodeInfo>& getMemberList() const { return *ctx.memberList; } size_t getMemberCount() const { return ctx.memberList->getMemberCount(); }
void fetchNodeInfo(const IPAddress& ip); void updateLocalNodeResources(NodeInfo& node);
void updateLocalNodeResources();
void heartbeatTaskCallback(); void heartbeatTaskCallback();
void updateAllMembersInfoTaskCallback(); void updateAllMembersInfoTaskCallback();
void broadcastNodeUpdate();
private: private:
NodeContext& ctx; NodeContext& ctx;
TaskManager& taskManager; TaskManager& taskManager;
@@ -35,18 +34,15 @@ private:
}; };
void initMessageHandlers(); void initMessageHandlers();
void handleIncomingMessage(const char* incoming); void handleIncomingMessage(const char* incoming);
static bool isDiscoveryMsg(const char* msg);
static bool isHeartbeatMsg(const char* msg); static bool isHeartbeatMsg(const char* msg);
static bool isResponseMsg(const char* msg); static bool isNodeUpdateMsg(const char* msg);
static bool isNodeInfoMsg(const char* msg);
static bool isClusterEventMsg(const char* msg); static bool isClusterEventMsg(const char* msg);
static bool isRawMsg(const char* msg); static bool isRawMsg(const char* msg);
void onDiscovery(const char* msg);
void onHeartbeat(const char* msg); void onHeartbeat(const char* msg);
void onResponse(const char* msg); void onNodeUpdate(const char* msg);
void onNodeInfo(const char* msg);
void onClusterEvent(const char* msg); void onClusterEvent(const char* msg);
void onRawMessage(const char* msg); void onRawMessage(const char* msg);
void sendNodeInfo(const String& hostname, const IPAddress& targetIP);
unsigned long lastHeartbeatSentAt = 0; unsigned long lastHeartbeatSentAt = 0;
std::vector<MessageHandler> messageHandlers; std::vector<MessageHandler> messageHandlers;
}; };

View File

@@ -0,0 +1,133 @@
#pragma once
#include <Arduino.h>
#include <map>
#include <string>
#include <optional>
#include <functional>
#include "spore/types/NodeInfo.h"
/**
* @brief Manages the list of cluster members.
*
* The Memberlist class maintains a collection of cluster members, where each member
* is identified by its IP address and associated with a NodeInfo object. It provides
* methods to add, update, and remove members, as well as handle node status changes
* (stale and dead nodes).
*/
class Memberlist {
public:
/**
* @brief Default constructor.
*/
Memberlist();
/**
* @brief Destructor.
*/
~Memberlist();
/**
* @brief Adds or updates a member in the list.
*
* If the member already exists, updates its information. Otherwise, adds a new member.
* @param ip The IP address of the member (as string).
* @param node The NodeInfo object containing member details.
* @return True if the member was added or updated, false otherwise.
*/
bool addOrUpdateMember(const std::string& ip, const NodeInfo& node);
/**
* @brief Adds a new member to the list.
*
* @param ip The IP address of the member (as string).
* @param node The NodeInfo object containing member details.
* @return True if the member was added, false if it already exists.
*/
bool addMember(const std::string& ip, const NodeInfo& node);
/**
* @brief Updates an existing member in the list.
*
* @param ip The IP address of the member (as string).
* @param node The updated NodeInfo object.
* @return True if the member was updated, false if it doesn't exist.
*/
bool updateMember(const std::string& ip, const NodeInfo& node);
/**
* @brief Removes a member from the list.
*
* @param ip The IP address of the member to remove (as string).
* @return True if the member was removed, false if it doesn't exist.
*/
bool removeMember(const std::string& ip);
/**
* @brief Retrieves a member by IP address.
*
* @param ip The IP address of the member (as string).
* @return Optional containing the NodeInfo if found, or std::nullopt if not found.
*/
std::optional<NodeInfo> getMember(const std::string& ip) const;
/**
* @brief Iterates over all members and calls the provided callback for each.
*
* @param callback Function to call for each member. Receives (ip, node) as parameters.
*/
void forEachMember(std::function<void(const std::string&, const NodeInfo&)> callback) const;
/**
* @brief Iterates over all members and calls the provided callback for each.
*
* @param callback Function to call for each member. Receives (ip, node) as parameters.
* If callback returns false, iteration stops.
* @return True if all members were processed, false if iteration was stopped early.
*/
bool forEachMemberUntil(std::function<bool(const std::string&, const NodeInfo&)> callback) const;
/**
* @brief Gets the number of members in the list.
*
* @return The number of members.
*/
size_t getMemberCount() const;
/**
* @brief Updates the status of all members based on current time and thresholds.
*
* Marks nodes as stale or dead based on their last seen time.
* @param currentTime The current time in milliseconds.
* @param staleThresholdMs Threshold for marking a node as stale (milliseconds).
* @param deadThresholdMs Threshold for marking a node as dead (milliseconds).
* @param onStatusChange Optional callback fired when a node's status changes.
*/
void updateAllNodeStatuses(unsigned long currentTime,
unsigned long staleThresholdMs,
unsigned long deadThresholdMs,
std::function<void(const std::string&, NodeInfo::Status, NodeInfo::Status)> onStatusChange = nullptr);
/**
* @brief Removes all dead members from the list.
*
* @return The number of members removed.
*/
size_t removeDeadMembers();
/**
* @brief Checks if a member exists in the list.
*
* @param ip The IP address of the member (as string).
* @return True if the member exists, false otherwise.
*/
bool hasMember(const std::string& ip) const;
/**
* @brief Clears all members from the list.
*/
void clear();
private:
std::map<std::string, NodeInfo> m_members; ///< Internal map holding the members.
};

View File

@@ -2,12 +2,14 @@
#include <WiFiUdp.h> #include <WiFiUdp.h>
#include <map> #include <map>
#include "spore/types/NodeInfo.h"
#include <functional> #include <functional>
#include <string> #include <string>
#include <initializer_list> #include <initializer_list>
#include <memory>
#include "spore/types/NodeInfo.h"
#include "spore/types/Config.h" #include "spore/types/Config.h"
#include "spore/types/ApiTypes.h" #include "spore/types/ApiTypes.h"
#include "spore/core/Memberlist.h"
class NodeContext { class NodeContext {
public: public:
@@ -18,7 +20,7 @@ public:
String hostname; String hostname;
IPAddress localIP; IPAddress localIP;
NodeInfo self; NodeInfo self;
std::map<String, NodeInfo>* memberList; std::unique_ptr<Memberlist> memberList;
::Config config; ::Config config;
std::map<String, String> constructorLabels; // Labels passed to constructor (not persisted) std::map<String, String> constructorLabels; // Labels passed to constructor (not persisted)

View File

@@ -5,11 +5,9 @@
// Cluster protocol and API constants // Cluster protocol and API constants
namespace ClusterProtocol { namespace ClusterProtocol {
constexpr const char* DISCOVERY_MSG = "CLUSTER_DISCOVERY"; constexpr const char* HEARTBEAT_MSG = "cluster/heartbeat";
constexpr const char* RESPONSE_MSG = "CLUSTER_RESPONSE"; constexpr const char* NODE_UPDATE_MSG = "node/update";
constexpr const char* HEARTBEAT_MSG = "CLUSTER_HEARTBEAT"; constexpr const char* CLUSTER_EVENT_MSG = "cluster/event";
constexpr const char* NODE_INFO_MSG = "CLUSTER_NODE_INFO";
constexpr const char* CLUSTER_EVENT_MSG = "CLUSTER_EVENT";
constexpr const char* RAW_MSG = "RAW"; constexpr const char* RAW_MSG = "RAW";
constexpr uint16_t UDP_PORT = 4210; constexpr uint16_t UDP_PORT = 4210;
// Increased buffer to accommodate larger RAW pixel streams and node info JSON over UDP // Increased buffer to accommodate larger RAW pixel streams and node info JSON over UDP
@@ -18,12 +16,9 @@ namespace ClusterProtocol {
} }
namespace TaskIntervals { namespace TaskIntervals {
constexpr unsigned long SEND_DISCOVERY = 1000;
constexpr unsigned long LISTEN_FOR_DISCOVERY = 100;
constexpr unsigned long UPDATE_STATUS = 1000; constexpr unsigned long UPDATE_STATUS = 1000;
constexpr unsigned long PRINT_MEMBER_LIST = 5000; constexpr unsigned long PRINT_MEMBER_LIST = 5000;
constexpr unsigned long HEARTBEAT = 2000; constexpr unsigned long HEARTBEAT = 2000;
constexpr unsigned long UPDATE_ALL_MEMBERS_INFO = 10000;
} }
constexpr unsigned long NODE_ACTIVE_THRESHOLD = 10000; constexpr unsigned long NODE_ACTIVE_THRESHOLD = 10000;

View File

@@ -1,11 +1,10 @@
#pragma once #pragma once
#include "spore/Service.h" #include "spore/Service.h"
#include "spore/util/CpuUsage.h"
#include <functional> #include <functional>
class MonitoringService : public Service { class MonitoringService : public Service {
public: public:
MonitoringService(CpuUsage& cpuUsage); MonitoringService();
void registerEndpoints(ApiServer& api) override; void registerEndpoints(ApiServer& api) override;
void registerTasks(TaskManager& taskManager) override; void registerTasks(TaskManager& taskManager) override;
const char* getName() const override { return "Monitoring"; } const char* getName() const override { return "Monitoring"; }
@@ -15,17 +14,12 @@ public:
// CPU information // CPU information
float currentCpuUsage; float currentCpuUsage;
float averageCpuUsage; float averageCpuUsage;
float maxCpuUsage;
float minCpuUsage;
unsigned long measurementCount; unsigned long measurementCount;
bool isMeasuring; bool isMeasuring;
// Memory information // Memory information
size_t freeHeap; size_t freeHeap;
size_t totalHeap; size_t totalHeap;
size_t minFreeHeap;
size_t maxAllocHeap;
size_t heapFragmentation;
// Filesystem information // Filesystem information
size_t totalBytes; size_t totalBytes;
@@ -45,8 +39,5 @@ private:
void handleResourcesRequest(AsyncWebServerRequest* request); void handleResourcesRequest(AsyncWebServerRequest* request);
// Helper methods // Helper methods
size_t calculateHeapFragmentation() const;
void getFilesystemInfo(size_t& totalBytes, size_t& usedBytes) const; void getFilesystemInfo(size_t& totalBytes, size_t& usedBytes) const;
CpuUsage& cpuUsage;
}; };

View File

@@ -12,11 +12,10 @@ public:
static constexpr const char* DEFAULT_WIFI_PASSWORD = "th3r31sn0sp00n"; static constexpr const char* DEFAULT_WIFI_PASSWORD = "th3r31sn0sp00n";
static constexpr uint16_t DEFAULT_UDP_PORT = 4210; static constexpr uint16_t DEFAULT_UDP_PORT = 4210;
static constexpr uint16_t DEFAULT_API_SERVER_PORT = 80; static constexpr uint16_t DEFAULT_API_SERVER_PORT = 80;
static constexpr unsigned long DEFAULT_DISCOVERY_INTERVAL_MS = 1000;
static constexpr unsigned long DEFAULT_CLUSTER_LISTEN_INTERVAL_MS = 10; static constexpr unsigned long DEFAULT_CLUSTER_LISTEN_INTERVAL_MS = 10;
static constexpr unsigned long DEFAULT_HEARTBEAT_INTERVAL_MS = 5000; static constexpr unsigned long DEFAULT_HEARTBEAT_INTERVAL_MS = 5000;
static constexpr unsigned long DEFAULT_STATUS_UPDATE_INTERVAL_MS = 1000; static constexpr unsigned long DEFAULT_STATUS_UPDATE_INTERVAL_MS = 1000;
static constexpr unsigned long DEFAULT_MEMBER_INFO_UPDATE_INTERVAL_MS = 10000; static constexpr unsigned long DEFAULT_NODE_UPDATE_BROADCAST_INTERVAL_MS = 5000;
static constexpr unsigned long DEFAULT_PRINT_INTERVAL_MS = 5000; static constexpr unsigned long DEFAULT_PRINT_INTERVAL_MS = 5000;
static constexpr unsigned long DEFAULT_NODE_ACTIVE_THRESHOLD_MS = 10000; static constexpr unsigned long DEFAULT_NODE_ACTIVE_THRESHOLD_MS = 10000;
static constexpr unsigned long DEFAULT_NODE_INACTIVE_THRESHOLD_MS = 60000; static constexpr unsigned long DEFAULT_NODE_INACTIVE_THRESHOLD_MS = 60000;
@@ -38,12 +37,9 @@ public:
uint16_t api_server_port; uint16_t api_server_port;
// Cluster Configuration // Cluster Configuration
unsigned long discovery_interval_ms;
unsigned long heartbeat_interval_ms; unsigned long heartbeat_interval_ms;
unsigned long cluster_listen_interval_ms; unsigned long cluster_listen_interval_ms;
unsigned long status_update_interval_ms; unsigned long status_update_interval_ms;
unsigned long member_info_update_interval_ms;
unsigned long print_interval_ms;
// Node Status Thresholds // Node Status Thresholds
unsigned long node_active_threshold_ms; unsigned long node_active_threshold_ms;

View File

@@ -9,6 +9,7 @@ struct NodeInfo {
String hostname; String hostname;
IPAddress ip; IPAddress ip;
unsigned long lastSeen; unsigned long lastSeen;
unsigned long uptime = 0; // milliseconds since node started
enum Status { ACTIVE, INACTIVE, DEAD } status; enum Status { ACTIVE, INACTIVE, DEAD } status;
struct Resources { struct Resources {
uint32_t freeHeap = 0; uint32_t freeHeap = 0;
@@ -17,7 +18,7 @@ struct NodeInfo {
uint32_t cpuFreqMHz = 0; uint32_t cpuFreqMHz = 0;
uint32_t flashChipSize = 0; uint32_t flashChipSize = 0;
} resources; } resources;
unsigned long latency = 0; // ms from heartbeat broadcast to NODE_INFO receipt unsigned long latency = 0; // ms from heartbeat broadcast to NODE_UPDATE receipt
std::vector<EndpointInfo> endpoints; // List of registered endpoints std::vector<EndpointInfo> endpoints; // List of registered endpoints
std::map<String, String> labels; // Arbitrary node labels (key -> value) std::map<String, String> labels; // Arbitrary node labels (key -> value)
}; };

View File

@@ -1,118 +0,0 @@
#pragma once
#include <Arduino.h>
/**
* @brief CPU usage measurement utility for ESP32/ESP8266
*
* This class provides methods to measure CPU usage by tracking idle time
* and calculating the percentage of time the CPU is busy vs idle.
*/
class CpuUsage {
public:
/**
* @brief Construct a new CpuUsage object
*/
CpuUsage();
/**
* @brief Destructor
*/
~CpuUsage() = default;
/**
* @brief Initialize the CPU usage measurement
* Call this once during setup
*/
void begin();
/**
* @brief Start measuring CPU usage for the current cycle
* Call this at the beginning of your main loop
*/
void startMeasurement();
/**
* @brief End measuring CPU usage for the current cycle
* Call this at the end of your main loop
*/
void endMeasurement();
/**
* @brief Get the current CPU usage percentage
* @return float CPU usage percentage (0.0 to 100.0)
*/
float getCpuUsage() const;
/**
* @brief Get the average CPU usage over the measurement window
* @return float Average CPU usage percentage (0.0 to 100.0)
*/
float getAverageCpuUsage() const;
/**
* @brief Get the maximum CPU usage recorded
* @return float Maximum CPU usage percentage (0.0 to 100.0)
*/
float getMaxCpuUsage() const;
/**
* @brief Get the minimum CPU usage recorded
* @return float Minimum CPU usage percentage (0.0 to 100.0)
*/
float getMinCpuUsage() const;
/**
* @brief Reset all CPU usage statistics
*/
void reset();
/**
* @brief Check if measurement is currently active
* @return true if measurement is active, false otherwise
*/
bool isMeasuring() const;
/**
* @brief Get the number of measurements taken
* @return unsigned long Number of measurements
*/
unsigned long getMeasurementCount() const;
private:
// Measurement state
bool _initialized;
bool _measuring;
unsigned long _measurementCount;
// Timing variables
unsigned long _cycleStartTime;
unsigned long _idleStartTime;
unsigned long _totalIdleTime;
unsigned long _totalCycleTime;
// Statistics
float _currentCpuUsage;
float _averageCpuUsage;
float _maxCpuUsage;
float _minCpuUsage;
unsigned long _totalCpuTime;
// Rolling average window
static constexpr size_t ROLLING_WINDOW_SIZE = 10;
float _rollingWindow[ROLLING_WINDOW_SIZE];
size_t _rollingIndex;
bool _rollingWindowFull;
/**
* @brief Update rolling average calculation
* @param value New value to add to rolling average
*/
void updateRollingAverage(float value);
/**
* @brief Update min/max statistics
* @param value New value to check against min/max
*/
void updateMinMax(float value);
};

View File

@@ -18,6 +18,15 @@ monitor_speed = 115200
lib_deps = lib_deps =
esp32async/ESPAsyncWebServer@^3.8.0 esp32async/ESPAsyncWebServer@^3.8.0
bblanchon/ArduinoJson@^7.4.2 bblanchon/ArduinoJson@^7.4.2
build_flags =
-Os ; Optimize for size
-ffunction-sections ; Place each function in its own section
-fdata-sections ; Place data in separate sections
-Wl,--gc-sections ; Remove unused sections at link time
-DNDEBUG ; Disable debug assertions
-DVTABLES_IN_FLASH ; Move virtual tables to flash
-fno-exceptions ; Disable C++ exceptions
-fno-rtti ; Disable runtime type information
[env:base] [env:base]
platform = platformio/espressif8266@^4.2.1 platform = platformio/espressif8266@^4.2.1
@@ -31,6 +40,7 @@ board_build.filesystem = littlefs
; note: somehow partition table is not working, so we need to use the ldscript ; note: somehow partition table is not working, so we need to use the ldscript
board_build.ldscript = eagle.flash.1m64.ld ; 64KB -> FS Size board_build.ldscript = eagle.flash.1m64.ld ; 64KB -> FS Size
lib_deps = ${common.lib_deps} lib_deps = ${common.lib_deps}
build_flags = ${common.build_flags}
build_src_filter = build_src_filter =
+<examples/base/*.cpp> +<examples/base/*.cpp>
+<src/spore/*.cpp> +<src/spore/*.cpp>
@@ -51,6 +61,7 @@ board_build.flash_mode = dio ; D1 Mini uses DIO on 4 Mbit flash
board_build.flash_size = 4M board_build.flash_size = 4M
board_build.ldscript = eagle.flash.4m1m.ld board_build.ldscript = eagle.flash.4m1m.ld
lib_deps = ${common.lib_deps} lib_deps = ${common.lib_deps}
build_flags = ${common.build_flags}
build_src_filter = build_src_filter =
+<examples/base/*.cpp> +<examples/base/*.cpp>
+<src/spore/*.cpp> +<src/spore/*.cpp>
@@ -71,6 +82,7 @@ board_build.flash_mode = dout
board_build.ldscript = eagle.flash.1m64.ld board_build.ldscript = eagle.flash.1m64.ld
lib_deps = ${common.lib_deps} lib_deps = ${common.lib_deps}
;data_dir = examples/relay/data ;data_dir = examples/relay/data
build_flags = ${common.build_flags}
build_src_filter = build_src_filter =
+<examples/relay/*.cpp> +<examples/relay/*.cpp>
+<src/spore/*.cpp> +<src/spore/*.cpp>
@@ -91,7 +103,7 @@ board_build.flash_mode = dout
board_build.ldscript = eagle.flash.1m64.ld board_build.ldscript = eagle.flash.1m64.ld
lib_deps = ${common.lib_deps} lib_deps = ${common.lib_deps}
adafruit/Adafruit NeoPixel@^1.15.1 adafruit/Adafruit NeoPixel@^1.15.1
build_flags = -DLED_STRIP_PIN=2 build_flags = -DLED_STRIP_PIN=2 ;${common.build_flags}
build_src_filter = build_src_filter =
+<examples/neopattern/*.cpp> +<examples/neopattern/*.cpp>
+<src/spore/*.cpp> +<src/spore/*.cpp>
@@ -112,7 +124,7 @@ board_build.flash_mode = dout
board_build.ldscript = eagle.flash.1m64.ld board_build.ldscript = eagle.flash.1m64.ld
lib_deps = ${common.lib_deps} lib_deps = ${common.lib_deps}
adafruit/Adafruit NeoPixel@^1.15.1 adafruit/Adafruit NeoPixel@^1.15.1
build_flags = build_flags = ${common.build_flags}
build_src_filter = build_src_filter =
+<examples/pixelstream/*.cpp> +<examples/pixelstream/*.cpp>
+<src/spore/*.cpp> +<src/spore/*.cpp>
@@ -133,7 +145,7 @@ board_build.flash_mode = dout
board_build.ldscript = eagle.flash.4m1m.ld board_build.ldscript = eagle.flash.4m1m.ld
lib_deps = ${common.lib_deps} lib_deps = ${common.lib_deps}
adafruit/Adafruit NeoPixel@^1.15.1 adafruit/Adafruit NeoPixel@^1.15.1
build_flags = -DPIXEL_PIN=TX -DPIXEL_COUNT=256 -DMATRIX_WIDTH=16 build_flags = -DPIXEL_PIN=TX -DPIXEL_COUNT=256 -DMATRIX_WIDTH=16 ${common.build_flags}
build_src_filter = build_src_filter =
+<examples/pixelstream/*.cpp> +<examples/pixelstream/*.cpp>
+<src/spore/*.cpp> +<src/spore/*.cpp>
@@ -156,6 +168,7 @@ board_build.ldscript = eagle.flash.4m1m.ld
lib_deps = ${common.lib_deps} lib_deps = ${common.lib_deps}
adafruit/Adafruit NeoPixel@^1.15.1 adafruit/Adafruit NeoPixel@^1.15.1
dfrobot/DFRobotDFPlayerMini@^1.0.6 dfrobot/DFRobotDFPlayerMini@^1.0.6
build_flags = ${common.build_flags}
build_src_filter = build_src_filter =
+<examples/multimatrix/*.cpp> +<examples/multimatrix/*.cpp>
+<examples/pixelstream/PixelStreamController.cpp> +<examples/pixelstream/PixelStreamController.cpp>

View File

@@ -9,7 +9,7 @@
Spore::Spore() : ctx(), network(ctx), taskManager(ctx), cluster(ctx, taskManager), Spore::Spore() : ctx(), network(ctx), taskManager(ctx), cluster(ctx, taskManager),
apiServer(ctx, taskManager, ctx.config.api_server_port), apiServer(ctx, taskManager, ctx.config.api_server_port),
cpuUsage(), initialized(false), apiServerStarted(false) { initialized(false), apiServerStarted(false) {
// Rebuild labels from constructor + config labels // Rebuild labels from constructor + config labels
ctx.rebuildLabels(); ctx.rebuildLabels();
@@ -18,7 +18,7 @@ Spore::Spore() : ctx(), network(ctx), taskManager(ctx), cluster(ctx, taskManager
Spore::Spore(std::initializer_list<std::pair<String, String>> initialLabels) Spore::Spore(std::initializer_list<std::pair<String, String>> initialLabels)
: ctx(initialLabels), network(ctx), taskManager(ctx), cluster(ctx, taskManager), : ctx(initialLabels), network(ctx), taskManager(ctx), cluster(ctx, taskManager),
apiServer(ctx, taskManager, ctx.config.api_server_port), apiServer(ctx, taskManager, ctx.config.api_server_port),
cpuUsage(), initialized(false), apiServerStarted(false) { initialized(false), apiServerStarted(false) {
// Rebuild labels from constructor + config labels (config takes precedence) // Rebuild labels from constructor + config labels (config takes precedence)
ctx.rebuildLabels(); ctx.rebuildLabels();
@@ -40,9 +40,6 @@ void Spore::setup() {
// Initialize core components // Initialize core components
initializeCore(); initializeCore();
// Initialize CPU usage monitoring
cpuUsage.begin();
// Register core services // Register core services
registerCoreServices(); registerCoreServices();
@@ -78,15 +75,9 @@ void Spore::loop() {
return; return;
} }
// Start CPU usage measurement
cpuUsage.startMeasurement();
// Execute main tasks // Execute main tasks
taskManager.execute(); taskManager.execute();
// End CPU usage measurement before yield
cpuUsage.endMeasurement();
// Yield to allow other tasks to run // Yield to allow other tasks to run
yield(); yield();
} }
@@ -141,7 +132,7 @@ void Spore::registerCoreServices() {
auto clusterService = std::make_shared<ClusterService>(ctx); auto clusterService = std::make_shared<ClusterService>(ctx);
auto taskService = std::make_shared<TaskService>(taskManager); auto taskService = std::make_shared<TaskService>(taskManager);
auto staticFileService = std::make_shared<StaticFileService>(ctx, apiServer); auto staticFileService = std::make_shared<StaticFileService>(ctx, apiServer);
auto monitoringService = std::make_shared<MonitoringService>(cpuUsage); auto monitoringService = std::make_shared<MonitoringService>();
// Add to services list // Add to services list
services.push_back(nodeService); services.push_back(nodeService);

View File

@@ -23,11 +23,12 @@ void ApiServer::registerEndpoint(const String& uri, int method,
endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true}); endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true});
// Update cluster if needed // Update cluster if needed
if (ctx.memberList && !ctx.memberList->empty()) { String localIPStr = ctx.localIP.toString();
auto it = ctx.memberList->find(ctx.hostname); auto member = ctx.memberList->getMember(localIPStr.c_str());
if (it != ctx.memberList->end()) { if (member) {
it->second.endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true}); NodeInfo updatedNode = *member;
} updatedNode.endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true});
ctx.memberList->updateMember(localIPStr.c_str(), updatedNode);
} }
} }

View File

@@ -1,14 +1,15 @@
#include "spore/core/ClusterManager.h" #include "spore/core/ClusterManager.h"
#include "spore/internal/Globals.h" #include "spore/internal/Globals.h"
#include "spore/util/Logging.h" #include "spore/util/Logging.h"
#include "spore/types/NodeInfo.h"
ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx), taskManager(taskMgr) { ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx), taskManager(taskMgr) {
// Register callback for node_discovered event // Register callback for node/discovered event - this fires when network is ready
ctx.on("node_discovered", [this](void* data) { ctx.on("node/discovered", [this](void* data) {
NodeInfo* node = static_cast<NodeInfo*>(data); NodeInfo* node = static_cast<NodeInfo*>(data);
this->addOrUpdateNode(node->hostname, node->ip); this->addOrUpdateNode(node->hostname, node->ip);
}); });
// Centralized broadcast handler: services fire 'cluster/broadcast' with CLUSTER_EVENT JSON payload // Centralized broadcast handler: services fire 'cluster/broadcast' with cluster/event JSON payload
ctx.on("cluster/broadcast", [this](void* data) { ctx.on("cluster/broadcast", [this](void* data) {
String* jsonStr = static_cast<String*>(data); String* jsonStr = static_cast<String*>(data);
if (!jsonStr) { if (!jsonStr) {
@@ -19,33 +20,36 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx
IPAddress ip = WiFi.localIP(); IPAddress ip = WiFi.localIP();
IPAddress mask = WiFi.subnetMask(); IPAddress mask = WiFi.subnetMask();
IPAddress bcast(ip[0] | ~mask[0], ip[1] | ~mask[1], ip[2] | ~mask[2], ip[3] | ~mask[3]); IPAddress bcast(ip[0] | ~mask[0], ip[1] | ~mask[1], ip[2] | ~mask[2], ip[3] | ~mask[3]);
LOG_DEBUG("Cluster", String("Broadcasting CLUSTER_EVENT to ") + bcast.toString() + " len=" + String(jsonStr->length())); LOG_DEBUG("Cluster", String("Broadcasting cluster/event to ") + bcast.toString() + " len=" + String(jsonStr->length()));
this->ctx.udp->beginPacket(bcast, this->ctx.config.udp_port); this->ctx.udp->beginPacket(bcast, this->ctx.config.udp_port);
String msg = String(ClusterProtocol::CLUSTER_EVENT_MSG) + ":" + *jsonStr; String msg = String(ClusterProtocol::CLUSTER_EVENT_MSG) + ":" + *jsonStr;
this->ctx.udp->write(msg.c_str()); this->ctx.udp->write(msg.c_str());
this->ctx.udp->endPacket(); this->ctx.udp->endPacket();
}); });
// Handler for node update broadcasts: services fire 'cluster/node/update' when their node info changes
ctx.on("cluster/node/update", [this](void* data) {
// Trigger immediate NODE_UPDATE broadcast when node info changes
broadcastNodeUpdate();
});
// Handler for memberlist changes: print memberlist when it changes
ctx.on("cluster/memberlist/changed", [this](void* data) {
printMemberList();
});
// Register tasks // Register tasks
registerTasks(); registerTasks();
initMessageHandlers(); initMessageHandlers();
} }
void ClusterManager::registerTasks() { void ClusterManager::registerTasks() {
taskManager.registerTask("cluster_discovery", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); });
taskManager.registerTask("cluster_listen", ctx.config.cluster_listen_interval_ms, [this]() { listen(); }); taskManager.registerTask("cluster_listen", ctx.config.cluster_listen_interval_ms, [this]() { listen(); });
taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); }); taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); });
taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); });
taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); }); taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); });
taskManager.registerTask("cluster_update_members_info", ctx.config.member_info_update_interval_ms, [this]() { updateAllMembersInfoTaskCallback(); });
LOG_INFO("ClusterManager", "Registered all cluster tasks"); LOG_INFO("ClusterManager", "Registered all cluster tasks");
} }
void ClusterManager::sendDiscovery() { // Discovery functionality removed - using heartbeat-only approach
//LOG_DEBUG(ctx, "Cluster", "Sending discovery packet...");
ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
ctx.udp->write(ClusterProtocol::DISCOVERY_MSG);
ctx.udp->endPacket();
}
void ClusterManager::listen() { void ClusterManager::listen() {
int packetSize = ctx.udp->parsePacket(); int packetSize = ctx.udp->parsePacket();
@@ -69,10 +73,8 @@ void ClusterManager::listen() {
void ClusterManager::initMessageHandlers() { void ClusterManager::initMessageHandlers() {
messageHandlers.clear(); messageHandlers.clear();
messageHandlers.push_back({ &ClusterManager::isRawMsg, [this](const char* msg){ this->onRawMessage(msg); }, "RAW" }); messageHandlers.push_back({ &ClusterManager::isRawMsg, [this](const char* msg){ this->onRawMessage(msg); }, "RAW" });
messageHandlers.push_back({ &ClusterManager::isDiscoveryMsg, [this](const char* msg){ this->onDiscovery(msg); }, "DISCOVERY" });
messageHandlers.push_back({ &ClusterManager::isHeartbeatMsg, [this](const char* msg){ this->onHeartbeat(msg); }, "HEARTBEAT" }); messageHandlers.push_back({ &ClusterManager::isHeartbeatMsg, [this](const char* msg){ this->onHeartbeat(msg); }, "HEARTBEAT" });
messageHandlers.push_back({ &ClusterManager::isResponseMsg, [this](const char* msg){ this->onResponse(msg); }, "RESPONSE" }); messageHandlers.push_back({ &ClusterManager::isNodeUpdateMsg, [this](const char* msg){ this->onNodeUpdate(msg); }, "NODE_UPDATE" });
messageHandlers.push_back({ &ClusterManager::isNodeInfoMsg, [this](const char* msg){ this->onNodeInfo(msg); }, "NODE_INFO" });
messageHandlers.push_back({ &ClusterManager::isClusterEventMsg, [this](const char* msg){ this->onClusterEvent(msg); }, "CLUSTER_EVENT" }); messageHandlers.push_back({ &ClusterManager::isClusterEventMsg, [this](const char* msg){ this->onClusterEvent(msg); }, "CLUSTER_EVENT" });
} }
@@ -94,20 +96,12 @@ void ClusterManager::handleIncomingMessage(const char* incoming) {
LOG_DEBUG("Cluster", String("Unknown cluster message: ") + head); LOG_DEBUG("Cluster", String("Unknown cluster message: ") + head);
} }
bool ClusterManager::isDiscoveryMsg(const char* msg) {
return strcmp(msg, ClusterProtocol::DISCOVERY_MSG) == 0;
}
bool ClusterManager::isHeartbeatMsg(const char* msg) { bool ClusterManager::isHeartbeatMsg(const char* msg) {
return strncmp(msg, ClusterProtocol::HEARTBEAT_MSG, strlen(ClusterProtocol::HEARTBEAT_MSG)) == 0; return strncmp(msg, ClusterProtocol::HEARTBEAT_MSG, strlen(ClusterProtocol::HEARTBEAT_MSG)) == 0;
} }
bool ClusterManager::isResponseMsg(const char* msg) { bool ClusterManager::isNodeUpdateMsg(const char* msg) {
return strncmp(msg, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0; return strncmp(msg, ClusterProtocol::NODE_UPDATE_MSG, strlen(ClusterProtocol::NODE_UPDATE_MSG)) == 0;
}
bool ClusterManager::isNodeInfoMsg(const char* msg) {
return strncmp(msg, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0;
} }
bool ClusterManager::isClusterEventMsg(const char* msg) { bool ClusterManager::isClusterEventMsg(const char* msg) {
@@ -123,101 +117,186 @@ bool ClusterManager::isRawMsg(const char* msg) {
return msg[prefixLen] == ':'; return msg[prefixLen] == ':';
} }
void ClusterManager::onDiscovery(const char* /*msg*/) { // Discovery functionality removed - using heartbeat-only approach
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname; void ClusterManager::onHeartbeat(const char* msg) {
ctx.udp->write(response.c_str()); // Extract hostname from heartbeat message: "cluster/heartbeat:hostname"
ctx.udp->endPacket(); const char* colon = strchr(msg, ':');
if (!colon) {
LOG_WARN("Cluster", "Invalid heartbeat message format");
return;
}
String hostname = String(colon + 1);
IPAddress senderIP = ctx.udp->remoteIP();
// Update memberlist with the heartbeat
addOrUpdateNode(hostname, senderIP);
// Respond with minimal node info (hostname, ip, uptime, labels)
sendNodeInfo(hostname, senderIP);
} }
void ClusterManager::onHeartbeat(const char* /*msg*/) { void ClusterManager::onNodeUpdate(const char* msg) {
// Message format: "node/update:hostname:{json}"
const char* firstColon = strchr(msg, ':');
if (!firstColon) {
LOG_WARN("Cluster", "Invalid NODE_UPDATE message format");
return;
}
const char* secondColon = strchr(firstColon + 1, ':');
if (!secondColon) {
LOG_WARN("Cluster", "Invalid NODE_UPDATE message format");
return;
}
String hostnamePart = String(firstColon + 1);
String hostname = hostnamePart.substring(0, secondColon - firstColon - 1);
const char* jsonCStr = secondColon + 1;
JsonDocument doc;
DeserializationError err = deserializeJson(doc, jsonCStr);
if (err) {
LOG_WARN("Cluster", String("Failed to parse NODE_UPDATE JSON from ") + ctx.udp->remoteIP().toString());
return;
}
// The NODE_UPDATE contains info about the target node (hostname from message)
// but is sent FROM the responding node (ctx.udp->remoteIP())
// We need to find the responding node in the memberlist, not the target node
IPAddress respondingNodeIP = ctx.udp->remoteIP();
String respondingIPStr = respondingNodeIP.toString();
// Find the responding node by IP address
auto respondingMember = ctx.memberList->getMember(respondingIPStr.c_str());
if (!respondingMember) {
LOG_WARN("Cluster", String("Received NODE_UPDATE from unknown node: ") + respondingNodeIP.toString());
return;
}
// Calculate latency only if we recently sent a heartbeat (within last 1 second)
unsigned long latency = 0;
unsigned long now = millis();
if (lastHeartbeatSentAt != 0 && (now - lastHeartbeatSentAt) < 1000) { // 1 second window
latency = now - lastHeartbeatSentAt;
lastHeartbeatSentAt = 0; // Reset for next calculation
}
// Create updated node info
NodeInfo updatedNode = *respondingMember;
bool hostnameChanged = false;
bool labelsChanged = false;
// Update hostname if provided
if (doc["hostname"].is<const char*>()) {
String newHostname = doc["hostname"].as<const char*>();
if (updatedNode.hostname != newHostname) {
updatedNode.hostname = newHostname;
hostnameChanged = true;
}
}
// Update uptime if provided
if (doc["uptime"].is<unsigned long>()) {
updatedNode.uptime = doc["uptime"];
}
// Update labels if provided
if (doc["labels"].is<JsonObject>()) {
JsonObject labelsObj = doc["labels"].as<JsonObject>();
std::map<String, String> newLabels;
for (JsonPair kvp : labelsObj) {
const char* key = kvp.key().c_str();
const char* value = labelsObj[kvp.key()];
newLabels[key] = String(value);
}
// Compare with existing labels
if (newLabels != updatedNode.labels) {
labelsChanged = true;
updatedNode.labels = newLabels;
}
}
// Update timing and status
updatedNode.lastSeen = now;
updatedNode.status = NodeInfo::ACTIVE;
// Update latency if we calculated it (preserve existing value if not)
if (latency > 0) {
updatedNode.latency = latency;
}
// Persist the updated node info to the memberlist
ctx.memberList->updateMember(respondingIPStr.c_str(), updatedNode);
// Check if any fields changed that require broadcasting
bool nodeInfoChanged = hostnameChanged || labelsChanged;
if (nodeInfoChanged) {
// Fire cluster/node/update event to trigger broadcast
ctx.fire("cluster/node/update", nullptr);
}
LOG_DEBUG("Cluster", String("Updated responding node ") + updatedNode.hostname + " @ " + respondingNodeIP.toString() +
" | hostname: " + (hostnameChanged ? "changed" : "unchanged") +
" | labels: " + (labelsChanged ? "changed" : "unchanged") +
" | latency: " + (latency > 0 ? String(latency) + "ms" : "not calculated"));
}
void ClusterManager::sendNodeInfo(const String& targetHostname, const IPAddress& targetIP) {
JsonDocument doc; JsonDocument doc;
if (ctx.memberList) { // Get our node info for the response (we're the responding node)
auto it = ctx.memberList->find(ctx.hostname); String localIPStr = ctx.localIP.toString();
if (it != ctx.memberList->end()) { auto member = ctx.memberList->getMember(localIPStr.c_str());
if (member) {
const NodeInfo& node = *member;
// Response contains info about ourselves (the responding node)
doc["hostname"] = node.hostname;
doc["ip"] = node.ip.toString();
doc["uptime"] = node.uptime;
// Add labels if present
if (!node.labels.empty()) {
JsonObject labelsObj = doc["labels"].to<JsonObject>(); JsonObject labelsObj = doc["labels"].to<JsonObject>();
for (const auto& kv : it->second.labels) { for (const auto& kv : node.labels) {
labelsObj[kv.first.c_str()] = kv.second;
}
} else if (!ctx.self.labels.empty()) {
JsonObject labelsObj = doc["labels"].to<JsonObject>();
for (const auto& kv : ctx.self.labels) {
labelsObj[kv.first.c_str()] = kv.second; labelsObj[kv.first.c_str()] = kv.second;
} }
} }
} else {
// Fallback to basic info if not in memberlist
doc["hostname"] = ctx.hostname;
doc["ip"] = ctx.localIP.toString();
doc["uptime"] = millis();
} }
String json; String json;
serializeJson(doc, json); serializeJson(doc, json);
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port); // Send NODE_UPDATE:targetHostname:{json about responding node}
String msg = String(ClusterProtocol::NODE_INFO_MSG) + ":" + ctx.hostname + ":" + json; ctx.udp->beginPacket(targetIP, ctx.config.udp_port);
String msg = String(ClusterProtocol::NODE_UPDATE_MSG) + ":" + targetHostname + ":" + json;
ctx.udp->write(msg.c_str()); ctx.udp->write(msg.c_str());
ctx.udp->endPacket(); ctx.udp->endPacket();
}
void ClusterManager::onResponse(const char* msg) { LOG_DEBUG("Cluster", String("Sent NODE_UPDATE response to ") + targetHostname + " @ " + targetIP.toString());
char* hostPtr = const_cast<char*>(msg) + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
String nodeHost = String(hostPtr);
addOrUpdateNode(nodeHost, ctx.udp->remoteIP());
}
void ClusterManager::onNodeInfo(const char* msg) {
char* p = const_cast<char*>(msg) + strlen(ClusterProtocol::NODE_INFO_MSG) + 1;
char* hostEnd = strchr(p, ':');
if (hostEnd) {
*hostEnd = '\0';
const char* hostCStr = p;
const char* jsonCStr = hostEnd + 1;
String nodeHost = String(hostCStr);
IPAddress senderIP = ctx.udp->remoteIP();
addOrUpdateNode(nodeHost, senderIP);
JsonDocument doc;
DeserializationError err = deserializeJson(doc, jsonCStr);
if (!err) {
auto& memberList = *ctx.memberList;
auto it = memberList.find(nodeHost);
if (it != memberList.end()) {
NodeInfo& node = it->second;
node.status = NodeInfo::ACTIVE;
unsigned long now = millis();
node.lastSeen = now;
if (lastHeartbeatSentAt != 0) {
node.latency = now - lastHeartbeatSentAt;
}
node.labels.clear();
if (doc["labels"].is<JsonObject>()) {
JsonObject labelsObj = doc["labels"].as<JsonObject>();
for (JsonPair kvp : labelsObj) {
const char* key = kvp.key().c_str();
const char* value = labelsObj[kvp.key()];
node.labels[key] = value;
}
}
}
} else {
LOG_WARN("Cluster", String("Failed to parse NODE_INFO JSON from ") + senderIP.toString());
}
}
} }
void ClusterManager::onClusterEvent(const char* msg) { void ClusterManager::onClusterEvent(const char* msg) {
// Message format: CLUSTER_EVENT:{"event":"...","data":"<json string>"} // Message format: cluster/event:{"event":"...","data":"<json string>"}
const char* jsonStart = msg + strlen(ClusterProtocol::CLUSTER_EVENT_MSG) + 1; // skip prefix and ':' const char* jsonStart = msg + strlen(ClusterProtocol::CLUSTER_EVENT_MSG) + 1; // skip prefix and ':'
if (*jsonStart == '\0') { if (*jsonStart == '\0') {
LOG_DEBUG("Cluster", "CLUSTER_EVENT received with empty payload"); LOG_DEBUG("Cluster", "cluster/event received with empty payload");
return; return;
} }
LOG_DEBUG("Cluster", String("CLUSTER_EVENT raw from ") + ctx.udp->remoteIP().toString() + " len=" + String(strlen(jsonStart))); LOG_DEBUG("Cluster", String("cluster/event raw from ") + ctx.udp->remoteIP().toString() + " len=" + String(strlen(jsonStart)));
JsonDocument doc; JsonDocument doc;
DeserializationError err = deserializeJson(doc, jsonStart); DeserializationError err = deserializeJson(doc, jsonStart);
if (err) { if (err) {
LOG_ERROR("Cluster", String("Failed to parse CLUSTER_EVENT JSON from ") + ctx.udp->remoteIP().toString()); LOG_ERROR("Cluster", String("Failed to parse cluster/event JSON from ") + ctx.udp->remoteIP().toString());
return; return;
} }
// Robust extraction of event and data // Robust extraction of event and data
@@ -241,7 +320,7 @@ void ClusterManager::onClusterEvent(const char* msg) {
if (eventStr.length() == 0 || data.length() == 0) { if (eventStr.length() == 0 || data.length() == 0) {
String dbg; String dbg;
serializeJson(doc, dbg); serializeJson(doc, dbg);
LOG_WARN("Cluster", String("CLUSTER_EVENT missing 'event' or 'data' | payload=") + dbg); LOG_WARN("Cluster", String("cluster/event missing 'event' or 'data' | payload=") + dbg);
return; return;
} }
@@ -268,154 +347,67 @@ void ClusterManager::onRawMessage(const char* msg) {
} }
void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) { void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
auto& memberList = *ctx.memberList; bool memberlistChanged = false;
String ipStr = nodeIP.toString();
// O(1) lookup instead of O(n) search // Check if member exists
auto it = memberList.find(nodeHost); auto existingMember = ctx.memberList->getMember(ipStr.c_str());
if (it != memberList.end()) { if (existingMember) {
// Update existing node // Update existing node - preserve all existing field values
it->second.ip = nodeIP; NodeInfo updatedNode = *existingMember;
it->second.lastSeen = millis(); if (updatedNode.ip != nodeIP) {
//fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task updatedNode.ip = nodeIP;
return; memberlistChanged = true;
} }
updatedNode.lastSeen = millis();
ctx.memberList->updateMember(ipStr.c_str(), updatedNode);
} else {
// Add new node // Add new node
NodeInfo newNode; NodeInfo newNode;
newNode.hostname = nodeHost; newNode.hostname = nodeHost;
newNode.ip = nodeIP; newNode.ip = nodeIP;
newNode.lastSeen = millis(); newNode.lastSeen = millis();
updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms); updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
memberList[nodeHost] = newNode;
// Initialize static resources if this is the local node being added for the first time
if (nodeIP == ctx.localIP && nodeHost == ctx.hostname) {
newNode.resources.chipId = ESP.getChipId();
newNode.resources.sdkVersion = String(ESP.getSdkVersion());
newNode.resources.cpuFreqMHz = ESP.getCpuFreqMHz();
newNode.resources.flashChipSize = ESP.getFlashChipSize();
LOG_DEBUG("Cluster", "Initialized static resources for local node");
}
ctx.memberList->addMember(ipStr.c_str(), newNode);
memberlistChanged = true;
LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " | Status: " + statusToStr(newNode.status) + " | last update: 0"); LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " | Status: " + statusToStr(newNode.status) + " | last update: 0");
//fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
}
// unused http client to fetch complete node info
void ClusterManager::fetchNodeInfo(const IPAddress& ip) {
if(ip == ctx.localIP) {
LOG_DEBUG("Cluster", "Skipping fetch for local node");
return;
} }
unsigned long requestStart = millis(); // Fire event if memberlist changed
HTTPClient http; if (memberlistChanged) {
WiFiClient client; ctx.fire("cluster/memberlist/changed", nullptr);
String url = "http://" + ip.toString() + ClusterProtocol::API_NODE_STATUS;
// Use RAII pattern to ensure http.end() is always called
bool httpInitialized = false;
bool success = false;
httpInitialized = http.begin(client, url);
if (!httpInitialized) {
LOG_ERROR("Cluster", "Failed to initialize HTTP client for " + ip.toString());
return;
}
// Set timeout to prevent hanging
http.setTimeout(5000); // 5 second timeout
int httpCode = http.GET();
unsigned long requestEnd = millis();
unsigned long requestDuration = requestEnd - requestStart;
if (httpCode == 200) {
String payload = http.getString();
// Use stack-allocated JsonDocument with proper cleanup
JsonDocument doc;
DeserializationError err = deserializeJson(doc, payload);
if (!err) {
auto& memberList = *ctx.memberList;
// Still need to iterate since we're searching by IP, not hostname
for (auto& pair : memberList) {
NodeInfo& node = pair.second;
if (node.ip == ip) {
// Update resources efficiently
node.resources.freeHeap = doc["freeHeap"];
node.resources.chipId = doc["chipId"];
node.resources.sdkVersion = (const char*)doc["sdkVersion"];
node.resources.cpuFreqMHz = doc["cpuFreqMHz"];
node.resources.flashChipSize = doc["flashChipSize"];
node.status = NodeInfo::ACTIVE;
node.latency = requestDuration;
node.lastSeen = millis();
// Clear and rebuild endpoints efficiently
node.endpoints.clear();
node.endpoints.reserve(10); // Pre-allocate to avoid reallocations
if (doc["api"].is<JsonArray>()) {
JsonArray apiArr = doc["api"].as<JsonArray>();
for (JsonObject apiObj : apiArr) {
// Use const char* to avoid String copies
const char* uri = apiObj["uri"];
int method = apiObj["method"];
// Create basic EndpointInfo without params for cluster nodes
EndpointInfo endpoint;
endpoint.uri = uri; // String assignment is more efficient than construction
endpoint.method = method;
endpoint.isLocal = false;
endpoint.serviceName = "remote";
node.endpoints.push_back(std::move(endpoint));
}
}
// Parse labels efficiently
node.labels.clear();
if (doc["labels"].is<JsonObject>()) {
JsonObject labelsObj = doc["labels"].as<JsonObject>();
for (JsonPair kvp : labelsObj) {
// Use const char* to avoid String copies
const char* key = kvp.key().c_str();
const char* value = labelsObj[kvp.key()];
node.labels[key] = value;
}
}
LOG_DEBUG("Cluster", "Fetched info for node: " + node.hostname + " @ " + ip.toString());
success = true;
break;
}
}
} else {
LOG_ERROR("Cluster", "JSON parse error for node @ " + ip.toString() + ": " + String(err.c_str()));
}
} else {
LOG_ERROR("Cluster", "Failed to fetch info for node @ " + ip.toString() + ", HTTP code: " + String(httpCode));
}
// Always ensure HTTP client is properly closed
if (httpInitialized) {
http.end();
}
// Log success/failure for debugging
if (!success) {
LOG_DEBUG("Cluster", "Failed to update node info for " + ip.toString());
} }
} }
void ClusterManager::heartbeatTaskCallback() { void ClusterManager::heartbeatTaskCallback() {
auto& memberList = *ctx.memberList; // Update local node resources and lastSeen since we're actively sending heartbeats
auto it = memberList.find(ctx.hostname); String localIPStr = ctx.localIP.toString();
if (it != memberList.end()) { auto member = ctx.memberList->getMember(localIPStr.c_str());
NodeInfo& node = it->second; if (member) {
node.lastSeen = millis(); NodeInfo node = *member;
node.status = NodeInfo::ACTIVE; updateLocalNodeResources(node);
updateLocalNodeResources(); node.lastSeen = millis(); // Update lastSeen since we're actively participating
addOrUpdateNode(ctx.hostname, ctx.localIP); ctx.memberList->updateMember(localIPStr.c_str(), node);
} }
// Broadcast heartbeat so peers can respond with their node info // Broadcast heartbeat - peers will respond with NODE_UPDATE
lastHeartbeatSentAt = millis(); lastHeartbeatSentAt = millis();
ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port); ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
String hb = String(ClusterProtocol::HEARTBEAT_MSG) + ":" + ctx.hostname; String hb = String(ClusterProtocol::HEARTBEAT_MSG) + ":" + ctx.hostname;
ctx.udp->write(hb.c_str()); ctx.udp->write(hb.c_str());
ctx.udp->endPacket(); ctx.udp->endPacket();
LOG_DEBUG("Cluster", String("Sent heartbeat: ") + ctx.hostname);
} }
void ClusterManager::updateAllMembersInfoTaskCallback() { void ClusterManager::updateAllMembersInfoTaskCallback() {
@@ -423,55 +415,74 @@ void ClusterManager::updateAllMembersInfoTaskCallback() {
// No-op to reduce network and memory usage // No-op to reduce network and memory usage
} }
void ClusterManager::updateAllNodeStatuses() { void ClusterManager::broadcastNodeUpdate() {
auto& memberList = *ctx.memberList; // Broadcast our current node info as NODE_UPDATE to all cluster members
unsigned long now = millis(); String localIPStr = ctx.localIP.toString();
for (auto& pair : memberList) { auto member = ctx.memberList->getMember(localIPStr.c_str());
NodeInfo& node = pair.second; if (!member) {
updateNodeStatus(node, now, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms); return;
} }
const NodeInfo& node = *member;
JsonDocument doc;
doc["hostname"] = node.hostname;
doc["uptime"] = node.uptime;
// Add labels if present
if (!node.labels.empty()) {
JsonObject labelsObj = doc["labels"].to<JsonObject>();
for (const auto& kv : node.labels) {
labelsObj[kv.first.c_str()] = kv.second;
}
}
String json;
serializeJson(doc, json);
// Broadcast to all cluster members
ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
String msg = String(ClusterProtocol::NODE_UPDATE_MSG) + ":" + ctx.hostname + ":" + json;
ctx.udp->write(msg.c_str());
ctx.udp->endPacket();
LOG_DEBUG("Cluster", String("Broadcasted NODE_UPDATE for ") + ctx.hostname);
}
void ClusterManager::updateAllNodeStatuses() {
unsigned long now = millis();
ctx.memberList->updateAllNodeStatuses(now, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
} }
void ClusterManager::removeDeadNodes() { void ClusterManager::removeDeadNodes() {
auto& memberList = *ctx.memberList; size_t removedCount = ctx.memberList->removeDeadMembers();
unsigned long now = millis(); if (removedCount > 0) {
LOG_INFO("Cluster", String("Removed ") + removedCount + " dead nodes");
// Use iterator to safely remove elements from map ctx.fire("cluster/memberlist/changed", nullptr);
for (auto it = memberList.begin(); it != memberList.end(); ) {
unsigned long diff = now - it->second.lastSeen;
if (it->second.status == NodeInfo::DEAD && diff > ctx.config.node_dead_threshold_ms) {
LOG_INFO("Cluster", "Removing node: " + it->second.hostname);
it = memberList.erase(it);
} else {
++it;
}
} }
} }
void ClusterManager::printMemberList() { void ClusterManager::printMemberList() {
auto& memberList = *ctx.memberList; size_t count = ctx.memberList->getMemberCount();
if (memberList.empty()) { if (count == 0) {
LOG_INFO("Cluster", "Member List: empty"); LOG_INFO("Cluster", "Member List: empty");
return; return;
} }
LOG_INFO("Cluster", "Member List:"); LOG_INFO("Cluster", "Member List:");
for (const auto& pair : memberList) { ctx.memberList->forEachMember([](const std::string& ip, const NodeInfo& node) {
const NodeInfo& node = pair.second;
LOG_INFO("Cluster", " " + node.hostname + " @ " + node.ip.toString() + " | Status: " + statusToStr(node.status) + " | last seen: " + String(millis() - node.lastSeen)); LOG_INFO("Cluster", " " + node.hostname + " @ " + node.ip.toString() + " | Status: " + statusToStr(node.status) + " | last seen: " + String(millis() - node.lastSeen));
} });
} }
void ClusterManager::updateLocalNodeResources() { void ClusterManager::updateLocalNodeResources(NodeInfo& node) {
auto& memberList = *ctx.memberList; // Update node status and timing
auto it = memberList.find(ctx.hostname); node.lastSeen = millis();
if (it != memberList.end()) { node.status = NodeInfo::ACTIVE;
NodeInfo& node = it->second; node.uptime = millis();
// Update dynamic resources (always updated)
uint32_t freeHeap = ESP.getFreeHeap(); uint32_t freeHeap = ESP.getFreeHeap();
node.resources.freeHeap = freeHeap; node.resources.freeHeap = freeHeap;
node.resources.chipId = ESP.getChipId();
node.resources.sdkVersion = String(ESP.getSdkVersion());
node.resources.cpuFreqMHz = ESP.getCpuFreqMHz();
node.resources.flashChipSize = ESP.getFlashChipSize();
// Log memory warnings if heap is getting low // Log memory warnings if heap is getting low
if (freeHeap < ctx.config.low_memory_threshold_bytes) { if (freeHeap < ctx.config.low_memory_threshold_bytes) {
@@ -479,5 +490,4 @@ void ClusterManager::updateLocalNodeResources() {
} else if (freeHeap < ctx.config.critical_memory_threshold_bytes) { } else if (freeHeap < ctx.config.critical_memory_threshold_bytes) {
LOG_ERROR("Cluster", "Critical memory warning: " + String(freeHeap) + " bytes free"); LOG_ERROR("Cluster", "Critical memory warning: " + String(freeHeap) + " bytes free");
} }
}
} }

View File

@@ -0,0 +1,114 @@
#include "spore/core/Memberlist.h"
#include <algorithm>
Memberlist::Memberlist() = default;
Memberlist::~Memberlist() = default;
bool Memberlist::addOrUpdateMember(const std::string& ip, const NodeInfo& node) {
auto it = m_members.find(ip);
if (it != m_members.end()) {
// Update existing member
it->second = node;
it->second.lastSeen = millis(); // Update last seen time
return true;
} else {
// Add new member
NodeInfo newNode = node;
newNode.lastSeen = millis();
m_members[ip] = newNode;
return true;
}
}
bool Memberlist::addMember(const std::string& ip, const NodeInfo& node) {
if (m_members.find(ip) != m_members.end()) {
return false; // Member already exists
}
NodeInfo newNode = node;
newNode.lastSeen = millis();
m_members[ip] = newNode;
return true;
}
bool Memberlist::updateMember(const std::string& ip, const NodeInfo& node) {
auto it = m_members.find(ip);
if (it == m_members.end()) {
return false; // Member doesn't exist
}
it->second = node;
it->second.lastSeen = millis(); // Update last seen time
return true;
}
bool Memberlist::removeMember(const std::string& ip) {
auto it = m_members.find(ip);
if (it == m_members.end()) {
return false; // Member doesn't exist
}
m_members.erase(it);
return true;
}
std::optional<NodeInfo> Memberlist::getMember(const std::string& ip) const {
auto it = m_members.find(ip);
if (it != m_members.end()) {
return it->second;
}
return std::nullopt;
}
void Memberlist::forEachMember(std::function<void(const std::string&, const NodeInfo&)> callback) const {
for (const auto& pair : m_members) {
callback(pair.first, pair.second);
}
}
bool Memberlist::forEachMemberUntil(std::function<bool(const std::string&, const NodeInfo&)> callback) const {
for (const auto& pair : m_members) {
if (!callback(pair.first, pair.second)) {
return false;
}
}
return true;
}
size_t Memberlist::getMemberCount() const {
return m_members.size();
}
void Memberlist::updateAllNodeStatuses(unsigned long currentTime,
unsigned long staleThresholdMs,
unsigned long deadThresholdMs,
std::function<void(const std::string&, NodeInfo::Status, NodeInfo::Status)> onStatusChange) {
for (auto& [ip, node] : m_members) {
NodeInfo::Status oldStatus = node.status;
updateNodeStatus(node, currentTime, staleThresholdMs, deadThresholdMs);
if (oldStatus != node.status && onStatusChange) {
onStatusChange(ip, oldStatus, node.status);
}
}
}
size_t Memberlist::removeDeadMembers() {
size_t removedCount = 0;
auto it = m_members.begin();
while (it != m_members.end()) {
if (it->second.status == NodeInfo::Status::DEAD) {
it = m_members.erase(it);
++removedCount;
} else {
++it;
}
}
return removedCount;
}
bool Memberlist::hasMember(const std::string& ip) const {
return m_members.find(ip) != m_members.end();
}
void Memberlist::clear() {
m_members.clear();
}

View File

@@ -119,14 +119,9 @@ void NetworkManager::setupWiFi() {
ctx.self.status = NodeInfo::ACTIVE; ctx.self.status = NodeInfo::ACTIVE;
// Ensure member list has an entry for this node // Ensure member list has an entry for this node
auto &memberList = *ctx.memberList; String localIPStr = ctx.localIP.toString();
auto existing = memberList.find(ctx.hostname); ctx.memberList->addOrUpdateMember(localIPStr.c_str(), ctx.self);
if (existing == memberList.end()) {
memberList[ctx.hostname] = ctx.self;
} else {
existing->second = ctx.self;
}
// Notify listeners that the node is (re)discovered // Notify listeners that the node is (re)discovered
ctx.fire("node_discovered", &ctx.self); ctx.fire("node/discovered", &ctx.self);
} }

View File

@@ -2,7 +2,7 @@
NodeContext::NodeContext() { NodeContext::NodeContext() {
udp = new WiFiUDP(); udp = new WiFiUDP();
memberList = new std::map<String, NodeInfo>(); memberList = std::make_unique<Memberlist>();
hostname = ""; hostname = "";
self.hostname = ""; self.hostname = "";
self.ip = IPAddress(); self.ip = IPAddress();
@@ -19,7 +19,7 @@ NodeContext::NodeContext(std::initializer_list<std::pair<String, String>> initia
NodeContext::~NodeContext() { NodeContext::~NodeContext() {
delete udp; delete udp;
delete memberList; // memberList is a unique_ptr, so no need to delete manually
} }
void NodeContext::on(const std::string& event, EventCallback cb) { void NodeContext::on(const std::string& event, EventCallback cb) {

View File

@@ -40,8 +40,7 @@ void ClusterService::handleMembersRequest(AsyncWebServerRequest* request) {
JsonDocument doc; JsonDocument doc;
JsonArray arr = doc["members"].to<JsonArray>(); JsonArray arr = doc["members"].to<JsonArray>();
for (const auto& pair : *ctx.memberList) { ctx.memberList->forEachMember([&arr](const std::string& ip, const NodeInfo& node) {
const NodeInfo& node = pair.second;
JsonObject obj = arr.add<JsonObject>(); JsonObject obj = arr.add<JsonObject>();
obj["hostname"] = node.hostname; obj["hostname"] = node.hostname;
obj["ip"] = node.ip.toString(); obj["ip"] = node.ip.toString();
@@ -56,7 +55,7 @@ void ClusterService::handleMembersRequest(AsyncWebServerRequest* request) {
labelsObj[kv.first.c_str()] = kv.second; labelsObj[kv.first.c_str()] = kv.second;
} }
} }
} });
String json; String json;
serializeJson(doc, json); serializeJson(doc, json);

View File

@@ -5,8 +5,7 @@
#include <FS.h> #include <FS.h>
#include <LittleFS.h> #include <LittleFS.h>
MonitoringService::MonitoringService(CpuUsage& cpuUsage) MonitoringService::MonitoringService() {
: cpuUsage(cpuUsage) {
} }
void MonitoringService::registerEndpoints(ApiServer& api) { void MonitoringService::registerEndpoints(ApiServer& api) {
@@ -22,20 +21,15 @@ void MonitoringService::registerTasks(TaskManager& taskManager) {
MonitoringService::SystemResources MonitoringService::getSystemResources() const { MonitoringService::SystemResources MonitoringService::getSystemResources() const {
SystemResources resources; SystemResources resources;
// CPU information // CPU information - sending fixed value of 100
resources.currentCpuUsage = cpuUsage.getCpuUsage(); resources.currentCpuUsage = 100.0f;
resources.averageCpuUsage = cpuUsage.getAverageCpuUsage(); resources.averageCpuUsage = 100.0f;
resources.maxCpuUsage = cpuUsage.getMaxCpuUsage(); resources.measurementCount = 0;
resources.minCpuUsage = cpuUsage.getMinCpuUsage(); resources.isMeasuring = false;
resources.measurementCount = cpuUsage.getMeasurementCount();
resources.isMeasuring = cpuUsage.isMeasuring();
// Memory information - ESP8266 compatible // Memory information - ESP8266 compatible
resources.freeHeap = ESP.getFreeHeap(); resources.freeHeap = ESP.getFreeHeap();
resources.totalHeap = 81920; // ESP8266 has ~80KB RAM resources.totalHeap = 81920; // ESP8266 has ~80KB RAM
resources.minFreeHeap = 0; // Not available on ESP8266
resources.maxAllocHeap = 0; // Not available on ESP8266
resources.heapFragmentation = calculateHeapFragmentation();
// Filesystem information // Filesystem information
getFilesystemInfo(resources.totalBytes, resources.usedBytes); getFilesystemInfo(resources.totalBytes, resources.usedBytes);
@@ -59,8 +53,6 @@ void MonitoringService::handleResourcesRequest(AsyncWebServerRequest* request) {
JsonObject cpu = doc["cpu"].to<JsonObject>(); JsonObject cpu = doc["cpu"].to<JsonObject>();
cpu["current_usage"] = resources.currentCpuUsage; cpu["current_usage"] = resources.currentCpuUsage;
cpu["average_usage"] = resources.averageCpuUsage; cpu["average_usage"] = resources.averageCpuUsage;
cpu["max_usage"] = resources.maxCpuUsage;
cpu["min_usage"] = resources.minCpuUsage;
cpu["measurement_count"] = resources.measurementCount; cpu["measurement_count"] = resources.measurementCount;
cpu["is_measuring"] = resources.isMeasuring; cpu["is_measuring"] = resources.isMeasuring;
@@ -68,9 +60,6 @@ void MonitoringService::handleResourcesRequest(AsyncWebServerRequest* request) {
JsonObject memory = doc["memory"].to<JsonObject>(); JsonObject memory = doc["memory"].to<JsonObject>();
memory["free_heap"] = resources.freeHeap; memory["free_heap"] = resources.freeHeap;
memory["total_heap"] = resources.totalHeap; memory["total_heap"] = resources.totalHeap;
memory["min_free_heap"] = resources.minFreeHeap;
memory["max_alloc_heap"] = resources.maxAllocHeap;
memory["heap_fragmentation"] = resources.heapFragmentation;
memory["heap_usage_percent"] = resources.totalHeap > 0 ? memory["heap_usage_percent"] = resources.totalHeap > 0 ?
(float)(resources.totalHeap - resources.freeHeap) / (float)resources.totalHeap * 100.0f : 0.0f; (float)(resources.totalHeap - resources.freeHeap) / (float)resources.totalHeap * 100.0f : 0.0f;
@@ -94,15 +83,6 @@ void MonitoringService::handleResourcesRequest(AsyncWebServerRequest* request) {
request->send(200, "application/json", json); request->send(200, "application/json", json);
} }
size_t MonitoringService::calculateHeapFragmentation() const {
size_t freeHeap = ESP.getFreeHeap();
size_t maxAllocHeap = 0; // Not available on ESP8266
if (maxAllocHeap == 0) return 0;
// Calculate fragmentation as percentage of free heap that can't be allocated in one block
return (freeHeap - maxAllocHeap) * 100 / freeHeap;
}
void MonitoringService::getFilesystemInfo(size_t& totalBytes, size_t& usedBytes) const { void MonitoringService::getFilesystemInfo(size_t& totalBytes, size_t& usedBytes) const {
totalBytes = 0; totalBytes = 0;

View File

@@ -74,11 +74,10 @@ void NodeService::handleStatusRequest(AsyncWebServerRequest* request) {
doc["flashChipSize"] = ESP.getFlashChipSize(); doc["flashChipSize"] = ESP.getFlashChipSize();
// Include local node labels if present // Include local node labels if present
if (ctx.memberList) { auto member = ctx.memberList->getMember(ctx.hostname.c_str());
auto it = ctx.memberList->find(ctx.hostname); if (member) {
if (it != ctx.memberList->end()) {
JsonObject labelsObj = doc["labels"].to<JsonObject>(); JsonObject labelsObj = doc["labels"].to<JsonObject>();
for (const auto& kv : it->second.labels) { for (const auto& kv : member->labels) {
labelsObj[kv.first.c_str()] = kv.second; labelsObj[kv.first.c_str()] = kv.second;
} }
} else if (!ctx.self.labels.empty()) { } else if (!ctx.self.labels.empty()) {
@@ -87,7 +86,6 @@ void NodeService::handleStatusRequest(AsyncWebServerRequest* request) {
labelsObj[kv.first.c_str()] = kv.second; labelsObj[kv.first.c_str()] = kv.second;
} }
} }
}
String json; String json;
serializeJson(doc, json); serializeJson(doc, json);
@@ -116,7 +114,7 @@ void NodeService::handleUpdateUpload(AsyncWebServerRequest* request, const Strin
LOG_ERROR("OTA", "Update failed: not enough space"); LOG_ERROR("OTA", "Update failed: not enough space");
Update.printError(Serial); Update.printError(Serial);
AsyncWebServerResponse* response = request->beginResponse(500, "application/json", AsyncWebServerResponse* response = request->beginResponse(500, "application/json",
"{\"status\": \"FAIL\"}"); "{\"status\": \"FAIL\", \"message\": \"Update failed: not enough space\"}");
response->addHeader("Connection", "close"); response->addHeader("Connection", "close");
request->send(response); request->send(response);
return; return;
@@ -216,17 +214,17 @@ void NodeService::handleConfigRequest(AsyncWebServerRequest* request) {
// Rebuild self.labels from constructor + config labels // Rebuild self.labels from constructor + config labels
ctx.rebuildLabels(); ctx.rebuildLabels();
// TODO think of a better way to update the member list entry for the local node // Update the member list entry for the local node if it exists
// Update the member list entry for this node if it exists String localIPStr = ctx.localIP.toString();
if (ctx.memberList) { auto member = ctx.memberList->getMember(localIPStr.c_str());
auto it = ctx.memberList->find(ctx.hostname); if (member) {
if (it != ctx.memberList->end()) {
// Update the labels in the member list entry // Update the labels in the member list entry
it->second.labels.clear(); NodeInfo updatedNode = *member;
updatedNode.labels.clear();
for (const auto& kv : ctx.self.labels) { for (const auto& kv : ctx.self.labels) {
it->second.labels[kv.first] = kv.second; updatedNode.labels[kv.first] = kv.second;
}
} }
ctx.memberList->updateMember(localIPStr.c_str(), updatedNode);
} }
// Save config to file // Save config to file
@@ -255,12 +253,9 @@ void NodeService::handleGetConfigRequest(AsyncWebServerRequest* request) {
// Cluster Configuration // Cluster Configuration
JsonObject clusterObj = doc["cluster"].to<JsonObject>(); JsonObject clusterObj = doc["cluster"].to<JsonObject>();
clusterObj["discovery_interval_ms"] = ctx.config.discovery_interval_ms;
clusterObj["heartbeat_interval_ms"] = ctx.config.heartbeat_interval_ms; clusterObj["heartbeat_interval_ms"] = ctx.config.heartbeat_interval_ms;
clusterObj["cluster_listen_interval_ms"] = ctx.config.cluster_listen_interval_ms; clusterObj["cluster_listen_interval_ms"] = ctx.config.cluster_listen_interval_ms;
clusterObj["status_update_interval_ms"] = ctx.config.status_update_interval_ms; clusterObj["status_update_interval_ms"] = ctx.config.status_update_interval_ms;
clusterObj["member_info_update_interval_ms"] = ctx.config.member_info_update_interval_ms;
clusterObj["print_interval_ms"] = ctx.config.print_interval_ms;
// Node Status Thresholds // Node Status Thresholds
JsonObject thresholdsObj = doc["thresholds"].to<JsonObject>(); JsonObject thresholdsObj = doc["thresholds"].to<JsonObject>();

View File

@@ -32,12 +32,9 @@ void Config::setDefaults() {
api_server_port = DEFAULT_API_SERVER_PORT; api_server_port = DEFAULT_API_SERVER_PORT;
// Cluster Configuration // Cluster Configuration
discovery_interval_ms = DEFAULT_DISCOVERY_INTERVAL_MS; // TODO retire this in favor of heartbeat_interval_ms
cluster_listen_interval_ms = DEFAULT_CLUSTER_LISTEN_INTERVAL_MS; cluster_listen_interval_ms = DEFAULT_CLUSTER_LISTEN_INTERVAL_MS;
heartbeat_interval_ms = DEFAULT_HEARTBEAT_INTERVAL_MS; heartbeat_interval_ms = DEFAULT_HEARTBEAT_INTERVAL_MS;
status_update_interval_ms = DEFAULT_STATUS_UPDATE_INTERVAL_MS; status_update_interval_ms = DEFAULT_STATUS_UPDATE_INTERVAL_MS;
member_info_update_interval_ms = DEFAULT_MEMBER_INFO_UPDATE_INTERVAL_MS; // TODO retire this in favor of heartbeat_interval_ms
print_interval_ms = DEFAULT_PRINT_INTERVAL_MS;
// Node Status Thresholds // Node Status Thresholds
node_active_threshold_ms = DEFAULT_NODE_ACTIVE_THRESHOLD_MS; node_active_threshold_ms = DEFAULT_NODE_ACTIVE_THRESHOLD_MS;
@@ -86,12 +83,9 @@ bool Config::saveToFile(const String& filename) {
doc["network"]["api_server_port"] = api_server_port; doc["network"]["api_server_port"] = api_server_port;
// Cluster Configuration // Cluster Configuration
doc["cluster"]["discovery_interval_ms"] = discovery_interval_ms;
doc["cluster"]["heartbeat_interval_ms"] = heartbeat_interval_ms; doc["cluster"]["heartbeat_interval_ms"] = heartbeat_interval_ms;
doc["cluster"]["cluster_listen_interval_ms"] = cluster_listen_interval_ms; doc["cluster"]["cluster_listen_interval_ms"] = cluster_listen_interval_ms;
doc["cluster"]["status_update_interval_ms"] = status_update_interval_ms; doc["cluster"]["status_update_interval_ms"] = status_update_interval_ms;
doc["cluster"]["member_info_update_interval_ms"] = member_info_update_interval_ms;
doc["cluster"]["print_interval_ms"] = print_interval_ms;
// Node Status Thresholds // Node Status Thresholds
doc["thresholds"]["node_active_threshold_ms"] = node_active_threshold_ms; doc["thresholds"]["node_active_threshold_ms"] = node_active_threshold_ms;
@@ -166,12 +160,9 @@ bool Config::loadFromFile(const String& filename) {
api_server_port = doc["network"]["api_server_port"] | DEFAULT_API_SERVER_PORT; api_server_port = doc["network"]["api_server_port"] | DEFAULT_API_SERVER_PORT;
// Load Cluster Configuration with defaults // Load Cluster Configuration with defaults
discovery_interval_ms = doc["cluster"]["discovery_interval_ms"] | DEFAULT_DISCOVERY_INTERVAL_MS;
heartbeat_interval_ms = doc["cluster"]["heartbeat_interval_ms"] | DEFAULT_HEARTBEAT_INTERVAL_MS; heartbeat_interval_ms = doc["cluster"]["heartbeat_interval_ms"] | DEFAULT_HEARTBEAT_INTERVAL_MS;
cluster_listen_interval_ms = doc["cluster"]["cluster_listen_interval_ms"] | DEFAULT_CLUSTER_LISTEN_INTERVAL_MS; cluster_listen_interval_ms = doc["cluster"]["cluster_listen_interval_ms"] | DEFAULT_CLUSTER_LISTEN_INTERVAL_MS;
status_update_interval_ms = doc["cluster"]["status_update_interval_ms"] | DEFAULT_STATUS_UPDATE_INTERVAL_MS; status_update_interval_ms = doc["cluster"]["status_update_interval_ms"] | DEFAULT_STATUS_UPDATE_INTERVAL_MS;
member_info_update_interval_ms = doc["cluster"]["member_info_update_interval_ms"] | DEFAULT_MEMBER_INFO_UPDATE_INTERVAL_MS;
print_interval_ms = doc["cluster"]["print_interval_ms"] | DEFAULT_PRINT_INTERVAL_MS;
// Load Node Status Thresholds with defaults // Load Node Status Thresholds with defaults
node_active_threshold_ms = doc["thresholds"]["node_active_threshold_ms"] | DEFAULT_NODE_ACTIVE_THRESHOLD_MS; node_active_threshold_ms = doc["thresholds"]["node_active_threshold_ms"] | DEFAULT_NODE_ACTIVE_THRESHOLD_MS;

View File

@@ -1,185 +0,0 @@
#include "spore/util/CpuUsage.h"
CpuUsage::CpuUsage()
: _initialized(false)
, _measuring(false)
, _measurementCount(0)
, _cycleStartTime(0)
, _idleStartTime(0)
, _totalIdleTime(0)
, _totalCycleTime(0)
, _currentCpuUsage(0.0f)
, _averageCpuUsage(0.0f)
, _maxCpuUsage(0.0f)
, _minCpuUsage(100.0f)
, _totalCpuTime(0)
, _rollingIndex(0)
, _rollingWindowFull(false) {
// Initialize rolling window
for (size_t i = 0; i < ROLLING_WINDOW_SIZE; ++i) {
_rollingWindow[i] = 0.0f;
}
}
void CpuUsage::begin() {
if (_initialized) {
return;
}
_initialized = true;
_measurementCount = 0;
_totalIdleTime = 0;
_totalCycleTime = 0;
_totalCpuTime = 0;
_currentCpuUsage = 0.0f;
_averageCpuUsage = 0.0f;
_maxCpuUsage = 0.0f;
_minCpuUsage = 100.0f;
_rollingIndex = 0;
_rollingWindowFull = false;
// Initialize rolling window
for (size_t i = 0; i < ROLLING_WINDOW_SIZE; ++i) {
_rollingWindow[i] = 0.0f;
}
}
void CpuUsage::startMeasurement() {
if (!_initialized) {
return;
}
if (_measuring) {
// If already measuring, end the previous measurement first
endMeasurement();
}
_measuring = true;
_cycleStartTime = millis();
_idleStartTime = millis();
}
void CpuUsage::endMeasurement() {
if (!_initialized || !_measuring) {
return;
}
unsigned long cycleEndTime = millis();
unsigned long cycleDuration = cycleEndTime - _cycleStartTime;
// Calculate idle time (time spent in yield() calls)
unsigned long idleTime = cycleEndTime - _idleStartTime;
// Calculate CPU usage
if (cycleDuration > 0) {
_currentCpuUsage = ((float)(cycleDuration - idleTime) / (float)cycleDuration) * 100.0f;
// Clamp to valid range
if (_currentCpuUsage < 0.0f) {
_currentCpuUsage = 0.0f;
} else if (_currentCpuUsage > 100.0f) {
_currentCpuUsage = 100.0f;
}
// Update statistics
_totalCycleTime += cycleDuration;
_totalIdleTime += idleTime;
_totalCpuTime += (cycleDuration - idleTime);
_measurementCount++;
// Update rolling average
updateRollingAverage(_currentCpuUsage);
// Update min/max
updateMinMax(_currentCpuUsage);
// Calculate overall average
if (_measurementCount > 0) {
_averageCpuUsage = ((float)_totalCpuTime / (float)_totalCycleTime) * 100.0f;
}
}
_measuring = false;
}
float CpuUsage::getCpuUsage() const {
return _currentCpuUsage;
}
float CpuUsage::getAverageCpuUsage() const {
if (_rollingWindowFull) {
return _averageCpuUsage;
} else if (_measurementCount > 0) {
// Calculate average from rolling window
float sum = 0.0f;
for (size_t i = 0; i < _rollingIndex; ++i) {
sum += _rollingWindow[i];
}
return sum / (float)_rollingIndex;
}
return 0.0f;
}
float CpuUsage::getMaxCpuUsage() const {
return _maxCpuUsage;
}
float CpuUsage::getMinCpuUsage() const {
return _minCpuUsage;
}
void CpuUsage::reset() {
_measurementCount = 0;
_totalIdleTime = 0;
_totalCycleTime = 0;
_totalCpuTime = 0;
_currentCpuUsage = 0.0f;
_averageCpuUsage = 0.0f;
_maxCpuUsage = 0.0f;
_minCpuUsage = 100.0f;
_rollingIndex = 0;
_rollingWindowFull = false;
// Reset rolling window
for (size_t i = 0; i < ROLLING_WINDOW_SIZE; ++i) {
_rollingWindow[i] = 0.0f;
}
}
bool CpuUsage::isMeasuring() const {
return _measuring;
}
unsigned long CpuUsage::getMeasurementCount() const {
return _measurementCount;
}
void CpuUsage::updateRollingAverage(float value) {
_rollingWindow[_rollingIndex] = value;
_rollingIndex++;
if (_rollingIndex >= ROLLING_WINDOW_SIZE) {
_rollingIndex = 0;
_rollingWindowFull = true;
}
// Calculate rolling average
float sum = 0.0f;
size_t count = _rollingWindowFull ? ROLLING_WINDOW_SIZE : _rollingIndex;
for (size_t i = 0; i < count; ++i) {
sum += _rollingWindow[i];
}
_averageCpuUsage = sum / (float)count;
}
void CpuUsage::updateMinMax(float value) {
if (value > _maxCpuUsage) {
_maxCpuUsage = value;
}
if (value < _minCpuUsage) {
_minCpuUsage = value;
}
}