Compare commits
11 Commits
7bd3e87271
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| e60093c419 | |||
| 633957c95c | |||
| ad879bfe7b | |||
| 4559e13d7d | |||
| 682849650d | |||
| 0f003335b3 | |||
| eab10cffa5 | |||
| 7f40626187 | |||
| e796375a9f | |||
| daae29dd3f | |||
| 37a68e26d8 |
@@ -195,13 +195,13 @@ void NeoPatternService::registerEventHandlers() {
|
|||||||
JsonDocument doc;
|
JsonDocument doc;
|
||||||
DeserializationError err = deserializeJson(doc, *jsonStr);
|
DeserializationError err = deserializeJson(doc, *jsonStr);
|
||||||
if (err) {
|
if (err) {
|
||||||
LOG_WARN("NeoPattern", String("Failed to parse CLUSTER_EVENT data: ") + err.c_str());
|
LOG_WARN("NeoPattern", String("Failed to parse cluster/event data: ") + err.c_str());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
JsonObject obj = doc.as<JsonObject>();
|
JsonObject obj = doc.as<JsonObject>();
|
||||||
bool applied = applyControlParams(obj);
|
bool applied = applyControlParams(obj);
|
||||||
if (applied) {
|
if (applied) {
|
||||||
LOG_INFO("NeoPattern", "Applied control from CLUSTER_EVENT");
|
LOG_INFO("NeoPattern", "Applied control from cluster/event");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
202
examples/pixelstream/PixelStreamService.cpp
Normal file
202
examples/pixelstream/PixelStreamService.cpp
Normal file
@@ -0,0 +1,202 @@
|
|||||||
|
#include "PixelStreamService.h"
|
||||||
|
#include "spore/util/Logging.h"
|
||||||
|
#include <Adafruit_NeoPixel.h>
|
||||||
|
|
||||||
|
PixelStreamService::PixelStreamService(NodeContext& ctx, ApiServer& apiServer, PixelStreamController* controller)
|
||||||
|
: ctx(ctx), apiServer(apiServer), controller(controller) {}
|
||||||
|
|
||||||
|
void PixelStreamService::registerEndpoints(ApiServer& api) {
|
||||||
|
// Config endpoint for setting pixelstream configuration
|
||||||
|
api.registerEndpoint("/api/pixelstream/config", HTTP_PUT,
|
||||||
|
[this](AsyncWebServerRequest* request) { handleConfigRequest(request); },
|
||||||
|
std::vector<ParamSpec>{
|
||||||
|
ParamSpec{String("pin"), false, String("body"), String("number"), {}, String("")},
|
||||||
|
ParamSpec{String("pixel_count"), false, String("body"), String("number"), {}, String("")},
|
||||||
|
ParamSpec{String("brightness"), false, String("body"), String("number"), {}, String("")},
|
||||||
|
ParamSpec{String("matrix_width"), false, String("body"), String("number"), {}, String("")},
|
||||||
|
ParamSpec{String("matrix_serpentine"), false, String("body"), String("boolean"), {}, String("")},
|
||||||
|
ParamSpec{String("pixel_type"), false, String("body"), String("number"), {}, String("")}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Config endpoint for getting pixelstream configuration
|
||||||
|
api.registerEndpoint("/api/pixelstream/config", HTTP_GET,
|
||||||
|
[this](AsyncWebServerRequest* request) { handleGetConfigRequest(request); },
|
||||||
|
std::vector<ParamSpec>{});
|
||||||
|
}
|
||||||
|
|
||||||
|
void PixelStreamService::registerTasks(TaskManager& taskManager) {
|
||||||
|
// PixelStreamService doesn't register any tasks itself
|
||||||
|
}
|
||||||
|
|
||||||
|
PixelStreamConfig PixelStreamService::loadConfig() {
|
||||||
|
// Initialize with proper defaults
|
||||||
|
PixelStreamConfig config;
|
||||||
|
config.pin = 2;
|
||||||
|
config.pixelCount = 16;
|
||||||
|
config.brightness = 80;
|
||||||
|
config.matrixWidth = 16;
|
||||||
|
config.matrixSerpentine = false;
|
||||||
|
config.pixelType = NEO_GRB + NEO_KHZ800;
|
||||||
|
|
||||||
|
if (!LittleFS.begin()) {
|
||||||
|
LOG_WARN("PixelStream", "Failed to initialize LittleFS, using defaults");
|
||||||
|
return config;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!LittleFS.exists(CONFIG_FILE())) {
|
||||||
|
LOG_INFO("PixelStream", "No pixelstream config file found, using defaults");
|
||||||
|
// Save defaults
|
||||||
|
saveConfig(config);
|
||||||
|
return config;
|
||||||
|
}
|
||||||
|
|
||||||
|
File file = LittleFS.open(CONFIG_FILE(), "r");
|
||||||
|
if (!file) {
|
||||||
|
LOG_ERROR("PixelStream", "Failed to open config file for reading");
|
||||||
|
return config;
|
||||||
|
}
|
||||||
|
|
||||||
|
JsonDocument doc;
|
||||||
|
DeserializationError error = deserializeJson(doc, file);
|
||||||
|
file.close();
|
||||||
|
|
||||||
|
if (error) {
|
||||||
|
LOG_ERROR("PixelStream", "Failed to parse config file: " + String(error.c_str()));
|
||||||
|
return config;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (doc["pin"].is<uint8_t>()) config.pin = doc["pin"].as<uint8_t>();
|
||||||
|
if (doc["pixel_count"].is<uint16_t>()) config.pixelCount = doc["pixel_count"].as<uint16_t>();
|
||||||
|
if (doc["brightness"].is<uint8_t>()) config.brightness = doc["brightness"].as<uint8_t>();
|
||||||
|
if (doc["matrix_width"].is<uint16_t>()) config.matrixWidth = doc["matrix_width"].as<uint16_t>();
|
||||||
|
if (doc["matrix_serpentine"].is<bool>()) config.matrixSerpentine = doc["matrix_serpentine"].as<bool>();
|
||||||
|
if (doc["pixel_type"].is<uint8_t>()) config.pixelType = static_cast<neoPixelType>(doc["pixel_type"].as<uint8_t>());
|
||||||
|
|
||||||
|
LOG_INFO("PixelStream", "Configuration loaded from " + String(CONFIG_FILE()));
|
||||||
|
return config;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool PixelStreamService::saveConfig(const PixelStreamConfig& config) {
|
||||||
|
if (!LittleFS.begin()) {
|
||||||
|
LOG_ERROR("PixelStream", "LittleFS not initialized, cannot save config");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
File file = LittleFS.open(CONFIG_FILE(), "w");
|
||||||
|
if (!file) {
|
||||||
|
LOG_ERROR("PixelStream", "Failed to open config file for writing");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
JsonDocument doc;
|
||||||
|
doc["pin"] = config.pin;
|
||||||
|
doc["pixel_count"] = config.pixelCount;
|
||||||
|
doc["brightness"] = config.brightness;
|
||||||
|
doc["matrix_width"] = config.matrixWidth;
|
||||||
|
doc["matrix_serpentine"] = config.matrixSerpentine;
|
||||||
|
doc["pixel_type"] = static_cast<uint8_t>(config.pixelType);
|
||||||
|
|
||||||
|
size_t bytesWritten = serializeJson(doc, file);
|
||||||
|
file.close();
|
||||||
|
|
||||||
|
if (bytesWritten > 0) {
|
||||||
|
LOG_INFO("PixelStream", "Configuration saved to " + String(CONFIG_FILE()) + " (" + String(bytesWritten) + " bytes)");
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
LOG_ERROR("PixelStream", "Failed to write configuration to file");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void PixelStreamService::handleConfigRequest(AsyncWebServerRequest* request) {
|
||||||
|
// Load current config from file
|
||||||
|
PixelStreamConfig config = loadConfig();
|
||||||
|
|
||||||
|
bool updated = false;
|
||||||
|
|
||||||
|
// Handle individual form parameters
|
||||||
|
if (request->hasParam("pin", true)) {
|
||||||
|
String pinStr = request->getParam("pin", true)->value();
|
||||||
|
if (pinStr.length() > 0) {
|
||||||
|
int pinValue = pinStr.toInt();
|
||||||
|
if (pinValue >= 0 && pinValue <= 255) {
|
||||||
|
config.pin = static_cast<uint8_t>(pinValue);
|
||||||
|
updated = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (request->hasParam("pixel_count", true)) {
|
||||||
|
String countStr = request->getParam("pixel_count", true)->value();
|
||||||
|
if (countStr.length() > 0) {
|
||||||
|
int countValue = countStr.toInt();
|
||||||
|
if (countValue > 0 && countValue <= 65535) {
|
||||||
|
config.pixelCount = static_cast<uint16_t>(countValue);
|
||||||
|
updated = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (request->hasParam("brightness", true)) {
|
||||||
|
String brightnessStr = request->getParam("brightness", true)->value();
|
||||||
|
if (brightnessStr.length() > 0) {
|
||||||
|
int brightnessValue = brightnessStr.toInt();
|
||||||
|
if (brightnessValue >= 0 && brightnessValue <= 255) {
|
||||||
|
config.brightness = static_cast<uint8_t>(brightnessValue);
|
||||||
|
updated = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (request->hasParam("matrix_width", true)) {
|
||||||
|
String widthStr = request->getParam("matrix_width", true)->value();
|
||||||
|
if (widthStr.length() > 0) {
|
||||||
|
int widthValue = widthStr.toInt();
|
||||||
|
if (widthValue > 0 && widthValue <= 65535) {
|
||||||
|
config.matrixWidth = static_cast<uint16_t>(widthValue);
|
||||||
|
updated = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (request->hasParam("matrix_serpentine", true)) {
|
||||||
|
String serpentineStr = request->getParam("matrix_serpentine", true)->value();
|
||||||
|
config.matrixSerpentine = (serpentineStr.equalsIgnoreCase("true") || serpentineStr == "1");
|
||||||
|
updated = true;
|
||||||
|
}
|
||||||
|
if (request->hasParam("pixel_type", true)) {
|
||||||
|
String typeStr = request->getParam("pixel_type", true)->value();
|
||||||
|
if (typeStr.length() > 0) {
|
||||||
|
int typeValue = typeStr.toInt();
|
||||||
|
config.pixelType = static_cast<neoPixelType>(typeValue);
|
||||||
|
updated = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!updated) {
|
||||||
|
request->send(400, "application/json", "{\"error\":\"No valid configuration fields provided\"}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save config to file
|
||||||
|
if (saveConfig(config)) {
|
||||||
|
LOG_INFO("PixelStreamService", "Configuration updated and saved to pixelstream.json");
|
||||||
|
request->send(200, "application/json", "{\"status\":\"success\",\"message\":\"Configuration updated and saved\"}");
|
||||||
|
} else {
|
||||||
|
LOG_ERROR("PixelStreamService", "Failed to save configuration to file");
|
||||||
|
request->send(500, "application/json", "{\"error\":\"Failed to save configuration\"}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void PixelStreamService::handleGetConfigRequest(AsyncWebServerRequest* request) {
|
||||||
|
PixelStreamConfig config = loadConfig();
|
||||||
|
|
||||||
|
JsonDocument doc;
|
||||||
|
doc["pin"] = config.pin;
|
||||||
|
doc["pixel_count"] = config.pixelCount;
|
||||||
|
doc["brightness"] = config.brightness;
|
||||||
|
doc["matrix_width"] = config.matrixWidth;
|
||||||
|
doc["matrix_serpentine"] = config.matrixSerpentine;
|
||||||
|
doc["pixel_type"] = static_cast<uint8_t>(config.pixelType);
|
||||||
|
|
||||||
|
String json;
|
||||||
|
serializeJson(doc, json);
|
||||||
|
request->send(200, "application/json", json);
|
||||||
|
}
|
||||||
|
|
||||||
33
examples/pixelstream/PixelStreamService.h
Normal file
33
examples/pixelstream/PixelStreamService.h
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
#pragma once
|
||||||
|
#include "spore/Service.h"
|
||||||
|
#include "spore/core/NodeContext.h"
|
||||||
|
#include "PixelStreamController.h"
|
||||||
|
#include <ArduinoJson.h>
|
||||||
|
#include <LittleFS.h>
|
||||||
|
#include "spore/util/Logging.h"
|
||||||
|
|
||||||
|
// PixelStreamConfig is defined in PixelStreamController.h
|
||||||
|
|
||||||
|
class PixelStreamService : public Service {
|
||||||
|
public:
|
||||||
|
PixelStreamService(NodeContext& ctx, ApiServer& apiServer, PixelStreamController* controller);
|
||||||
|
void registerEndpoints(ApiServer& api) override;
|
||||||
|
void registerTasks(TaskManager& taskManager) override;
|
||||||
|
const char* getName() const override { return "PixelStream"; }
|
||||||
|
|
||||||
|
// Config management
|
||||||
|
PixelStreamConfig loadConfig();
|
||||||
|
bool saveConfig(const PixelStreamConfig& config);
|
||||||
|
void setController(PixelStreamController* ctrl) { controller = ctrl; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
NodeContext& ctx;
|
||||||
|
ApiServer& apiServer;
|
||||||
|
PixelStreamController* controller;
|
||||||
|
|
||||||
|
void handleConfigRequest(AsyncWebServerRequest* request);
|
||||||
|
void handleGetConfigRequest(AsyncWebServerRequest* request);
|
||||||
|
|
||||||
|
static const char* CONFIG_FILE() { return "/pixelstream.json"; }
|
||||||
|
};
|
||||||
|
|
||||||
@@ -2,7 +2,11 @@
|
|||||||
#include "spore/Spore.h"
|
#include "spore/Spore.h"
|
||||||
#include "spore/util/Logging.h"
|
#include "spore/util/Logging.h"
|
||||||
#include "PixelStreamController.h"
|
#include "PixelStreamController.h"
|
||||||
|
#include "PixelStreamService.h"
|
||||||
|
#include <Adafruit_NeoPixel.h>
|
||||||
|
|
||||||
|
// Defaults are now loaded from config.json on LittleFS
|
||||||
|
// Can still be overridden with preprocessor defines if needed
|
||||||
#ifndef PIXEL_PIN
|
#ifndef PIXEL_PIN
|
||||||
#define PIXEL_PIN 2
|
#define PIXEL_PIN 2
|
||||||
#endif
|
#endif
|
||||||
@@ -34,22 +38,28 @@ Spore spore({
|
|||||||
});
|
});
|
||||||
|
|
||||||
PixelStreamController* controller = nullptr;
|
PixelStreamController* controller = nullptr;
|
||||||
|
PixelStreamService* service = nullptr;
|
||||||
|
|
||||||
void setup() {
|
void setup() {
|
||||||
spore.setup();
|
spore.setup();
|
||||||
|
|
||||||
PixelStreamConfig config{
|
// Create service first (need it to load config)
|
||||||
static_cast<uint8_t>(PIXEL_PIN),
|
service = new PixelStreamService(spore.getContext(), spore.getApiServer(), nullptr);
|
||||||
static_cast<uint16_t>(PIXEL_COUNT),
|
|
||||||
static_cast<uint8_t>(PIXEL_BRIGHTNESS),
|
|
||||||
static_cast<uint16_t>(PIXEL_MATRIX_WIDTH),
|
|
||||||
static_cast<bool>(PIXEL_MATRIX_SERPENTINE),
|
|
||||||
static_cast<neoPixelType>(PIXEL_TYPE)
|
|
||||||
};
|
|
||||||
|
|
||||||
|
// Load pixelstream config from LittleFS (pixelstream.json) or use defaults
|
||||||
|
PixelStreamConfig config = service->loadConfig();
|
||||||
|
|
||||||
|
// Create controller with loaded config
|
||||||
controller = new PixelStreamController(spore.getContext(), config);
|
controller = new PixelStreamController(spore.getContext(), config);
|
||||||
controller->begin();
|
controller->begin();
|
||||||
|
|
||||||
|
// Update service with the actual controller
|
||||||
|
service->setController(controller);
|
||||||
|
|
||||||
|
// Register service
|
||||||
|
spore.registerService(service);
|
||||||
|
|
||||||
|
// Start the API server
|
||||||
spore.begin();
|
spore.begin();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -11,7 +11,6 @@
|
|||||||
#include "core/TaskManager.h"
|
#include "core/TaskManager.h"
|
||||||
#include "Service.h"
|
#include "Service.h"
|
||||||
#include "util/Logging.h"
|
#include "util/Logging.h"
|
||||||
#include "util/CpuUsage.h"
|
|
||||||
|
|
||||||
class Spore {
|
class Spore {
|
||||||
public:
|
public:
|
||||||
@@ -35,12 +34,6 @@ public:
|
|||||||
ClusterManager& getCluster() { return cluster; }
|
ClusterManager& getCluster() { return cluster; }
|
||||||
ApiServer& getApiServer() { return apiServer; }
|
ApiServer& getApiServer() { return apiServer; }
|
||||||
|
|
||||||
// CPU usage monitoring
|
|
||||||
CpuUsage& getCpuUsage() { return cpuUsage; }
|
|
||||||
float getCurrentCpuUsage() const { return cpuUsage.getCpuUsage(); }
|
|
||||||
float getAverageCpuUsage() const { return cpuUsage.getAverageCpuUsage(); }
|
|
||||||
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void initializeCore();
|
void initializeCore();
|
||||||
void registerCoreServices();
|
void registerCoreServices();
|
||||||
@@ -51,7 +44,6 @@ private:
|
|||||||
TaskManager taskManager;
|
TaskManager taskManager;
|
||||||
ClusterManager cluster;
|
ClusterManager cluster;
|
||||||
ApiServer apiServer;
|
ApiServer apiServer;
|
||||||
CpuUsage cpuUsage;
|
|
||||||
|
|
||||||
std::vector<std::shared_ptr<Service>> services;
|
std::vector<std::shared_ptr<Service>> services;
|
||||||
bool initialized;
|
bool initialized;
|
||||||
|
|||||||
@@ -19,9 +19,8 @@ public:
|
|||||||
void updateAllNodeStatuses();
|
void updateAllNodeStatuses();
|
||||||
void removeDeadNodes();
|
void removeDeadNodes();
|
||||||
void printMemberList();
|
void printMemberList();
|
||||||
const std::map<String, NodeInfo>& getMemberList() const { return *ctx.memberList; }
|
size_t getMemberCount() const { return ctx.memberList->getMemberCount(); }
|
||||||
void fetchNodeInfo(const IPAddress& ip);
|
void updateLocalNodeResources(NodeInfo& node);
|
||||||
void updateLocalNodeResources();
|
|
||||||
void heartbeatTaskCallback();
|
void heartbeatTaskCallback();
|
||||||
void updateAllMembersInfoTaskCallback();
|
void updateAllMembersInfoTaskCallback();
|
||||||
void broadcastNodeUpdate();
|
void broadcastNodeUpdate();
|
||||||
|
|||||||
133
include/spore/core/Memberlist.h
Normal file
133
include/spore/core/Memberlist.h
Normal file
@@ -0,0 +1,133 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <Arduino.h>
|
||||||
|
#include <map>
|
||||||
|
#include <string>
|
||||||
|
#include <optional>
|
||||||
|
#include <functional>
|
||||||
|
#include "spore/types/NodeInfo.h"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Manages the list of cluster members.
|
||||||
|
*
|
||||||
|
* The Memberlist class maintains a collection of cluster members, where each member
|
||||||
|
* is identified by its IP address and associated with a NodeInfo object. It provides
|
||||||
|
* methods to add, update, and remove members, as well as handle node status changes
|
||||||
|
* (stale and dead nodes).
|
||||||
|
*/
|
||||||
|
class Memberlist {
|
||||||
|
public:
|
||||||
|
/**
|
||||||
|
* @brief Default constructor.
|
||||||
|
*/
|
||||||
|
Memberlist();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Destructor.
|
||||||
|
*/
|
||||||
|
~Memberlist();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Adds or updates a member in the list.
|
||||||
|
*
|
||||||
|
* If the member already exists, updates its information. Otherwise, adds a new member.
|
||||||
|
* @param ip The IP address of the member (as string).
|
||||||
|
* @param node The NodeInfo object containing member details.
|
||||||
|
* @return True if the member was added or updated, false otherwise.
|
||||||
|
*/
|
||||||
|
bool addOrUpdateMember(const std::string& ip, const NodeInfo& node);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Adds a new member to the list.
|
||||||
|
*
|
||||||
|
* @param ip The IP address of the member (as string).
|
||||||
|
* @param node The NodeInfo object containing member details.
|
||||||
|
* @return True if the member was added, false if it already exists.
|
||||||
|
*/
|
||||||
|
bool addMember(const std::string& ip, const NodeInfo& node);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Updates an existing member in the list.
|
||||||
|
*
|
||||||
|
* @param ip The IP address of the member (as string).
|
||||||
|
* @param node The updated NodeInfo object.
|
||||||
|
* @return True if the member was updated, false if it doesn't exist.
|
||||||
|
*/
|
||||||
|
bool updateMember(const std::string& ip, const NodeInfo& node);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Removes a member from the list.
|
||||||
|
*
|
||||||
|
* @param ip The IP address of the member to remove (as string).
|
||||||
|
* @return True if the member was removed, false if it doesn't exist.
|
||||||
|
*/
|
||||||
|
bool removeMember(const std::string& ip);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Retrieves a member by IP address.
|
||||||
|
*
|
||||||
|
* @param ip The IP address of the member (as string).
|
||||||
|
* @return Optional containing the NodeInfo if found, or std::nullopt if not found.
|
||||||
|
*/
|
||||||
|
std::optional<NodeInfo> getMember(const std::string& ip) const;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Iterates over all members and calls the provided callback for each.
|
||||||
|
*
|
||||||
|
* @param callback Function to call for each member. Receives (ip, node) as parameters.
|
||||||
|
*/
|
||||||
|
void forEachMember(std::function<void(const std::string&, const NodeInfo&)> callback) const;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Iterates over all members and calls the provided callback for each.
|
||||||
|
*
|
||||||
|
* @param callback Function to call for each member. Receives (ip, node) as parameters.
|
||||||
|
* If callback returns false, iteration stops.
|
||||||
|
* @return True if all members were processed, false if iteration was stopped early.
|
||||||
|
*/
|
||||||
|
bool forEachMemberUntil(std::function<bool(const std::string&, const NodeInfo&)> callback) const;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Gets the number of members in the list.
|
||||||
|
*
|
||||||
|
* @return The number of members.
|
||||||
|
*/
|
||||||
|
size_t getMemberCount() const;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Updates the status of all members based on current time and thresholds.
|
||||||
|
*
|
||||||
|
* Marks nodes as stale or dead based on their last seen time.
|
||||||
|
* @param currentTime The current time in milliseconds.
|
||||||
|
* @param staleThresholdMs Threshold for marking a node as stale (milliseconds).
|
||||||
|
* @param deadThresholdMs Threshold for marking a node as dead (milliseconds).
|
||||||
|
* @param onStatusChange Optional callback fired when a node's status changes.
|
||||||
|
*/
|
||||||
|
void updateAllNodeStatuses(unsigned long currentTime,
|
||||||
|
unsigned long staleThresholdMs,
|
||||||
|
unsigned long deadThresholdMs,
|
||||||
|
std::function<void(const std::string&, NodeInfo::Status, NodeInfo::Status)> onStatusChange = nullptr);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Removes all dead members from the list.
|
||||||
|
*
|
||||||
|
* @return The number of members removed.
|
||||||
|
*/
|
||||||
|
size_t removeDeadMembers();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Checks if a member exists in the list.
|
||||||
|
*
|
||||||
|
* @param ip The IP address of the member (as string).
|
||||||
|
* @return True if the member exists, false otherwise.
|
||||||
|
*/
|
||||||
|
bool hasMember(const std::string& ip) const;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Clears all members from the list.
|
||||||
|
*/
|
||||||
|
void clear();
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::map<std::string, NodeInfo> m_members; ///< Internal map holding the members.
|
||||||
|
};
|
||||||
@@ -2,12 +2,14 @@
|
|||||||
|
|
||||||
#include <WiFiUdp.h>
|
#include <WiFiUdp.h>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include "spore/types/NodeInfo.h"
|
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <initializer_list>
|
#include <initializer_list>
|
||||||
|
#include <memory>
|
||||||
|
#include "spore/types/NodeInfo.h"
|
||||||
#include "spore/types/Config.h"
|
#include "spore/types/Config.h"
|
||||||
#include "spore/types/ApiTypes.h"
|
#include "spore/types/ApiTypes.h"
|
||||||
|
#include "spore/core/Memberlist.h"
|
||||||
|
|
||||||
class NodeContext {
|
class NodeContext {
|
||||||
public:
|
public:
|
||||||
@@ -18,7 +20,7 @@ public:
|
|||||||
String hostname;
|
String hostname;
|
||||||
IPAddress localIP;
|
IPAddress localIP;
|
||||||
NodeInfo self;
|
NodeInfo self;
|
||||||
std::map<String, NodeInfo>* memberList;
|
std::unique_ptr<Memberlist> memberList;
|
||||||
::Config config;
|
::Config config;
|
||||||
std::map<String, String> constructorLabels; // Labels passed to constructor (not persisted)
|
std::map<String, String> constructorLabels; // Labels passed to constructor (not persisted)
|
||||||
|
|
||||||
|
|||||||
@@ -5,10 +5,9 @@
|
|||||||
|
|
||||||
// Cluster protocol and API constants
|
// Cluster protocol and API constants
|
||||||
namespace ClusterProtocol {
|
namespace ClusterProtocol {
|
||||||
// Simplified heartbeat-only protocol
|
constexpr const char* HEARTBEAT_MSG = "cluster/heartbeat";
|
||||||
constexpr const char* HEARTBEAT_MSG = "CLUSTER_HEARTBEAT";
|
constexpr const char* NODE_UPDATE_MSG = "node/update";
|
||||||
constexpr const char* NODE_UPDATE_MSG = "NODE_UPDATE";
|
constexpr const char* CLUSTER_EVENT_MSG = "cluster/event";
|
||||||
constexpr const char* CLUSTER_EVENT_MSG = "CLUSTER_EVENT";
|
|
||||||
constexpr const char* RAW_MSG = "RAW";
|
constexpr const char* RAW_MSG = "RAW";
|
||||||
constexpr uint16_t UDP_PORT = 4210;
|
constexpr uint16_t UDP_PORT = 4210;
|
||||||
// Increased buffer to accommodate larger RAW pixel streams and node info JSON over UDP
|
// Increased buffer to accommodate larger RAW pixel streams and node info JSON over UDP
|
||||||
|
|||||||
@@ -1,11 +1,10 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
#include "spore/Service.h"
|
#include "spore/Service.h"
|
||||||
#include "spore/util/CpuUsage.h"
|
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
|
||||||
class MonitoringService : public Service {
|
class MonitoringService : public Service {
|
||||||
public:
|
public:
|
||||||
MonitoringService(CpuUsage& cpuUsage);
|
MonitoringService();
|
||||||
void registerEndpoints(ApiServer& api) override;
|
void registerEndpoints(ApiServer& api) override;
|
||||||
void registerTasks(TaskManager& taskManager) override;
|
void registerTasks(TaskManager& taskManager) override;
|
||||||
const char* getName() const override { return "Monitoring"; }
|
const char* getName() const override { return "Monitoring"; }
|
||||||
@@ -15,17 +14,12 @@ public:
|
|||||||
// CPU information
|
// CPU information
|
||||||
float currentCpuUsage;
|
float currentCpuUsage;
|
||||||
float averageCpuUsage;
|
float averageCpuUsage;
|
||||||
float maxCpuUsage;
|
|
||||||
float minCpuUsage;
|
|
||||||
unsigned long measurementCount;
|
unsigned long measurementCount;
|
||||||
bool isMeasuring;
|
bool isMeasuring;
|
||||||
|
|
||||||
// Memory information
|
// Memory information
|
||||||
size_t freeHeap;
|
size_t freeHeap;
|
||||||
size_t totalHeap;
|
size_t totalHeap;
|
||||||
size_t minFreeHeap;
|
|
||||||
size_t maxAllocHeap;
|
|
||||||
size_t heapFragmentation;
|
|
||||||
|
|
||||||
// Filesystem information
|
// Filesystem information
|
||||||
size_t totalBytes;
|
size_t totalBytes;
|
||||||
@@ -45,8 +39,5 @@ private:
|
|||||||
void handleResourcesRequest(AsyncWebServerRequest* request);
|
void handleResourcesRequest(AsyncWebServerRequest* request);
|
||||||
|
|
||||||
// Helper methods
|
// Helper methods
|
||||||
size_t calculateHeapFragmentation() const;
|
|
||||||
void getFilesystemInfo(size_t& totalBytes, size_t& usedBytes) const;
|
void getFilesystemInfo(size_t& totalBytes, size_t& usedBytes) const;
|
||||||
|
|
||||||
CpuUsage& cpuUsage;
|
|
||||||
};
|
};
|
||||||
@@ -1,118 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include <Arduino.h>
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief CPU usage measurement utility for ESP32/ESP8266
|
|
||||||
*
|
|
||||||
* This class provides methods to measure CPU usage by tracking idle time
|
|
||||||
* and calculating the percentage of time the CPU is busy vs idle.
|
|
||||||
*/
|
|
||||||
class CpuUsage {
|
|
||||||
public:
|
|
||||||
/**
|
|
||||||
* @brief Construct a new CpuUsage object
|
|
||||||
*/
|
|
||||||
CpuUsage();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Destructor
|
|
||||||
*/
|
|
||||||
~CpuUsage() = default;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Initialize the CPU usage measurement
|
|
||||||
* Call this once during setup
|
|
||||||
*/
|
|
||||||
void begin();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Start measuring CPU usage for the current cycle
|
|
||||||
* Call this at the beginning of your main loop
|
|
||||||
*/
|
|
||||||
void startMeasurement();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief End measuring CPU usage for the current cycle
|
|
||||||
* Call this at the end of your main loop
|
|
||||||
*/
|
|
||||||
void endMeasurement();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Get the current CPU usage percentage
|
|
||||||
* @return float CPU usage percentage (0.0 to 100.0)
|
|
||||||
*/
|
|
||||||
float getCpuUsage() const;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Get the average CPU usage over the measurement window
|
|
||||||
* @return float Average CPU usage percentage (0.0 to 100.0)
|
|
||||||
*/
|
|
||||||
float getAverageCpuUsage() const;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Get the maximum CPU usage recorded
|
|
||||||
* @return float Maximum CPU usage percentage (0.0 to 100.0)
|
|
||||||
*/
|
|
||||||
float getMaxCpuUsage() const;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Get the minimum CPU usage recorded
|
|
||||||
* @return float Minimum CPU usage percentage (0.0 to 100.0)
|
|
||||||
*/
|
|
||||||
float getMinCpuUsage() const;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Reset all CPU usage statistics
|
|
||||||
*/
|
|
||||||
void reset();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Check if measurement is currently active
|
|
||||||
* @return true if measurement is active, false otherwise
|
|
||||||
*/
|
|
||||||
bool isMeasuring() const;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Get the number of measurements taken
|
|
||||||
* @return unsigned long Number of measurements
|
|
||||||
*/
|
|
||||||
unsigned long getMeasurementCount() const;
|
|
||||||
|
|
||||||
private:
|
|
||||||
// Measurement state
|
|
||||||
bool _initialized;
|
|
||||||
bool _measuring;
|
|
||||||
unsigned long _measurementCount;
|
|
||||||
|
|
||||||
// Timing variables
|
|
||||||
unsigned long _cycleStartTime;
|
|
||||||
unsigned long _idleStartTime;
|
|
||||||
unsigned long _totalIdleTime;
|
|
||||||
unsigned long _totalCycleTime;
|
|
||||||
|
|
||||||
// Statistics
|
|
||||||
float _currentCpuUsage;
|
|
||||||
float _averageCpuUsage;
|
|
||||||
float _maxCpuUsage;
|
|
||||||
float _minCpuUsage;
|
|
||||||
unsigned long _totalCpuTime;
|
|
||||||
|
|
||||||
// Rolling average window
|
|
||||||
static constexpr size_t ROLLING_WINDOW_SIZE = 10;
|
|
||||||
float _rollingWindow[ROLLING_WINDOW_SIZE];
|
|
||||||
size_t _rollingIndex;
|
|
||||||
bool _rollingWindowFull;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Update rolling average calculation
|
|
||||||
* @param value New value to add to rolling average
|
|
||||||
*/
|
|
||||||
void updateRollingAverage(float value);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Update min/max statistics
|
|
||||||
* @param value New value to check against min/max
|
|
||||||
*/
|
|
||||||
void updateMinMax(float value);
|
|
||||||
};
|
|
||||||
@@ -18,6 +18,15 @@ monitor_speed = 115200
|
|||||||
lib_deps =
|
lib_deps =
|
||||||
esp32async/ESPAsyncWebServer@^3.8.0
|
esp32async/ESPAsyncWebServer@^3.8.0
|
||||||
bblanchon/ArduinoJson@^7.4.2
|
bblanchon/ArduinoJson@^7.4.2
|
||||||
|
build_flags =
|
||||||
|
-Os ; Optimize for size
|
||||||
|
-ffunction-sections ; Place each function in its own section
|
||||||
|
-fdata-sections ; Place data in separate sections
|
||||||
|
-Wl,--gc-sections ; Remove unused sections at link time
|
||||||
|
-DNDEBUG ; Disable debug assertions
|
||||||
|
-DVTABLES_IN_FLASH ; Move virtual tables to flash
|
||||||
|
-fno-exceptions ; Disable C++ exceptions
|
||||||
|
-fno-rtti ; Disable runtime type information
|
||||||
|
|
||||||
[env:base]
|
[env:base]
|
||||||
platform = platformio/espressif8266@^4.2.1
|
platform = platformio/espressif8266@^4.2.1
|
||||||
@@ -31,6 +40,7 @@ board_build.filesystem = littlefs
|
|||||||
; note: somehow partition table is not working, so we need to use the ldscript
|
; note: somehow partition table is not working, so we need to use the ldscript
|
||||||
board_build.ldscript = eagle.flash.1m64.ld ; 64KB -> FS Size
|
board_build.ldscript = eagle.flash.1m64.ld ; 64KB -> FS Size
|
||||||
lib_deps = ${common.lib_deps}
|
lib_deps = ${common.lib_deps}
|
||||||
|
build_flags = ${common.build_flags}
|
||||||
build_src_filter =
|
build_src_filter =
|
||||||
+<examples/base/*.cpp>
|
+<examples/base/*.cpp>
|
||||||
+<src/spore/*.cpp>
|
+<src/spore/*.cpp>
|
||||||
@@ -51,6 +61,7 @@ board_build.flash_mode = dio ; D1 Mini uses DIO on 4 Mbit flash
|
|||||||
board_build.flash_size = 4M
|
board_build.flash_size = 4M
|
||||||
board_build.ldscript = eagle.flash.4m1m.ld
|
board_build.ldscript = eagle.flash.4m1m.ld
|
||||||
lib_deps = ${common.lib_deps}
|
lib_deps = ${common.lib_deps}
|
||||||
|
build_flags = ${common.build_flags}
|
||||||
build_src_filter =
|
build_src_filter =
|
||||||
+<examples/base/*.cpp>
|
+<examples/base/*.cpp>
|
||||||
+<src/spore/*.cpp>
|
+<src/spore/*.cpp>
|
||||||
@@ -71,6 +82,7 @@ board_build.flash_mode = dout
|
|||||||
board_build.ldscript = eagle.flash.1m64.ld
|
board_build.ldscript = eagle.flash.1m64.ld
|
||||||
lib_deps = ${common.lib_deps}
|
lib_deps = ${common.lib_deps}
|
||||||
;data_dir = examples/relay/data
|
;data_dir = examples/relay/data
|
||||||
|
build_flags = ${common.build_flags}
|
||||||
build_src_filter =
|
build_src_filter =
|
||||||
+<examples/relay/*.cpp>
|
+<examples/relay/*.cpp>
|
||||||
+<src/spore/*.cpp>
|
+<src/spore/*.cpp>
|
||||||
@@ -91,7 +103,7 @@ board_build.flash_mode = dout
|
|||||||
board_build.ldscript = eagle.flash.1m64.ld
|
board_build.ldscript = eagle.flash.1m64.ld
|
||||||
lib_deps = ${common.lib_deps}
|
lib_deps = ${common.lib_deps}
|
||||||
adafruit/Adafruit NeoPixel@^1.15.1
|
adafruit/Adafruit NeoPixel@^1.15.1
|
||||||
build_flags = -DLED_STRIP_PIN=2
|
build_flags = -DLED_STRIP_PIN=2 ;${common.build_flags}
|
||||||
build_src_filter =
|
build_src_filter =
|
||||||
+<examples/neopattern/*.cpp>
|
+<examples/neopattern/*.cpp>
|
||||||
+<src/spore/*.cpp>
|
+<src/spore/*.cpp>
|
||||||
@@ -112,7 +124,7 @@ board_build.flash_mode = dout
|
|||||||
board_build.ldscript = eagle.flash.1m64.ld
|
board_build.ldscript = eagle.flash.1m64.ld
|
||||||
lib_deps = ${common.lib_deps}
|
lib_deps = ${common.lib_deps}
|
||||||
adafruit/Adafruit NeoPixel@^1.15.1
|
adafruit/Adafruit NeoPixel@^1.15.1
|
||||||
build_flags =
|
build_flags = ${common.build_flags}
|
||||||
build_src_filter =
|
build_src_filter =
|
||||||
+<examples/pixelstream/*.cpp>
|
+<examples/pixelstream/*.cpp>
|
||||||
+<src/spore/*.cpp>
|
+<src/spore/*.cpp>
|
||||||
@@ -133,7 +145,7 @@ board_build.flash_mode = dout
|
|||||||
board_build.ldscript = eagle.flash.4m1m.ld
|
board_build.ldscript = eagle.flash.4m1m.ld
|
||||||
lib_deps = ${common.lib_deps}
|
lib_deps = ${common.lib_deps}
|
||||||
adafruit/Adafruit NeoPixel@^1.15.1
|
adafruit/Adafruit NeoPixel@^1.15.1
|
||||||
build_flags = -DPIXEL_PIN=TX -DPIXEL_COUNT=256 -DMATRIX_WIDTH=16
|
build_flags = -DPIXEL_PIN=TX -DPIXEL_COUNT=256 -DMATRIX_WIDTH=16 ${common.build_flags}
|
||||||
build_src_filter =
|
build_src_filter =
|
||||||
+<examples/pixelstream/*.cpp>
|
+<examples/pixelstream/*.cpp>
|
||||||
+<src/spore/*.cpp>
|
+<src/spore/*.cpp>
|
||||||
@@ -156,6 +168,7 @@ board_build.ldscript = eagle.flash.4m1m.ld
|
|||||||
lib_deps = ${common.lib_deps}
|
lib_deps = ${common.lib_deps}
|
||||||
adafruit/Adafruit NeoPixel@^1.15.1
|
adafruit/Adafruit NeoPixel@^1.15.1
|
||||||
dfrobot/DFRobotDFPlayerMini@^1.0.6
|
dfrobot/DFRobotDFPlayerMini@^1.0.6
|
||||||
|
build_flags = ${common.build_flags}
|
||||||
build_src_filter =
|
build_src_filter =
|
||||||
+<examples/multimatrix/*.cpp>
|
+<examples/multimatrix/*.cpp>
|
||||||
+<examples/pixelstream/PixelStreamController.cpp>
|
+<examples/pixelstream/PixelStreamController.cpp>
|
||||||
|
|||||||
@@ -9,7 +9,7 @@
|
|||||||
|
|
||||||
Spore::Spore() : ctx(), network(ctx), taskManager(ctx), cluster(ctx, taskManager),
|
Spore::Spore() : ctx(), network(ctx), taskManager(ctx), cluster(ctx, taskManager),
|
||||||
apiServer(ctx, taskManager, ctx.config.api_server_port),
|
apiServer(ctx, taskManager, ctx.config.api_server_port),
|
||||||
cpuUsage(), initialized(false), apiServerStarted(false) {
|
initialized(false), apiServerStarted(false) {
|
||||||
|
|
||||||
// Rebuild labels from constructor + config labels
|
// Rebuild labels from constructor + config labels
|
||||||
ctx.rebuildLabels();
|
ctx.rebuildLabels();
|
||||||
@@ -18,7 +18,7 @@ Spore::Spore() : ctx(), network(ctx), taskManager(ctx), cluster(ctx, taskManager
|
|||||||
Spore::Spore(std::initializer_list<std::pair<String, String>> initialLabels)
|
Spore::Spore(std::initializer_list<std::pair<String, String>> initialLabels)
|
||||||
: ctx(initialLabels), network(ctx), taskManager(ctx), cluster(ctx, taskManager),
|
: ctx(initialLabels), network(ctx), taskManager(ctx), cluster(ctx, taskManager),
|
||||||
apiServer(ctx, taskManager, ctx.config.api_server_port),
|
apiServer(ctx, taskManager, ctx.config.api_server_port),
|
||||||
cpuUsage(), initialized(false), apiServerStarted(false) {
|
initialized(false), apiServerStarted(false) {
|
||||||
|
|
||||||
// Rebuild labels from constructor + config labels (config takes precedence)
|
// Rebuild labels from constructor + config labels (config takes precedence)
|
||||||
ctx.rebuildLabels();
|
ctx.rebuildLabels();
|
||||||
@@ -40,9 +40,6 @@ void Spore::setup() {
|
|||||||
// Initialize core components
|
// Initialize core components
|
||||||
initializeCore();
|
initializeCore();
|
||||||
|
|
||||||
// Initialize CPU usage monitoring
|
|
||||||
cpuUsage.begin();
|
|
||||||
|
|
||||||
// Register core services
|
// Register core services
|
||||||
registerCoreServices();
|
registerCoreServices();
|
||||||
|
|
||||||
@@ -78,15 +75,9 @@ void Spore::loop() {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start CPU usage measurement
|
|
||||||
cpuUsage.startMeasurement();
|
|
||||||
|
|
||||||
// Execute main tasks
|
// Execute main tasks
|
||||||
taskManager.execute();
|
taskManager.execute();
|
||||||
|
|
||||||
// End CPU usage measurement before yield
|
|
||||||
cpuUsage.endMeasurement();
|
|
||||||
|
|
||||||
// Yield to allow other tasks to run
|
// Yield to allow other tasks to run
|
||||||
yield();
|
yield();
|
||||||
}
|
}
|
||||||
@@ -141,7 +132,7 @@ void Spore::registerCoreServices() {
|
|||||||
auto clusterService = std::make_shared<ClusterService>(ctx);
|
auto clusterService = std::make_shared<ClusterService>(ctx);
|
||||||
auto taskService = std::make_shared<TaskService>(taskManager);
|
auto taskService = std::make_shared<TaskService>(taskManager);
|
||||||
auto staticFileService = std::make_shared<StaticFileService>(ctx, apiServer);
|
auto staticFileService = std::make_shared<StaticFileService>(ctx, apiServer);
|
||||||
auto monitoringService = std::make_shared<MonitoringService>(cpuUsage);
|
auto monitoringService = std::make_shared<MonitoringService>();
|
||||||
|
|
||||||
// Add to services list
|
// Add to services list
|
||||||
services.push_back(nodeService);
|
services.push_back(nodeService);
|
||||||
|
|||||||
@@ -23,11 +23,12 @@ void ApiServer::registerEndpoint(const String& uri, int method,
|
|||||||
endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true});
|
endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true});
|
||||||
|
|
||||||
// Update cluster if needed
|
// Update cluster if needed
|
||||||
if (ctx.memberList && !ctx.memberList->empty()) {
|
String localIPStr = ctx.localIP.toString();
|
||||||
auto it = ctx.memberList->find(ctx.hostname);
|
auto member = ctx.memberList->getMember(localIPStr.c_str());
|
||||||
if (it != ctx.memberList->end()) {
|
if (member) {
|
||||||
it->second.endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true});
|
NodeInfo updatedNode = *member;
|
||||||
}
|
updatedNode.endpoints.push_back(EndpointInfo{uri, method, params, serviceName, true});
|
||||||
|
ctx.memberList->updateMember(localIPStr.c_str(), updatedNode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,14 +1,15 @@
|
|||||||
#include "spore/core/ClusterManager.h"
|
#include "spore/core/ClusterManager.h"
|
||||||
#include "spore/internal/Globals.h"
|
#include "spore/internal/Globals.h"
|
||||||
#include "spore/util/Logging.h"
|
#include "spore/util/Logging.h"
|
||||||
|
#include "spore/types/NodeInfo.h"
|
||||||
|
|
||||||
ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx), taskManager(taskMgr) {
|
ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx), taskManager(taskMgr) {
|
||||||
// Register callback for node/discovered event
|
// Register callback for node/discovered event - this fires when network is ready
|
||||||
ctx.on("node/discovered", [this](void* data) {
|
ctx.on("node/discovered", [this](void* data) {
|
||||||
NodeInfo* node = static_cast<NodeInfo*>(data);
|
NodeInfo* node = static_cast<NodeInfo*>(data);
|
||||||
this->addOrUpdateNode(node->hostname, node->ip);
|
this->addOrUpdateNode(node->hostname, node->ip);
|
||||||
});
|
});
|
||||||
// Centralized broadcast handler: services fire 'cluster/broadcast' with CLUSTER_EVENT JSON payload
|
// Centralized broadcast handler: services fire 'cluster/broadcast' with cluster/event JSON payload
|
||||||
ctx.on("cluster/broadcast", [this](void* data) {
|
ctx.on("cluster/broadcast", [this](void* data) {
|
||||||
String* jsonStr = static_cast<String*>(data);
|
String* jsonStr = static_cast<String*>(data);
|
||||||
if (!jsonStr) {
|
if (!jsonStr) {
|
||||||
@@ -19,7 +20,7 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx
|
|||||||
IPAddress ip = WiFi.localIP();
|
IPAddress ip = WiFi.localIP();
|
||||||
IPAddress mask = WiFi.subnetMask();
|
IPAddress mask = WiFi.subnetMask();
|
||||||
IPAddress bcast(ip[0] | ~mask[0], ip[1] | ~mask[1], ip[2] | ~mask[2], ip[3] | ~mask[3]);
|
IPAddress bcast(ip[0] | ~mask[0], ip[1] | ~mask[1], ip[2] | ~mask[2], ip[3] | ~mask[3]);
|
||||||
LOG_DEBUG("Cluster", String("Broadcasting CLUSTER_EVENT to ") + bcast.toString() + " len=" + String(jsonStr->length()));
|
LOG_DEBUG("Cluster", String("Broadcasting cluster/event to ") + bcast.toString() + " len=" + String(jsonStr->length()));
|
||||||
this->ctx.udp->beginPacket(bcast, this->ctx.config.udp_port);
|
this->ctx.udp->beginPacket(bcast, this->ctx.config.udp_port);
|
||||||
String msg = String(ClusterProtocol::CLUSTER_EVENT_MSG) + ":" + *jsonStr;
|
String msg = String(ClusterProtocol::CLUSTER_EVENT_MSG) + ":" + *jsonStr;
|
||||||
this->ctx.udp->write(msg.c_str());
|
this->ctx.udp->write(msg.c_str());
|
||||||
@@ -119,7 +120,7 @@ bool ClusterManager::isRawMsg(const char* msg) {
|
|||||||
// Discovery functionality removed - using heartbeat-only approach
|
// Discovery functionality removed - using heartbeat-only approach
|
||||||
|
|
||||||
void ClusterManager::onHeartbeat(const char* msg) {
|
void ClusterManager::onHeartbeat(const char* msg) {
|
||||||
// Extract hostname from heartbeat message: "CLUSTER_HEARTBEAT:hostname"
|
// Extract hostname from heartbeat message: "cluster/heartbeat:hostname"
|
||||||
const char* colon = strchr(msg, ':');
|
const char* colon = strchr(msg, ':');
|
||||||
if (!colon) {
|
if (!colon) {
|
||||||
LOG_WARN("Cluster", "Invalid heartbeat message format");
|
LOG_WARN("Cluster", "Invalid heartbeat message format");
|
||||||
@@ -137,7 +138,7 @@ void ClusterManager::onHeartbeat(const char* msg) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void ClusterManager::onNodeUpdate(const char* msg) {
|
void ClusterManager::onNodeUpdate(const char* msg) {
|
||||||
// Message format: "NODE_UPDATE:hostname:{json}"
|
// Message format: "node/update:hostname:{json}"
|
||||||
const char* firstColon = strchr(msg, ':');
|
const char* firstColon = strchr(msg, ':');
|
||||||
if (!firstColon) {
|
if (!firstColon) {
|
||||||
LOG_WARN("Cluster", "Invalid NODE_UPDATE message format");
|
LOG_WARN("Cluster", "Invalid NODE_UPDATE message format");
|
||||||
@@ -165,18 +166,15 @@ void ClusterManager::onNodeUpdate(const char* msg) {
|
|||||||
// but is sent FROM the responding node (ctx.udp->remoteIP())
|
// but is sent FROM the responding node (ctx.udp->remoteIP())
|
||||||
// We need to find the responding node in the memberlist, not the target node
|
// We need to find the responding node in the memberlist, not the target node
|
||||||
IPAddress respondingNodeIP = ctx.udp->remoteIP();
|
IPAddress respondingNodeIP = ctx.udp->remoteIP();
|
||||||
auto& memberList = *ctx.memberList;
|
String respondingIPStr = respondingNodeIP.toString();
|
||||||
|
|
||||||
// Find the responding node by IP address
|
// Find the responding node by IP address
|
||||||
NodeInfo* respondingNode = nullptr;
|
auto respondingMember = ctx.memberList->getMember(respondingIPStr.c_str());
|
||||||
for (auto& pair : memberList) {
|
if (!respondingMember) {
|
||||||
if (pair.second.ip == respondingNodeIP) {
|
LOG_WARN("Cluster", String("Received NODE_UPDATE from unknown node: ") + respondingNodeIP.toString());
|
||||||
respondingNode = &pair.second;
|
return;
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (respondingNode) {
|
|
||||||
// Calculate latency only if we recently sent a heartbeat (within last 1 second)
|
// Calculate latency only if we recently sent a heartbeat (within last 1 second)
|
||||||
unsigned long latency = 0;
|
unsigned long latency = 0;
|
||||||
unsigned long now = millis();
|
unsigned long now = millis();
|
||||||
@@ -185,24 +183,27 @@ void ClusterManager::onNodeUpdate(const char* msg) {
|
|||||||
lastHeartbeatSentAt = 0; // Reset for next calculation
|
lastHeartbeatSentAt = 0; // Reset for next calculation
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the responding node's information
|
// Create updated node info
|
||||||
|
NodeInfo updatedNode = *respondingMember;
|
||||||
bool hostnameChanged = false;
|
bool hostnameChanged = false;
|
||||||
|
bool labelsChanged = false;
|
||||||
|
|
||||||
|
// Update hostname if provided
|
||||||
if (doc["hostname"].is<const char*>()) {
|
if (doc["hostname"].is<const char*>()) {
|
||||||
String newHostname = doc["hostname"].as<const char*>();
|
String newHostname = doc["hostname"].as<const char*>();
|
||||||
if (respondingNode->hostname != newHostname) {
|
if (updatedNode.hostname != newHostname) {
|
||||||
respondingNode->hostname = newHostname;
|
updatedNode.hostname = newHostname;
|
||||||
hostnameChanged = true;
|
hostnameChanged = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update uptime if provided
|
||||||
if (doc["uptime"].is<unsigned long>()) {
|
if (doc["uptime"].is<unsigned long>()) {
|
||||||
respondingNode->uptime = doc["uptime"];
|
updatedNode.uptime = doc["uptime"];
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update labels if provided
|
// Update labels if provided
|
||||||
bool labelsChanged = false;
|
|
||||||
if (doc["labels"].is<JsonObject>()) {
|
if (doc["labels"].is<JsonObject>()) {
|
||||||
// Check if labels actually changed
|
|
||||||
JsonObject labelsObj = doc["labels"].as<JsonObject>();
|
JsonObject labelsObj = doc["labels"].as<JsonObject>();
|
||||||
std::map<String, String> newLabels;
|
std::map<String, String> newLabels;
|
||||||
for (JsonPair kvp : labelsObj) {
|
for (JsonPair kvp : labelsObj) {
|
||||||
@@ -212,20 +213,24 @@ void ClusterManager::onNodeUpdate(const char* msg) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Compare with existing labels
|
// Compare with existing labels
|
||||||
if (newLabels != respondingNode->labels) {
|
if (newLabels != updatedNode.labels) {
|
||||||
labelsChanged = true;
|
labelsChanged = true;
|
||||||
respondingNode->labels = newLabels;
|
updatedNode.labels = newLabels;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
respondingNode->lastSeen = now;
|
// Update timing and status
|
||||||
respondingNode->status = NodeInfo::ACTIVE;
|
updatedNode.lastSeen = now;
|
||||||
|
updatedNode.status = NodeInfo::ACTIVE;
|
||||||
|
|
||||||
// Update latency if we calculated it (preserve existing value if not)
|
// Update latency if we calculated it (preserve existing value if not)
|
||||||
if (latency > 0) {
|
if (latency > 0) {
|
||||||
respondingNode->latency = latency;
|
updatedNode.latency = latency;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Persist the updated node info to the memberlist
|
||||||
|
ctx.memberList->updateMember(respondingIPStr.c_str(), updatedNode);
|
||||||
|
|
||||||
// Check if any fields changed that require broadcasting
|
// Check if any fields changed that require broadcasting
|
||||||
bool nodeInfoChanged = hostnameChanged || labelsChanged;
|
bool nodeInfoChanged = hostnameChanged || labelsChanged;
|
||||||
|
|
||||||
@@ -234,23 +239,20 @@ void ClusterManager::onNodeUpdate(const char* msg) {
|
|||||||
ctx.fire("cluster/node/update", nullptr);
|
ctx.fire("cluster/node/update", nullptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_DEBUG("Cluster", String("Updated responding node ") + respondingNode->hostname + " @ " + respondingNodeIP.toString() +
|
LOG_DEBUG("Cluster", String("Updated responding node ") + updatedNode.hostname + " @ " + respondingNodeIP.toString() +
|
||||||
" | hostname: " + (hostnameChanged ? "changed" : "unchanged") +
|
" | hostname: " + (hostnameChanged ? "changed" : "unchanged") +
|
||||||
" | labels: " + (labelsChanged ? "changed" : "unchanged") +
|
" | labels: " + (labelsChanged ? "changed" : "unchanged") +
|
||||||
" | latency: " + (latency > 0 ? String(latency) + "ms" : "not calculated"));
|
" | latency: " + (latency > 0 ? String(latency) + "ms" : "not calculated"));
|
||||||
} else {
|
|
||||||
LOG_WARN("Cluster", String("Received NODE_UPDATE from unknown node: ") + respondingNodeIP.toString());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterManager::sendNodeInfo(const String& targetHostname, const IPAddress& targetIP) {
|
void ClusterManager::sendNodeInfo(const String& targetHostname, const IPAddress& targetIP) {
|
||||||
JsonDocument doc;
|
JsonDocument doc;
|
||||||
|
|
||||||
// Get our node info for the response (we're the responding node)
|
// Get our node info for the response (we're the responding node)
|
||||||
auto& memberList = *ctx.memberList;
|
String localIPStr = ctx.localIP.toString();
|
||||||
auto it = memberList.find(ctx.hostname);
|
auto member = ctx.memberList->getMember(localIPStr.c_str());
|
||||||
if (it != memberList.end()) {
|
if (member) {
|
||||||
const NodeInfo& node = it->second;
|
const NodeInfo& node = *member;
|
||||||
|
|
||||||
// Response contains info about ourselves (the responding node)
|
// Response contains info about ourselves (the responding node)
|
||||||
doc["hostname"] = node.hostname;
|
doc["hostname"] = node.hostname;
|
||||||
@@ -265,7 +267,7 @@ void ClusterManager::sendNodeInfo(const String& targetHostname, const IPAddress&
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Fallback to basic info
|
// Fallback to basic info if not in memberlist
|
||||||
doc["hostname"] = ctx.hostname;
|
doc["hostname"] = ctx.hostname;
|
||||||
doc["ip"] = ctx.localIP.toString();
|
doc["ip"] = ctx.localIP.toString();
|
||||||
doc["uptime"] = millis();
|
doc["uptime"] = millis();
|
||||||
@@ -284,17 +286,17 @@ void ClusterManager::sendNodeInfo(const String& targetHostname, const IPAddress&
|
|||||||
}
|
}
|
||||||
|
|
||||||
void ClusterManager::onClusterEvent(const char* msg) {
|
void ClusterManager::onClusterEvent(const char* msg) {
|
||||||
// Message format: CLUSTER_EVENT:{"event":"...","data":"<json string>"}
|
// Message format: cluster/event:{"event":"...","data":"<json string>"}
|
||||||
const char* jsonStart = msg + strlen(ClusterProtocol::CLUSTER_EVENT_MSG) + 1; // skip prefix and ':'
|
const char* jsonStart = msg + strlen(ClusterProtocol::CLUSTER_EVENT_MSG) + 1; // skip prefix and ':'
|
||||||
if (*jsonStart == '\0') {
|
if (*jsonStart == '\0') {
|
||||||
LOG_DEBUG("Cluster", "CLUSTER_EVENT received with empty payload");
|
LOG_DEBUG("Cluster", "cluster/event received with empty payload");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
LOG_DEBUG("Cluster", String("CLUSTER_EVENT raw from ") + ctx.udp->remoteIP().toString() + " len=" + String(strlen(jsonStart)));
|
LOG_DEBUG("Cluster", String("cluster/event raw from ") + ctx.udp->remoteIP().toString() + " len=" + String(strlen(jsonStart)));
|
||||||
JsonDocument doc;
|
JsonDocument doc;
|
||||||
DeserializationError err = deserializeJson(doc, jsonStart);
|
DeserializationError err = deserializeJson(doc, jsonStart);
|
||||||
if (err) {
|
if (err) {
|
||||||
LOG_ERROR("Cluster", String("Failed to parse CLUSTER_EVENT JSON from ") + ctx.udp->remoteIP().toString());
|
LOG_ERROR("Cluster", String("Failed to parse cluster/event JSON from ") + ctx.udp->remoteIP().toString());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// Robust extraction of event and data
|
// Robust extraction of event and data
|
||||||
@@ -318,7 +320,7 @@ void ClusterManager::onClusterEvent(const char* msg) {
|
|||||||
if (eventStr.length() == 0 || data.length() == 0) {
|
if (eventStr.length() == 0 || data.length() == 0) {
|
||||||
String dbg;
|
String dbg;
|
||||||
serializeJson(doc, dbg);
|
serializeJson(doc, dbg);
|
||||||
LOG_WARN("Cluster", String("CLUSTER_EVENT missing 'event' or 'data' | payload=") + dbg);
|
LOG_WARN("Cluster", String("cluster/event missing 'event' or 'data' | payload=") + dbg);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -345,20 +347,20 @@ void ClusterManager::onRawMessage(const char* msg) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
|
void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
|
||||||
auto& memberList = *ctx.memberList;
|
|
||||||
bool memberlistChanged = false;
|
bool memberlistChanged = false;
|
||||||
|
String ipStr = nodeIP.toString();
|
||||||
|
|
||||||
// O(1) lookup instead of O(n) search
|
// Check if member exists
|
||||||
auto it = memberList.find(nodeHost);
|
auto existingMember = ctx.memberList->getMember(ipStr.c_str());
|
||||||
if (it != memberList.end()) {
|
if (existingMember) {
|
||||||
// Update existing node - preserve all existing field values
|
// Update existing node - preserve all existing field values
|
||||||
if (it->second.ip != nodeIP) {
|
NodeInfo updatedNode = *existingMember;
|
||||||
it->second.ip = nodeIP;
|
if (updatedNode.ip != nodeIP) {
|
||||||
|
updatedNode.ip = nodeIP;
|
||||||
memberlistChanged = true;
|
memberlistChanged = true;
|
||||||
}
|
}
|
||||||
it->second.lastSeen = millis();
|
updatedNode.lastSeen = millis();
|
||||||
// Note: Other fields like latency, uptime, labels, etc. are preserved
|
ctx.memberList->updateMember(ipStr.c_str(), updatedNode);
|
||||||
//fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
|
|
||||||
} else {
|
} else {
|
||||||
// Add new node
|
// Add new node
|
||||||
NodeInfo newNode;
|
NodeInfo newNode;
|
||||||
@@ -366,10 +368,19 @@ void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
|
|||||||
newNode.ip = nodeIP;
|
newNode.ip = nodeIP;
|
||||||
newNode.lastSeen = millis();
|
newNode.lastSeen = millis();
|
||||||
updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
|
updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
|
||||||
memberList[nodeHost] = newNode;
|
|
||||||
|
// Initialize static resources if this is the local node being added for the first time
|
||||||
|
if (nodeIP == ctx.localIP && nodeHost == ctx.hostname) {
|
||||||
|
newNode.resources.chipId = ESP.getChipId();
|
||||||
|
newNode.resources.sdkVersion = String(ESP.getSdkVersion());
|
||||||
|
newNode.resources.cpuFreqMHz = ESP.getCpuFreqMHz();
|
||||||
|
newNode.resources.flashChipSize = ESP.getFlashChipSize();
|
||||||
|
LOG_DEBUG("Cluster", "Initialized static resources for local node");
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx.memberList->addMember(ipStr.c_str(), newNode);
|
||||||
memberlistChanged = true;
|
memberlistChanged = true;
|
||||||
LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " | Status: " + statusToStr(newNode.status) + " | last update: 0");
|
LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " | Status: " + statusToStr(newNode.status) + " | last update: 0");
|
||||||
//fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fire event if memberlist changed
|
// Fire event if memberlist changed
|
||||||
@@ -378,124 +389,15 @@ void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// unused http client to fetch complete node info
|
|
||||||
void ClusterManager::fetchNodeInfo(const IPAddress& ip) {
|
|
||||||
if(ip == ctx.localIP) {
|
|
||||||
LOG_DEBUG("Cluster", "Skipping fetch for local node");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
unsigned long requestStart = millis();
|
|
||||||
HTTPClient http;
|
|
||||||
WiFiClient client;
|
|
||||||
String url = "http://" + ip.toString() + ClusterProtocol::API_NODE_STATUS;
|
|
||||||
|
|
||||||
// Use RAII pattern to ensure http.end() is always called
|
|
||||||
bool httpInitialized = false;
|
|
||||||
bool success = false;
|
|
||||||
|
|
||||||
httpInitialized = http.begin(client, url);
|
|
||||||
if (!httpInitialized) {
|
|
||||||
LOG_ERROR("Cluster", "Failed to initialize HTTP client for " + ip.toString());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set timeout to prevent hanging
|
|
||||||
http.setTimeout(5000); // 5 second timeout
|
|
||||||
|
|
||||||
int httpCode = http.GET();
|
|
||||||
unsigned long requestEnd = millis();
|
|
||||||
unsigned long requestDuration = requestEnd - requestStart;
|
|
||||||
|
|
||||||
if (httpCode == 200) {
|
|
||||||
String payload = http.getString();
|
|
||||||
|
|
||||||
// Use stack-allocated JsonDocument with proper cleanup
|
|
||||||
JsonDocument doc;
|
|
||||||
DeserializationError err = deserializeJson(doc, payload);
|
|
||||||
|
|
||||||
if (!err) {
|
|
||||||
auto& memberList = *ctx.memberList;
|
|
||||||
// Still need to iterate since we're searching by IP, not hostname
|
|
||||||
for (auto& pair : memberList) {
|
|
||||||
NodeInfo& node = pair.second;
|
|
||||||
if (node.ip == ip) {
|
|
||||||
// Update resources efficiently
|
|
||||||
node.resources.freeHeap = doc["freeHeap"];
|
|
||||||
node.resources.chipId = doc["chipId"];
|
|
||||||
node.resources.sdkVersion = (const char*)doc["sdkVersion"];
|
|
||||||
node.resources.cpuFreqMHz = doc["cpuFreqMHz"];
|
|
||||||
node.resources.flashChipSize = doc["flashChipSize"];
|
|
||||||
node.status = NodeInfo::ACTIVE;
|
|
||||||
node.latency = requestDuration;
|
|
||||||
node.lastSeen = millis();
|
|
||||||
|
|
||||||
// Clear and rebuild endpoints efficiently
|
|
||||||
node.endpoints.clear();
|
|
||||||
node.endpoints.reserve(10); // Pre-allocate to avoid reallocations
|
|
||||||
|
|
||||||
if (doc["api"].is<JsonArray>()) {
|
|
||||||
JsonArray apiArr = doc["api"].as<JsonArray>();
|
|
||||||
for (JsonObject apiObj : apiArr) {
|
|
||||||
// Use const char* to avoid String copies
|
|
||||||
const char* uri = apiObj["uri"];
|
|
||||||
int method = apiObj["method"];
|
|
||||||
|
|
||||||
// Create basic EndpointInfo without params for cluster nodes
|
|
||||||
EndpointInfo endpoint;
|
|
||||||
endpoint.uri = uri; // String assignment is more efficient than construction
|
|
||||||
endpoint.method = method;
|
|
||||||
endpoint.isLocal = false;
|
|
||||||
endpoint.serviceName = "remote";
|
|
||||||
node.endpoints.push_back(std::move(endpoint));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Parse labels efficiently
|
|
||||||
node.labels.clear();
|
|
||||||
if (doc["labels"].is<JsonObject>()) {
|
|
||||||
JsonObject labelsObj = doc["labels"].as<JsonObject>();
|
|
||||||
for (JsonPair kvp : labelsObj) {
|
|
||||||
// Use const char* to avoid String copies
|
|
||||||
const char* key = kvp.key().c_str();
|
|
||||||
const char* value = labelsObj[kvp.key()];
|
|
||||||
node.labels[key] = value;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG_DEBUG("Cluster", "Fetched info for node: " + node.hostname + " @ " + ip.toString());
|
|
||||||
success = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
LOG_ERROR("Cluster", "JSON parse error for node @ " + ip.toString() + ": " + String(err.c_str()));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
LOG_ERROR("Cluster", "Failed to fetch info for node @ " + ip.toString() + ", HTTP code: " + String(httpCode));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Always ensure HTTP client is properly closed
|
|
||||||
if (httpInitialized) {
|
|
||||||
http.end();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Log success/failure for debugging
|
|
||||||
if (!success) {
|
|
||||||
LOG_DEBUG("Cluster", "Failed to update node info for " + ip.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void ClusterManager::heartbeatTaskCallback() {
|
void ClusterManager::heartbeatTaskCallback() {
|
||||||
auto& memberList = *ctx.memberList;
|
// Update local node resources and lastSeen since we're actively sending heartbeats
|
||||||
auto it = memberList.find(ctx.hostname);
|
String localIPStr = ctx.localIP.toString();
|
||||||
if (it != memberList.end()) {
|
auto member = ctx.memberList->getMember(localIPStr.c_str());
|
||||||
NodeInfo& node = it->second;
|
if (member) {
|
||||||
node.lastSeen = millis();
|
NodeInfo node = *member;
|
||||||
node.status = NodeInfo::ACTIVE;
|
updateLocalNodeResources(node);
|
||||||
node.uptime = millis(); // Update uptime
|
node.lastSeen = millis(); // Update lastSeen since we're actively participating
|
||||||
updateLocalNodeResources();
|
ctx.memberList->updateMember(localIPStr.c_str(), node);
|
||||||
addOrUpdateNode(ctx.hostname, ctx.localIP);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Broadcast heartbeat - peers will respond with NODE_UPDATE
|
// Broadcast heartbeat - peers will respond with NODE_UPDATE
|
||||||
@@ -515,13 +417,13 @@ void ClusterManager::updateAllMembersInfoTaskCallback() {
|
|||||||
|
|
||||||
void ClusterManager::broadcastNodeUpdate() {
|
void ClusterManager::broadcastNodeUpdate() {
|
||||||
// Broadcast our current node info as NODE_UPDATE to all cluster members
|
// Broadcast our current node info as NODE_UPDATE to all cluster members
|
||||||
auto& memberList = *ctx.memberList;
|
String localIPStr = ctx.localIP.toString();
|
||||||
auto it = memberList.find(ctx.hostname);
|
auto member = ctx.memberList->getMember(localIPStr.c_str());
|
||||||
if (it == memberList.end()) {
|
if (!member) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const NodeInfo& node = it->second;
|
const NodeInfo& node = *member;
|
||||||
|
|
||||||
JsonDocument doc;
|
JsonDocument doc;
|
||||||
doc["hostname"] = node.hostname;
|
doc["hostname"] = node.hostname;
|
||||||
@@ -548,61 +450,39 @@ void ClusterManager::broadcastNodeUpdate() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void ClusterManager::updateAllNodeStatuses() {
|
void ClusterManager::updateAllNodeStatuses() {
|
||||||
auto& memberList = *ctx.memberList;
|
|
||||||
unsigned long now = millis();
|
unsigned long now = millis();
|
||||||
for (auto& pair : memberList) {
|
ctx.memberList->updateAllNodeStatuses(now, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
|
||||||
NodeInfo& node = pair.second;
|
|
||||||
updateNodeStatus(node, now, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterManager::removeDeadNodes() {
|
void ClusterManager::removeDeadNodes() {
|
||||||
auto& memberList = *ctx.memberList;
|
size_t removedCount = ctx.memberList->removeDeadMembers();
|
||||||
unsigned long now = millis();
|
if (removedCount > 0) {
|
||||||
bool memberlistChanged = false;
|
LOG_INFO("Cluster", String("Removed ") + removedCount + " dead nodes");
|
||||||
|
|
||||||
// Use iterator to safely remove elements from map
|
|
||||||
for (auto it = memberList.begin(); it != memberList.end(); ) {
|
|
||||||
unsigned long diff = now - it->second.lastSeen;
|
|
||||||
if (it->second.status == NodeInfo::DEAD && diff > ctx.config.node_dead_threshold_ms) {
|
|
||||||
LOG_INFO("Cluster", "Removing node: " + it->second.hostname);
|
|
||||||
it = memberList.erase(it);
|
|
||||||
memberlistChanged = true;
|
|
||||||
} else {
|
|
||||||
++it;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fire event if memberlist changed
|
|
||||||
if (memberlistChanged) {
|
|
||||||
ctx.fire("cluster/memberlist/changed", nullptr);
|
ctx.fire("cluster/memberlist/changed", nullptr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterManager::printMemberList() {
|
void ClusterManager::printMemberList() {
|
||||||
auto& memberList = *ctx.memberList;
|
size_t count = ctx.memberList->getMemberCount();
|
||||||
if (memberList.empty()) {
|
if (count == 0) {
|
||||||
LOG_INFO("Cluster", "Member List: empty");
|
LOG_INFO("Cluster", "Member List: empty");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
LOG_INFO("Cluster", "Member List:");
|
LOG_INFO("Cluster", "Member List:");
|
||||||
for (const auto& pair : memberList) {
|
ctx.memberList->forEachMember([](const std::string& ip, const NodeInfo& node) {
|
||||||
const NodeInfo& node = pair.second;
|
|
||||||
LOG_INFO("Cluster", " " + node.hostname + " @ " + node.ip.toString() + " | Status: " + statusToStr(node.status) + " | last seen: " + String(millis() - node.lastSeen));
|
LOG_INFO("Cluster", " " + node.hostname + " @ " + node.ip.toString() + " | Status: " + statusToStr(node.status) + " | last seen: " + String(millis() - node.lastSeen));
|
||||||
}
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterManager::updateLocalNodeResources() {
|
void ClusterManager::updateLocalNodeResources(NodeInfo& node) {
|
||||||
auto& memberList = *ctx.memberList;
|
// Update node status and timing
|
||||||
auto it = memberList.find(ctx.hostname);
|
node.lastSeen = millis();
|
||||||
if (it != memberList.end()) {
|
node.status = NodeInfo::ACTIVE;
|
||||||
NodeInfo& node = it->second;
|
node.uptime = millis();
|
||||||
|
|
||||||
|
// Update dynamic resources (always updated)
|
||||||
uint32_t freeHeap = ESP.getFreeHeap();
|
uint32_t freeHeap = ESP.getFreeHeap();
|
||||||
node.resources.freeHeap = freeHeap;
|
node.resources.freeHeap = freeHeap;
|
||||||
node.resources.chipId = ESP.getChipId();
|
|
||||||
node.resources.sdkVersion = String(ESP.getSdkVersion());
|
|
||||||
node.resources.cpuFreqMHz = ESP.getCpuFreqMHz();
|
|
||||||
node.resources.flashChipSize = ESP.getFlashChipSize();
|
|
||||||
|
|
||||||
// Log memory warnings if heap is getting low
|
// Log memory warnings if heap is getting low
|
||||||
if (freeHeap < ctx.config.low_memory_threshold_bytes) {
|
if (freeHeap < ctx.config.low_memory_threshold_bytes) {
|
||||||
@@ -610,5 +490,4 @@ void ClusterManager::updateLocalNodeResources() {
|
|||||||
} else if (freeHeap < ctx.config.critical_memory_threshold_bytes) {
|
} else if (freeHeap < ctx.config.critical_memory_threshold_bytes) {
|
||||||
LOG_ERROR("Cluster", "Critical memory warning: " + String(freeHeap) + " bytes free");
|
LOG_ERROR("Cluster", "Critical memory warning: " + String(freeHeap) + " bytes free");
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
114
src/spore/core/Memberlist.cpp
Normal file
114
src/spore/core/Memberlist.cpp
Normal file
@@ -0,0 +1,114 @@
|
|||||||
|
#include "spore/core/Memberlist.h"
|
||||||
|
#include <algorithm>
|
||||||
|
|
||||||
|
Memberlist::Memberlist() = default;
|
||||||
|
|
||||||
|
Memberlist::~Memberlist() = default;
|
||||||
|
|
||||||
|
bool Memberlist::addOrUpdateMember(const std::string& ip, const NodeInfo& node) {
|
||||||
|
auto it = m_members.find(ip);
|
||||||
|
if (it != m_members.end()) {
|
||||||
|
// Update existing member
|
||||||
|
it->second = node;
|
||||||
|
it->second.lastSeen = millis(); // Update last seen time
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
// Add new member
|
||||||
|
NodeInfo newNode = node;
|
||||||
|
newNode.lastSeen = millis();
|
||||||
|
m_members[ip] = newNode;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Memberlist::addMember(const std::string& ip, const NodeInfo& node) {
|
||||||
|
if (m_members.find(ip) != m_members.end()) {
|
||||||
|
return false; // Member already exists
|
||||||
|
}
|
||||||
|
NodeInfo newNode = node;
|
||||||
|
newNode.lastSeen = millis();
|
||||||
|
m_members[ip] = newNode;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Memberlist::updateMember(const std::string& ip, const NodeInfo& node) {
|
||||||
|
auto it = m_members.find(ip);
|
||||||
|
if (it == m_members.end()) {
|
||||||
|
return false; // Member doesn't exist
|
||||||
|
}
|
||||||
|
it->second = node;
|
||||||
|
it->second.lastSeen = millis(); // Update last seen time
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Memberlist::removeMember(const std::string& ip) {
|
||||||
|
auto it = m_members.find(ip);
|
||||||
|
if (it == m_members.end()) {
|
||||||
|
return false; // Member doesn't exist
|
||||||
|
}
|
||||||
|
m_members.erase(it);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::optional<NodeInfo> Memberlist::getMember(const std::string& ip) const {
|
||||||
|
auto it = m_members.find(ip);
|
||||||
|
if (it != m_members.end()) {
|
||||||
|
return it->second;
|
||||||
|
}
|
||||||
|
return std::nullopt;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Memberlist::forEachMember(std::function<void(const std::string&, const NodeInfo&)> callback) const {
|
||||||
|
for (const auto& pair : m_members) {
|
||||||
|
callback(pair.first, pair.second);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Memberlist::forEachMemberUntil(std::function<bool(const std::string&, const NodeInfo&)> callback) const {
|
||||||
|
for (const auto& pair : m_members) {
|
||||||
|
if (!callback(pair.first, pair.second)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t Memberlist::getMemberCount() const {
|
||||||
|
return m_members.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
void Memberlist::updateAllNodeStatuses(unsigned long currentTime,
|
||||||
|
unsigned long staleThresholdMs,
|
||||||
|
unsigned long deadThresholdMs,
|
||||||
|
std::function<void(const std::string&, NodeInfo::Status, NodeInfo::Status)> onStatusChange) {
|
||||||
|
for (auto& [ip, node] : m_members) {
|
||||||
|
NodeInfo::Status oldStatus = node.status;
|
||||||
|
updateNodeStatus(node, currentTime, staleThresholdMs, deadThresholdMs);
|
||||||
|
|
||||||
|
if (oldStatus != node.status && onStatusChange) {
|
||||||
|
onStatusChange(ip, oldStatus, node.status);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t Memberlist::removeDeadMembers() {
|
||||||
|
size_t removedCount = 0;
|
||||||
|
auto it = m_members.begin();
|
||||||
|
while (it != m_members.end()) {
|
||||||
|
if (it->second.status == NodeInfo::Status::DEAD) {
|
||||||
|
it = m_members.erase(it);
|
||||||
|
++removedCount;
|
||||||
|
} else {
|
||||||
|
++it;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return removedCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Memberlist::hasMember(const std::string& ip) const {
|
||||||
|
return m_members.find(ip) != m_members.end();
|
||||||
|
}
|
||||||
|
|
||||||
|
void Memberlist::clear() {
|
||||||
|
m_members.clear();
|
||||||
|
}
|
||||||
@@ -119,13 +119,8 @@ void NetworkManager::setupWiFi() {
|
|||||||
ctx.self.status = NodeInfo::ACTIVE;
|
ctx.self.status = NodeInfo::ACTIVE;
|
||||||
|
|
||||||
// Ensure member list has an entry for this node
|
// Ensure member list has an entry for this node
|
||||||
auto &memberList = *ctx.memberList;
|
String localIPStr = ctx.localIP.toString();
|
||||||
auto existing = memberList.find(ctx.hostname);
|
ctx.memberList->addOrUpdateMember(localIPStr.c_str(), ctx.self);
|
||||||
if (existing == memberList.end()) {
|
|
||||||
memberList[ctx.hostname] = ctx.self;
|
|
||||||
} else {
|
|
||||||
existing->second = ctx.self;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Notify listeners that the node is (re)discovered
|
// Notify listeners that the node is (re)discovered
|
||||||
ctx.fire("node/discovered", &ctx.self);
|
ctx.fire("node/discovered", &ctx.self);
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
NodeContext::NodeContext() {
|
NodeContext::NodeContext() {
|
||||||
udp = new WiFiUDP();
|
udp = new WiFiUDP();
|
||||||
memberList = new std::map<String, NodeInfo>();
|
memberList = std::make_unique<Memberlist>();
|
||||||
hostname = "";
|
hostname = "";
|
||||||
self.hostname = "";
|
self.hostname = "";
|
||||||
self.ip = IPAddress();
|
self.ip = IPAddress();
|
||||||
@@ -19,7 +19,7 @@ NodeContext::NodeContext(std::initializer_list<std::pair<String, String>> initia
|
|||||||
|
|
||||||
NodeContext::~NodeContext() {
|
NodeContext::~NodeContext() {
|
||||||
delete udp;
|
delete udp;
|
||||||
delete memberList;
|
// memberList is a unique_ptr, so no need to delete manually
|
||||||
}
|
}
|
||||||
|
|
||||||
void NodeContext::on(const std::string& event, EventCallback cb) {
|
void NodeContext::on(const std::string& event, EventCallback cb) {
|
||||||
|
|||||||
@@ -40,8 +40,7 @@ void ClusterService::handleMembersRequest(AsyncWebServerRequest* request) {
|
|||||||
JsonDocument doc;
|
JsonDocument doc;
|
||||||
JsonArray arr = doc["members"].to<JsonArray>();
|
JsonArray arr = doc["members"].to<JsonArray>();
|
||||||
|
|
||||||
for (const auto& pair : *ctx.memberList) {
|
ctx.memberList->forEachMember([&arr](const std::string& ip, const NodeInfo& node) {
|
||||||
const NodeInfo& node = pair.second;
|
|
||||||
JsonObject obj = arr.add<JsonObject>();
|
JsonObject obj = arr.add<JsonObject>();
|
||||||
obj["hostname"] = node.hostname;
|
obj["hostname"] = node.hostname;
|
||||||
obj["ip"] = node.ip.toString();
|
obj["ip"] = node.ip.toString();
|
||||||
@@ -56,7 +55,7 @@ void ClusterService::handleMembersRequest(AsyncWebServerRequest* request) {
|
|||||||
labelsObj[kv.first.c_str()] = kv.second;
|
labelsObj[kv.first.c_str()] = kv.second;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
|
|
||||||
String json;
|
String json;
|
||||||
serializeJson(doc, json);
|
serializeJson(doc, json);
|
||||||
|
|||||||
@@ -5,8 +5,7 @@
|
|||||||
#include <FS.h>
|
#include <FS.h>
|
||||||
#include <LittleFS.h>
|
#include <LittleFS.h>
|
||||||
|
|
||||||
MonitoringService::MonitoringService(CpuUsage& cpuUsage)
|
MonitoringService::MonitoringService() {
|
||||||
: cpuUsage(cpuUsage) {
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void MonitoringService::registerEndpoints(ApiServer& api) {
|
void MonitoringService::registerEndpoints(ApiServer& api) {
|
||||||
@@ -22,20 +21,15 @@ void MonitoringService::registerTasks(TaskManager& taskManager) {
|
|||||||
MonitoringService::SystemResources MonitoringService::getSystemResources() const {
|
MonitoringService::SystemResources MonitoringService::getSystemResources() const {
|
||||||
SystemResources resources;
|
SystemResources resources;
|
||||||
|
|
||||||
// CPU information
|
// CPU information - sending fixed value of 100
|
||||||
resources.currentCpuUsage = cpuUsage.getCpuUsage();
|
resources.currentCpuUsage = 100.0f;
|
||||||
resources.averageCpuUsage = cpuUsage.getAverageCpuUsage();
|
resources.averageCpuUsage = 100.0f;
|
||||||
resources.maxCpuUsage = cpuUsage.getMaxCpuUsage();
|
resources.measurementCount = 0;
|
||||||
resources.minCpuUsage = cpuUsage.getMinCpuUsage();
|
resources.isMeasuring = false;
|
||||||
resources.measurementCount = cpuUsage.getMeasurementCount();
|
|
||||||
resources.isMeasuring = cpuUsage.isMeasuring();
|
|
||||||
|
|
||||||
// Memory information - ESP8266 compatible
|
// Memory information - ESP8266 compatible
|
||||||
resources.freeHeap = ESP.getFreeHeap();
|
resources.freeHeap = ESP.getFreeHeap();
|
||||||
resources.totalHeap = 81920; // ESP8266 has ~80KB RAM
|
resources.totalHeap = 81920; // ESP8266 has ~80KB RAM
|
||||||
resources.minFreeHeap = 0; // Not available on ESP8266
|
|
||||||
resources.maxAllocHeap = 0; // Not available on ESP8266
|
|
||||||
resources.heapFragmentation = calculateHeapFragmentation();
|
|
||||||
|
|
||||||
// Filesystem information
|
// Filesystem information
|
||||||
getFilesystemInfo(resources.totalBytes, resources.usedBytes);
|
getFilesystemInfo(resources.totalBytes, resources.usedBytes);
|
||||||
@@ -59,8 +53,6 @@ void MonitoringService::handleResourcesRequest(AsyncWebServerRequest* request) {
|
|||||||
JsonObject cpu = doc["cpu"].to<JsonObject>();
|
JsonObject cpu = doc["cpu"].to<JsonObject>();
|
||||||
cpu["current_usage"] = resources.currentCpuUsage;
|
cpu["current_usage"] = resources.currentCpuUsage;
|
||||||
cpu["average_usage"] = resources.averageCpuUsage;
|
cpu["average_usage"] = resources.averageCpuUsage;
|
||||||
cpu["max_usage"] = resources.maxCpuUsage;
|
|
||||||
cpu["min_usage"] = resources.minCpuUsage;
|
|
||||||
cpu["measurement_count"] = resources.measurementCount;
|
cpu["measurement_count"] = resources.measurementCount;
|
||||||
cpu["is_measuring"] = resources.isMeasuring;
|
cpu["is_measuring"] = resources.isMeasuring;
|
||||||
|
|
||||||
@@ -68,9 +60,6 @@ void MonitoringService::handleResourcesRequest(AsyncWebServerRequest* request) {
|
|||||||
JsonObject memory = doc["memory"].to<JsonObject>();
|
JsonObject memory = doc["memory"].to<JsonObject>();
|
||||||
memory["free_heap"] = resources.freeHeap;
|
memory["free_heap"] = resources.freeHeap;
|
||||||
memory["total_heap"] = resources.totalHeap;
|
memory["total_heap"] = resources.totalHeap;
|
||||||
memory["min_free_heap"] = resources.minFreeHeap;
|
|
||||||
memory["max_alloc_heap"] = resources.maxAllocHeap;
|
|
||||||
memory["heap_fragmentation"] = resources.heapFragmentation;
|
|
||||||
memory["heap_usage_percent"] = resources.totalHeap > 0 ?
|
memory["heap_usage_percent"] = resources.totalHeap > 0 ?
|
||||||
(float)(resources.totalHeap - resources.freeHeap) / (float)resources.totalHeap * 100.0f : 0.0f;
|
(float)(resources.totalHeap - resources.freeHeap) / (float)resources.totalHeap * 100.0f : 0.0f;
|
||||||
|
|
||||||
@@ -94,15 +83,6 @@ void MonitoringService::handleResourcesRequest(AsyncWebServerRequest* request) {
|
|||||||
request->send(200, "application/json", json);
|
request->send(200, "application/json", json);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t MonitoringService::calculateHeapFragmentation() const {
|
|
||||||
size_t freeHeap = ESP.getFreeHeap();
|
|
||||||
size_t maxAllocHeap = 0; // Not available on ESP8266
|
|
||||||
|
|
||||||
if (maxAllocHeap == 0) return 0;
|
|
||||||
|
|
||||||
// Calculate fragmentation as percentage of free heap that can't be allocated in one block
|
|
||||||
return (freeHeap - maxAllocHeap) * 100 / freeHeap;
|
|
||||||
}
|
|
||||||
|
|
||||||
void MonitoringService::getFilesystemInfo(size_t& totalBytes, size_t& usedBytes) const {
|
void MonitoringService::getFilesystemInfo(size_t& totalBytes, size_t& usedBytes) const {
|
||||||
totalBytes = 0;
|
totalBytes = 0;
|
||||||
|
|||||||
@@ -74,11 +74,10 @@ void NodeService::handleStatusRequest(AsyncWebServerRequest* request) {
|
|||||||
doc["flashChipSize"] = ESP.getFlashChipSize();
|
doc["flashChipSize"] = ESP.getFlashChipSize();
|
||||||
|
|
||||||
// Include local node labels if present
|
// Include local node labels if present
|
||||||
if (ctx.memberList) {
|
auto member = ctx.memberList->getMember(ctx.hostname.c_str());
|
||||||
auto it = ctx.memberList->find(ctx.hostname);
|
if (member) {
|
||||||
if (it != ctx.memberList->end()) {
|
|
||||||
JsonObject labelsObj = doc["labels"].to<JsonObject>();
|
JsonObject labelsObj = doc["labels"].to<JsonObject>();
|
||||||
for (const auto& kv : it->second.labels) {
|
for (const auto& kv : member->labels) {
|
||||||
labelsObj[kv.first.c_str()] = kv.second;
|
labelsObj[kv.first.c_str()] = kv.second;
|
||||||
}
|
}
|
||||||
} else if (!ctx.self.labels.empty()) {
|
} else if (!ctx.self.labels.empty()) {
|
||||||
@@ -87,7 +86,6 @@ void NodeService::handleStatusRequest(AsyncWebServerRequest* request) {
|
|||||||
labelsObj[kv.first.c_str()] = kv.second;
|
labelsObj[kv.first.c_str()] = kv.second;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
String json;
|
String json;
|
||||||
serializeJson(doc, json);
|
serializeJson(doc, json);
|
||||||
@@ -116,7 +114,7 @@ void NodeService::handleUpdateUpload(AsyncWebServerRequest* request, const Strin
|
|||||||
LOG_ERROR("OTA", "Update failed: not enough space");
|
LOG_ERROR("OTA", "Update failed: not enough space");
|
||||||
Update.printError(Serial);
|
Update.printError(Serial);
|
||||||
AsyncWebServerResponse* response = request->beginResponse(500, "application/json",
|
AsyncWebServerResponse* response = request->beginResponse(500, "application/json",
|
||||||
"{\"status\": \"FAIL\"}");
|
"{\"status\": \"FAIL\", \"message\": \"Update failed: not enough space\"}");
|
||||||
response->addHeader("Connection", "close");
|
response->addHeader("Connection", "close");
|
||||||
request->send(response);
|
request->send(response);
|
||||||
return;
|
return;
|
||||||
@@ -216,17 +214,17 @@ void NodeService::handleConfigRequest(AsyncWebServerRequest* request) {
|
|||||||
// Rebuild self.labels from constructor + config labels
|
// Rebuild self.labels from constructor + config labels
|
||||||
ctx.rebuildLabels();
|
ctx.rebuildLabels();
|
||||||
|
|
||||||
// TODO think of a better way to update the member list entry for the local node
|
// Update the member list entry for the local node if it exists
|
||||||
// Update the member list entry for this node if it exists
|
String localIPStr = ctx.localIP.toString();
|
||||||
if (ctx.memberList) {
|
auto member = ctx.memberList->getMember(localIPStr.c_str());
|
||||||
auto it = ctx.memberList->find(ctx.hostname);
|
if (member) {
|
||||||
if (it != ctx.memberList->end()) {
|
|
||||||
// Update the labels in the member list entry
|
// Update the labels in the member list entry
|
||||||
it->second.labels.clear();
|
NodeInfo updatedNode = *member;
|
||||||
|
updatedNode.labels.clear();
|
||||||
for (const auto& kv : ctx.self.labels) {
|
for (const auto& kv : ctx.self.labels) {
|
||||||
it->second.labels[kv.first] = kv.second;
|
updatedNode.labels[kv.first] = kv.second;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
ctx.memberList->updateMember(localIPStr.c_str(), updatedNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save config to file
|
// Save config to file
|
||||||
|
|||||||
@@ -1,185 +0,0 @@
|
|||||||
#include "spore/util/CpuUsage.h"
|
|
||||||
|
|
||||||
CpuUsage::CpuUsage()
|
|
||||||
: _initialized(false)
|
|
||||||
, _measuring(false)
|
|
||||||
, _measurementCount(0)
|
|
||||||
, _cycleStartTime(0)
|
|
||||||
, _idleStartTime(0)
|
|
||||||
, _totalIdleTime(0)
|
|
||||||
, _totalCycleTime(0)
|
|
||||||
, _currentCpuUsage(0.0f)
|
|
||||||
, _averageCpuUsage(0.0f)
|
|
||||||
, _maxCpuUsage(0.0f)
|
|
||||||
, _minCpuUsage(100.0f)
|
|
||||||
, _totalCpuTime(0)
|
|
||||||
, _rollingIndex(0)
|
|
||||||
, _rollingWindowFull(false) {
|
|
||||||
|
|
||||||
// Initialize rolling window
|
|
||||||
for (size_t i = 0; i < ROLLING_WINDOW_SIZE; ++i) {
|
|
||||||
_rollingWindow[i] = 0.0f;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void CpuUsage::begin() {
|
|
||||||
if (_initialized) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
_initialized = true;
|
|
||||||
_measurementCount = 0;
|
|
||||||
_totalIdleTime = 0;
|
|
||||||
_totalCycleTime = 0;
|
|
||||||
_totalCpuTime = 0;
|
|
||||||
_currentCpuUsage = 0.0f;
|
|
||||||
_averageCpuUsage = 0.0f;
|
|
||||||
_maxCpuUsage = 0.0f;
|
|
||||||
_minCpuUsage = 100.0f;
|
|
||||||
_rollingIndex = 0;
|
|
||||||
_rollingWindowFull = false;
|
|
||||||
|
|
||||||
// Initialize rolling window
|
|
||||||
for (size_t i = 0; i < ROLLING_WINDOW_SIZE; ++i) {
|
|
||||||
_rollingWindow[i] = 0.0f;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void CpuUsage::startMeasurement() {
|
|
||||||
if (!_initialized) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_measuring) {
|
|
||||||
// If already measuring, end the previous measurement first
|
|
||||||
endMeasurement();
|
|
||||||
}
|
|
||||||
|
|
||||||
_measuring = true;
|
|
||||||
_cycleStartTime = millis();
|
|
||||||
_idleStartTime = millis();
|
|
||||||
}
|
|
||||||
|
|
||||||
void CpuUsage::endMeasurement() {
|
|
||||||
if (!_initialized || !_measuring) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
unsigned long cycleEndTime = millis();
|
|
||||||
unsigned long cycleDuration = cycleEndTime - _cycleStartTime;
|
|
||||||
|
|
||||||
// Calculate idle time (time spent in yield() calls)
|
|
||||||
unsigned long idleTime = cycleEndTime - _idleStartTime;
|
|
||||||
|
|
||||||
// Calculate CPU usage
|
|
||||||
if (cycleDuration > 0) {
|
|
||||||
_currentCpuUsage = ((float)(cycleDuration - idleTime) / (float)cycleDuration) * 100.0f;
|
|
||||||
|
|
||||||
// Clamp to valid range
|
|
||||||
if (_currentCpuUsage < 0.0f) {
|
|
||||||
_currentCpuUsage = 0.0f;
|
|
||||||
} else if (_currentCpuUsage > 100.0f) {
|
|
||||||
_currentCpuUsage = 100.0f;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update statistics
|
|
||||||
_totalCycleTime += cycleDuration;
|
|
||||||
_totalIdleTime += idleTime;
|
|
||||||
_totalCpuTime += (cycleDuration - idleTime);
|
|
||||||
_measurementCount++;
|
|
||||||
|
|
||||||
// Update rolling average
|
|
||||||
updateRollingAverage(_currentCpuUsage);
|
|
||||||
|
|
||||||
// Update min/max
|
|
||||||
updateMinMax(_currentCpuUsage);
|
|
||||||
|
|
||||||
// Calculate overall average
|
|
||||||
if (_measurementCount > 0) {
|
|
||||||
_averageCpuUsage = ((float)_totalCpuTime / (float)_totalCycleTime) * 100.0f;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_measuring = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
float CpuUsage::getCpuUsage() const {
|
|
||||||
return _currentCpuUsage;
|
|
||||||
}
|
|
||||||
|
|
||||||
float CpuUsage::getAverageCpuUsage() const {
|
|
||||||
if (_rollingWindowFull) {
|
|
||||||
return _averageCpuUsage;
|
|
||||||
} else if (_measurementCount > 0) {
|
|
||||||
// Calculate average from rolling window
|
|
||||||
float sum = 0.0f;
|
|
||||||
for (size_t i = 0; i < _rollingIndex; ++i) {
|
|
||||||
sum += _rollingWindow[i];
|
|
||||||
}
|
|
||||||
return sum / (float)_rollingIndex;
|
|
||||||
}
|
|
||||||
return 0.0f;
|
|
||||||
}
|
|
||||||
|
|
||||||
float CpuUsage::getMaxCpuUsage() const {
|
|
||||||
return _maxCpuUsage;
|
|
||||||
}
|
|
||||||
|
|
||||||
float CpuUsage::getMinCpuUsage() const {
|
|
||||||
return _minCpuUsage;
|
|
||||||
}
|
|
||||||
|
|
||||||
void CpuUsage::reset() {
|
|
||||||
_measurementCount = 0;
|
|
||||||
_totalIdleTime = 0;
|
|
||||||
_totalCycleTime = 0;
|
|
||||||
_totalCpuTime = 0;
|
|
||||||
_currentCpuUsage = 0.0f;
|
|
||||||
_averageCpuUsage = 0.0f;
|
|
||||||
_maxCpuUsage = 0.0f;
|
|
||||||
_minCpuUsage = 100.0f;
|
|
||||||
_rollingIndex = 0;
|
|
||||||
_rollingWindowFull = false;
|
|
||||||
|
|
||||||
// Reset rolling window
|
|
||||||
for (size_t i = 0; i < ROLLING_WINDOW_SIZE; ++i) {
|
|
||||||
_rollingWindow[i] = 0.0f;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bool CpuUsage::isMeasuring() const {
|
|
||||||
return _measuring;
|
|
||||||
}
|
|
||||||
|
|
||||||
unsigned long CpuUsage::getMeasurementCount() const {
|
|
||||||
return _measurementCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
void CpuUsage::updateRollingAverage(float value) {
|
|
||||||
_rollingWindow[_rollingIndex] = value;
|
|
||||||
_rollingIndex++;
|
|
||||||
|
|
||||||
if (_rollingIndex >= ROLLING_WINDOW_SIZE) {
|
|
||||||
_rollingIndex = 0;
|
|
||||||
_rollingWindowFull = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Calculate rolling average
|
|
||||||
float sum = 0.0f;
|
|
||||||
size_t count = _rollingWindowFull ? ROLLING_WINDOW_SIZE : _rollingIndex;
|
|
||||||
|
|
||||||
for (size_t i = 0; i < count; ++i) {
|
|
||||||
sum += _rollingWindow[i];
|
|
||||||
}
|
|
||||||
|
|
||||||
_averageCpuUsage = sum / (float)count;
|
|
||||||
}
|
|
||||||
|
|
||||||
void CpuUsage::updateMinMax(float value) {
|
|
||||||
if (value > _maxCpuUsage) {
|
|
||||||
_maxCpuUsage = value;
|
|
||||||
}
|
|
||||||
if (value < _minCpuUsage) {
|
|
||||||
_minCpuUsage = value;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user