Compare commits
6 Commits
ce70830678
...
7bd3e87271
| Author | SHA1 | Date | |
|---|---|---|---|
| 7bd3e87271 | |||
| 0d09c5900c | |||
| 407b651b82 | |||
| 23289d9f09 | |||
| b6ad479352 | |||
| 3ed44cd00f |
3
ctl.sh
3
ctl.sh
@@ -372,12 +372,9 @@ function node {
|
||||
|
||||
# Cluster Configuration
|
||||
echo "=== Cluster Configuration ==="
|
||||
echo "Discovery Interval: $(echo "$response_body" | jq -r '.cluster.discovery_interval_ms // "N/A"') ms"
|
||||
echo "Heartbeat Interval: $(echo "$response_body" | jq -r '.cluster.heartbeat_interval_ms // "N/A"') ms"
|
||||
echo "Cluster Listen Interval: $(echo "$response_body" | jq -r '.cluster.cluster_listen_interval_ms // "N/A"') ms"
|
||||
echo "Status Update Interval: $(echo "$response_body" | jq -r '.cluster.status_update_interval_ms // "N/A"') ms"
|
||||
echo "Member Info Update Interval: $(echo "$response_body" | jq -r '.cluster.member_info_update_interval_ms // "N/A"') ms"
|
||||
echo "Print Interval: $(echo "$response_body" | jq -r '.cluster.print_interval_ms // "N/A"') ms"
|
||||
echo ""
|
||||
|
||||
# Node Status Thresholds
|
||||
|
||||
@@ -42,65 +42,79 @@ The cluster uses a UDP-based discovery protocol for automatic node detection:
|
||||
|
||||
### Discovery Process
|
||||
|
||||
1. **Discovery Broadcast**: Nodes periodically send UDP packets on port `udp_port` (default 4210)
|
||||
2. **Response Handling**: Nodes respond with `CLUSTER_RESPONSE:<hostname>`
|
||||
3. **Member Management**: Discovered nodes are added/updated in the cluster
|
||||
4. **Node Info via UDP**: Heartbeat triggers peers to send `CLUSTER_NODE_INFO:<hostname>:<json>`
|
||||
1. **Discovery Broadcast**: Nodes periodically send heartbeat messages on port `udp_port` (default 4210)
|
||||
2. **Response Handling**: Nodes respond with node update information containing their current state
|
||||
3. **Member Management**: Discovered nodes are added/updated in the cluster with current information
|
||||
4. **Node Synchronization**: Periodic broadcasts ensure all nodes maintain current cluster state
|
||||
|
||||
### Protocol Details
|
||||
|
||||
- **UDP Port**: 4210 (configurable via `Config.udp_port`)
|
||||
- **Discovery Message**: `CLUSTER_DISCOVERY`
|
||||
- **Response Message**: `CLUSTER_RESPONSE`
|
||||
- **Heartbeat Message**: `CLUSTER_HEARTBEAT`
|
||||
- **Node Info Message**: `CLUSTER_NODE_INFO:<hostname>:<json>`
|
||||
- **Heartbeat Message**: `CLUSTER_HEARTBEAT:hostname`
|
||||
- **Node Update Message**: `NODE_UPDATE:hostname:{json}`
|
||||
- **Broadcast Address**: 255.255.255.255
|
||||
- **Discovery Interval**: `Config.discovery_interval_ms` (default 1000 ms)
|
||||
- **Listen Interval**: `Config.cluster_listen_interval_ms` (default 10 ms)
|
||||
- **Heartbeat Interval**: `Config.heartbeat_interval_ms` (default 5000 ms)
|
||||
|
||||
### Message Formats
|
||||
|
||||
- **Discovery**: `CLUSTER_DISCOVERY`
|
||||
- Sender: any node, broadcast to 255.255.255.255:`udp_port`
|
||||
- Purpose: announce presence and solicit peer identification
|
||||
- **Response**: `CLUSTER_RESPONSE:<hostname>`
|
||||
- Sender: node receiving a discovery; unicast to requester IP
|
||||
- Purpose: provide hostname so requester can register/update member
|
||||
- **Heartbeat**: `CLUSTER_HEARTBEAT:<hostname>`
|
||||
- **Heartbeat**: `CLUSTER_HEARTBEAT:hostname`
|
||||
- Sender: each node, broadcast to 255.255.255.255:`udp_port` on interval
|
||||
- Purpose: prompt peers to reply with their node info and keep liveness
|
||||
- **Node Info**: `CLUSTER_NODE_INFO:<hostname>:<json>`
|
||||
- Sender: node receiving a heartbeat; unicast to heartbeat sender IP
|
||||
- JSON fields: freeHeap, chipId, sdkVersion, cpuFreqMHz, flashChipSize, optional labels
|
||||
- Purpose: announce presence, prompt peers for node info, and keep liveness
|
||||
- **Node Update**: `NODE_UPDATE:hostname:{json}`
|
||||
- Sender: node responding to heartbeat or broadcasting current state
|
||||
- JSON fields: hostname, ip, uptime, optional labels
|
||||
- Purpose: provide current node information for cluster synchronization
|
||||
|
||||
### Discovery Flow
|
||||
|
||||
1. **Sender broadcasts** `CLUSTER_DISCOVERY`
|
||||
2. **Each receiver responds** with `CLUSTER_RESPONSE:<hostname>` to the sender IP
|
||||
3. **Sender registers/updates** the node using hostname and source IP
|
||||
|
||||
### Heartbeat Flow
|
||||
|
||||
1. **A node broadcasts** `CLUSTER_HEARTBEAT:<hostname>`
|
||||
2. **Each receiver replies** with `CLUSTER_NODE_INFO:<hostname>:<json>` to the heartbeat sender IP
|
||||
1. **A node broadcasts** `CLUSTER_HEARTBEAT:hostname` to announce its presence
|
||||
2. **Each receiver responds** with `NODE_UPDATE:hostname:{json}` containing current node state
|
||||
3. **The sender**:
|
||||
- Ensures the node exists or creates it with `hostname` and sender IP
|
||||
- Parses JSON and updates resources, labels, `status = ACTIVE`, `lastSeen = now`
|
||||
- Sets `latency = now - lastHeartbeatSentAt` (per-node, measured at heartbeat origin)
|
||||
- Ensures the responding node exists or creates it with current IP and information
|
||||
- Parses JSON and updates node info, `status = ACTIVE`, `lastSeen = now`
|
||||
- Calculates `latency = now - lastHeartbeatSentAt` for network performance monitoring
|
||||
|
||||
### Node Synchronization
|
||||
|
||||
1. **Event-driven broadcasts**: Nodes broadcast `NODE_UPDATE:hostname:{json}` when node information changes
|
||||
2. **All receivers**: Update their memberlist entry for the broadcasting node
|
||||
3. **Purpose**: Ensures all nodes maintain current cluster state and configuration
|
||||
|
||||
### Sequence Diagram
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
participant N1 as Node A (esp-node1)
|
||||
participant N2 as Node B (esp-node2)
|
||||
|
||||
Note over N1,N2: Discovery via heartbeat broadcast
|
||||
N1->>+N2: CLUSTER_HEARTBEAT:esp-node1
|
||||
|
||||
Note over N2: Node B responds with its current state
|
||||
N2->>+N1: NODE_UPDATE:esp-node1:{"hostname":"esp-node2","uptime":12345,"labels":{"role":"sensor"}}
|
||||
|
||||
Note over N1: Process NODE_UPDATE response
|
||||
N1-->>N1: Update memberlist for Node B
|
||||
N1-->>N1: Set Node B status = ACTIVE
|
||||
N1-->>N1: Calculate latency for Node B
|
||||
|
||||
Note over N1,N2: Event-driven node synchronization
|
||||
N1->>+N2: NODE_UPDATE:esp-node1:{"hostname":"esp-node1","uptime":12346,"labels":{"role":"controller"}}
|
||||
|
||||
Note over N2: Update memberlist with latest information
|
||||
N2-->>N2: Update Node A info, maintain ACTIVE status
|
||||
```
|
||||
|
||||
### Listener Behavior
|
||||
|
||||
The `cluster_listen` task parses one UDP packet per run and dispatches by prefix to:
|
||||
- **Discovery** → send `CLUSTER_RESPONSE`
|
||||
- **Heartbeat** → send `CLUSTER_NODE_INFO` JSON
|
||||
- **Response** → add/update node using provided hostname and source IP
|
||||
- **Node Info** → update resources/status/labels and record latency
|
||||
- **Heartbeat** → add/update responding node and send `NODE_UPDATE` response
|
||||
- **Node Update** → update node information and trigger memberlist logging
|
||||
|
||||
### Timing and Intervals
|
||||
|
||||
- **UDP Port**: `Config.udp_port` (default 4210)
|
||||
- **Discovery Interval**: `Config.discovery_interval_ms` (default 1000 ms)
|
||||
- **Listen Interval**: `Config.cluster_listen_interval_ms` (default 10 ms)
|
||||
- **Heartbeat Interval**: `Config.heartbeat_interval_ms` (default 5000 ms)
|
||||
|
||||
@@ -120,12 +134,9 @@ The system runs several background tasks at different intervals:
|
||||
|
||||
| Task | Interval (default) | Purpose |
|
||||
|------|--------------------|---------|
|
||||
| `cluster_discovery` | 1000 ms | Send UDP discovery packets |
|
||||
| `cluster_listen` | 10 ms | Listen for discovery/heartbeat/node-info |
|
||||
| `cluster_listen` | 10 ms | Listen for heartbeat/node-info messages |
|
||||
| `status_update` | 1000 ms | Update node status categories, purge dead |
|
||||
| `heartbeat` | 5000 ms | Broadcast heartbeat and update local resources |
|
||||
| `cluster_update_members_info` | 10000 ms | Reserved; no-op (info via UDP) |
|
||||
| `print_members` | 5000 ms | Log current member list |
|
||||
|
||||
### Task Management Features
|
||||
|
||||
@@ -142,12 +153,12 @@ The `NodeContext` provides an event-driven architecture for system-wide communic
|
||||
|
||||
```cpp
|
||||
// Subscribe to events
|
||||
ctx.on("node_discovered", [](void* data) {
|
||||
ctx.on("node/discovered", [](void* data) {
|
||||
NodeInfo* node = static_cast<NodeInfo*>(data);
|
||||
// Handle new node discovery
|
||||
});
|
||||
|
||||
ctx.on("cluster_updated", [](void* data) {
|
||||
ctx.on("cluster/updated", [](void* data) {
|
||||
// Handle cluster membership changes
|
||||
});
|
||||
```
|
||||
@@ -156,13 +167,13 @@ ctx.on("cluster_updated", [](void* data) {
|
||||
|
||||
```cpp
|
||||
// Publish events
|
||||
ctx.fire("node_discovered", &newNode);
|
||||
ctx.fire("cluster_updated", &clusterData);
|
||||
ctx.fire("node/discovered", &newNode);
|
||||
ctx.fire("cluster/updated", &clusterData);
|
||||
```
|
||||
|
||||
### Available Events
|
||||
|
||||
- **`node_discovered`**: New node added or local node refreshed
|
||||
- **`node/discovered`**: New node added or local node refreshed
|
||||
|
||||
## Resource Monitoring
|
||||
|
||||
|
||||
@@ -93,12 +93,9 @@ The configuration is stored as a JSON file with the following structure:
|
||||
"api_server_port": 80
|
||||
},
|
||||
"cluster": {
|
||||
"discovery_interval_ms": 1000,
|
||||
"heartbeat_interval_ms": 5000,
|
||||
"cluster_listen_interval_ms": 10,
|
||||
"status_update_interval_ms": 1000,
|
||||
"member_info_update_interval_ms": 10000,
|
||||
"print_interval_ms": 5000
|
||||
"status_update_interval_ms": 1000
|
||||
},
|
||||
"thresholds": {
|
||||
"node_active_threshold_ms": 10000,
|
||||
|
||||
@@ -14,7 +14,6 @@ class ClusterManager {
|
||||
public:
|
||||
ClusterManager(NodeContext& ctx, TaskManager& taskMgr);
|
||||
void registerTasks();
|
||||
void sendDiscovery();
|
||||
void listen();
|
||||
void addOrUpdateNode(const String& nodeHost, IPAddress nodeIP);
|
||||
void updateAllNodeStatuses();
|
||||
@@ -25,6 +24,7 @@ public:
|
||||
void updateLocalNodeResources();
|
||||
void heartbeatTaskCallback();
|
||||
void updateAllMembersInfoTaskCallback();
|
||||
void broadcastNodeUpdate();
|
||||
private:
|
||||
NodeContext& ctx;
|
||||
TaskManager& taskManager;
|
||||
@@ -35,18 +35,15 @@ private:
|
||||
};
|
||||
void initMessageHandlers();
|
||||
void handleIncomingMessage(const char* incoming);
|
||||
static bool isDiscoveryMsg(const char* msg);
|
||||
static bool isHeartbeatMsg(const char* msg);
|
||||
static bool isResponseMsg(const char* msg);
|
||||
static bool isNodeInfoMsg(const char* msg);
|
||||
static bool isNodeUpdateMsg(const char* msg);
|
||||
static bool isClusterEventMsg(const char* msg);
|
||||
static bool isRawMsg(const char* msg);
|
||||
void onDiscovery(const char* msg);
|
||||
void onHeartbeat(const char* msg);
|
||||
void onResponse(const char* msg);
|
||||
void onNodeInfo(const char* msg);
|
||||
void onNodeUpdate(const char* msg);
|
||||
void onClusterEvent(const char* msg);
|
||||
void onRawMessage(const char* msg);
|
||||
void sendNodeInfo(const String& hostname, const IPAddress& targetIP);
|
||||
unsigned long lastHeartbeatSentAt = 0;
|
||||
std::vector<MessageHandler> messageHandlers;
|
||||
};
|
||||
|
||||
@@ -5,10 +5,9 @@
|
||||
|
||||
// Cluster protocol and API constants
|
||||
namespace ClusterProtocol {
|
||||
constexpr const char* DISCOVERY_MSG = "CLUSTER_DISCOVERY";
|
||||
constexpr const char* RESPONSE_MSG = "CLUSTER_RESPONSE";
|
||||
// Simplified heartbeat-only protocol
|
||||
constexpr const char* HEARTBEAT_MSG = "CLUSTER_HEARTBEAT";
|
||||
constexpr const char* NODE_INFO_MSG = "CLUSTER_NODE_INFO";
|
||||
constexpr const char* NODE_UPDATE_MSG = "NODE_UPDATE";
|
||||
constexpr const char* CLUSTER_EVENT_MSG = "CLUSTER_EVENT";
|
||||
constexpr const char* RAW_MSG = "RAW";
|
||||
constexpr uint16_t UDP_PORT = 4210;
|
||||
@@ -18,12 +17,9 @@ namespace ClusterProtocol {
|
||||
}
|
||||
|
||||
namespace TaskIntervals {
|
||||
constexpr unsigned long SEND_DISCOVERY = 1000;
|
||||
constexpr unsigned long LISTEN_FOR_DISCOVERY = 100;
|
||||
constexpr unsigned long UPDATE_STATUS = 1000;
|
||||
constexpr unsigned long PRINT_MEMBER_LIST = 5000;
|
||||
constexpr unsigned long HEARTBEAT = 2000;
|
||||
constexpr unsigned long UPDATE_ALL_MEMBERS_INFO = 10000;
|
||||
}
|
||||
|
||||
constexpr unsigned long NODE_ACTIVE_THRESHOLD = 10000;
|
||||
|
||||
@@ -12,11 +12,10 @@ public:
|
||||
static constexpr const char* DEFAULT_WIFI_PASSWORD = "th3r31sn0sp00n";
|
||||
static constexpr uint16_t DEFAULT_UDP_PORT = 4210;
|
||||
static constexpr uint16_t DEFAULT_API_SERVER_PORT = 80;
|
||||
static constexpr unsigned long DEFAULT_DISCOVERY_INTERVAL_MS = 1000;
|
||||
static constexpr unsigned long DEFAULT_CLUSTER_LISTEN_INTERVAL_MS = 10;
|
||||
static constexpr unsigned long DEFAULT_HEARTBEAT_INTERVAL_MS = 5000;
|
||||
static constexpr unsigned long DEFAULT_STATUS_UPDATE_INTERVAL_MS = 1000;
|
||||
static constexpr unsigned long DEFAULT_MEMBER_INFO_UPDATE_INTERVAL_MS = 10000;
|
||||
static constexpr unsigned long DEFAULT_NODE_UPDATE_BROADCAST_INTERVAL_MS = 5000;
|
||||
static constexpr unsigned long DEFAULT_PRINT_INTERVAL_MS = 5000;
|
||||
static constexpr unsigned long DEFAULT_NODE_ACTIVE_THRESHOLD_MS = 10000;
|
||||
static constexpr unsigned long DEFAULT_NODE_INACTIVE_THRESHOLD_MS = 60000;
|
||||
@@ -38,12 +37,9 @@ public:
|
||||
uint16_t api_server_port;
|
||||
|
||||
// Cluster Configuration
|
||||
unsigned long discovery_interval_ms;
|
||||
unsigned long heartbeat_interval_ms;
|
||||
unsigned long cluster_listen_interval_ms;
|
||||
unsigned long status_update_interval_ms;
|
||||
unsigned long member_info_update_interval_ms;
|
||||
unsigned long print_interval_ms;
|
||||
|
||||
// Node Status Thresholds
|
||||
unsigned long node_active_threshold_ms;
|
||||
|
||||
@@ -9,6 +9,7 @@ struct NodeInfo {
|
||||
String hostname;
|
||||
IPAddress ip;
|
||||
unsigned long lastSeen;
|
||||
unsigned long uptime = 0; // milliseconds since node started
|
||||
enum Status { ACTIVE, INACTIVE, DEAD } status;
|
||||
struct Resources {
|
||||
uint32_t freeHeap = 0;
|
||||
@@ -17,7 +18,7 @@ struct NodeInfo {
|
||||
uint32_t cpuFreqMHz = 0;
|
||||
uint32_t flashChipSize = 0;
|
||||
} resources;
|
||||
unsigned long latency = 0; // ms from heartbeat broadcast to NODE_INFO receipt
|
||||
unsigned long latency = 0; // ms from heartbeat broadcast to NODE_UPDATE receipt
|
||||
std::vector<EndpointInfo> endpoints; // List of registered endpoints
|
||||
std::map<String, String> labels; // Arbitrary node labels (key -> value)
|
||||
};
|
||||
|
||||
@@ -3,8 +3,8 @@
|
||||
#include "spore/util/Logging.h"
|
||||
|
||||
ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx), taskManager(taskMgr) {
|
||||
// Register callback for node_discovered event
|
||||
ctx.on("node_discovered", [this](void* data) {
|
||||
// Register callback for node/discovered event
|
||||
ctx.on("node/discovered", [this](void* data) {
|
||||
NodeInfo* node = static_cast<NodeInfo*>(data);
|
||||
this->addOrUpdateNode(node->hostname, node->ip);
|
||||
});
|
||||
@@ -25,27 +25,30 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx
|
||||
this->ctx.udp->write(msg.c_str());
|
||||
this->ctx.udp->endPacket();
|
||||
});
|
||||
|
||||
// Handler for node update broadcasts: services fire 'cluster/node/update' when their node info changes
|
||||
ctx.on("cluster/node/update", [this](void* data) {
|
||||
// Trigger immediate NODE_UPDATE broadcast when node info changes
|
||||
broadcastNodeUpdate();
|
||||
});
|
||||
|
||||
// Handler for memberlist changes: print memberlist when it changes
|
||||
ctx.on("cluster/memberlist/changed", [this](void* data) {
|
||||
printMemberList();
|
||||
});
|
||||
// Register tasks
|
||||
registerTasks();
|
||||
initMessageHandlers();
|
||||
}
|
||||
|
||||
void ClusterManager::registerTasks() {
|
||||
taskManager.registerTask("cluster_discovery", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); });
|
||||
taskManager.registerTask("cluster_listen", ctx.config.cluster_listen_interval_ms, [this]() { listen(); });
|
||||
taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); });
|
||||
taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); });
|
||||
taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); });
|
||||
taskManager.registerTask("cluster_update_members_info", ctx.config.member_info_update_interval_ms, [this]() { updateAllMembersInfoTaskCallback(); });
|
||||
LOG_INFO("ClusterManager", "Registered all cluster tasks");
|
||||
}
|
||||
|
||||
void ClusterManager::sendDiscovery() {
|
||||
//LOG_DEBUG(ctx, "Cluster", "Sending discovery packet...");
|
||||
ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
|
||||
ctx.udp->write(ClusterProtocol::DISCOVERY_MSG);
|
||||
ctx.udp->endPacket();
|
||||
}
|
||||
// Discovery functionality removed - using heartbeat-only approach
|
||||
|
||||
void ClusterManager::listen() {
|
||||
int packetSize = ctx.udp->parsePacket();
|
||||
@@ -69,10 +72,8 @@ void ClusterManager::listen() {
|
||||
void ClusterManager::initMessageHandlers() {
|
||||
messageHandlers.clear();
|
||||
messageHandlers.push_back({ &ClusterManager::isRawMsg, [this](const char* msg){ this->onRawMessage(msg); }, "RAW" });
|
||||
messageHandlers.push_back({ &ClusterManager::isDiscoveryMsg, [this](const char* msg){ this->onDiscovery(msg); }, "DISCOVERY" });
|
||||
messageHandlers.push_back({ &ClusterManager::isHeartbeatMsg, [this](const char* msg){ this->onHeartbeat(msg); }, "HEARTBEAT" });
|
||||
messageHandlers.push_back({ &ClusterManager::isResponseMsg, [this](const char* msg){ this->onResponse(msg); }, "RESPONSE" });
|
||||
messageHandlers.push_back({ &ClusterManager::isNodeInfoMsg, [this](const char* msg){ this->onNodeInfo(msg); }, "NODE_INFO" });
|
||||
messageHandlers.push_back({ &ClusterManager::isNodeUpdateMsg, [this](const char* msg){ this->onNodeUpdate(msg); }, "NODE_UPDATE" });
|
||||
messageHandlers.push_back({ &ClusterManager::isClusterEventMsg, [this](const char* msg){ this->onClusterEvent(msg); }, "CLUSTER_EVENT" });
|
||||
}
|
||||
|
||||
@@ -94,20 +95,12 @@ void ClusterManager::handleIncomingMessage(const char* incoming) {
|
||||
LOG_DEBUG("Cluster", String("Unknown cluster message: ") + head);
|
||||
}
|
||||
|
||||
bool ClusterManager::isDiscoveryMsg(const char* msg) {
|
||||
return strcmp(msg, ClusterProtocol::DISCOVERY_MSG) == 0;
|
||||
}
|
||||
|
||||
bool ClusterManager::isHeartbeatMsg(const char* msg) {
|
||||
return strncmp(msg, ClusterProtocol::HEARTBEAT_MSG, strlen(ClusterProtocol::HEARTBEAT_MSG)) == 0;
|
||||
}
|
||||
|
||||
bool ClusterManager::isResponseMsg(const char* msg) {
|
||||
return strncmp(msg, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0;
|
||||
}
|
||||
|
||||
bool ClusterManager::isNodeInfoMsg(const char* msg) {
|
||||
return strncmp(msg, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0;
|
||||
bool ClusterManager::isNodeUpdateMsg(const char* msg) {
|
||||
return strncmp(msg, ClusterProtocol::NODE_UPDATE_MSG, strlen(ClusterProtocol::NODE_UPDATE_MSG)) == 0;
|
||||
}
|
||||
|
||||
bool ClusterManager::isClusterEventMsg(const char* msg) {
|
||||
@@ -123,87 +116,171 @@ bool ClusterManager::isRawMsg(const char* msg) {
|
||||
return msg[prefixLen] == ':';
|
||||
}
|
||||
|
||||
void ClusterManager::onDiscovery(const char* /*msg*/) {
|
||||
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
|
||||
String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname;
|
||||
ctx.udp->write(response.c_str());
|
||||
ctx.udp->endPacket();
|
||||
// Discovery functionality removed - using heartbeat-only approach
|
||||
|
||||
void ClusterManager::onHeartbeat(const char* msg) {
|
||||
// Extract hostname from heartbeat message: "CLUSTER_HEARTBEAT:hostname"
|
||||
const char* colon = strchr(msg, ':');
|
||||
if (!colon) {
|
||||
LOG_WARN("Cluster", "Invalid heartbeat message format");
|
||||
return;
|
||||
}
|
||||
|
||||
String hostname = String(colon + 1);
|
||||
IPAddress senderIP = ctx.udp->remoteIP();
|
||||
|
||||
// Update memberlist with the heartbeat
|
||||
addOrUpdateNode(hostname, senderIP);
|
||||
|
||||
// Respond with minimal node info (hostname, ip, uptime, labels)
|
||||
sendNodeInfo(hostname, senderIP);
|
||||
}
|
||||
|
||||
void ClusterManager::onHeartbeat(const char* /*msg*/) {
|
||||
void ClusterManager::onNodeUpdate(const char* msg) {
|
||||
// Message format: "NODE_UPDATE:hostname:{json}"
|
||||
const char* firstColon = strchr(msg, ':');
|
||||
if (!firstColon) {
|
||||
LOG_WARN("Cluster", "Invalid NODE_UPDATE message format");
|
||||
return;
|
||||
}
|
||||
|
||||
const char* secondColon = strchr(firstColon + 1, ':');
|
||||
if (!secondColon) {
|
||||
LOG_WARN("Cluster", "Invalid NODE_UPDATE message format");
|
||||
return;
|
||||
}
|
||||
|
||||
String hostnamePart = String(firstColon + 1);
|
||||
String hostname = hostnamePart.substring(0, secondColon - firstColon - 1);
|
||||
const char* jsonCStr = secondColon + 1;
|
||||
|
||||
JsonDocument doc;
|
||||
DeserializationError err = deserializeJson(doc, jsonCStr);
|
||||
if (err) {
|
||||
LOG_WARN("Cluster", String("Failed to parse NODE_UPDATE JSON from ") + ctx.udp->remoteIP().toString());
|
||||
return;
|
||||
}
|
||||
|
||||
// The NODE_UPDATE contains info about the target node (hostname from message)
|
||||
// but is sent FROM the responding node (ctx.udp->remoteIP())
|
||||
// We need to find the responding node in the memberlist, not the target node
|
||||
IPAddress respondingNodeIP = ctx.udp->remoteIP();
|
||||
auto& memberList = *ctx.memberList;
|
||||
|
||||
// Find the responding node by IP address
|
||||
NodeInfo* respondingNode = nullptr;
|
||||
for (auto& pair : memberList) {
|
||||
if (pair.second.ip == respondingNodeIP) {
|
||||
respondingNode = &pair.second;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (respondingNode) {
|
||||
// Calculate latency only if we recently sent a heartbeat (within last 1 second)
|
||||
unsigned long latency = 0;
|
||||
unsigned long now = millis();
|
||||
if (lastHeartbeatSentAt != 0 && (now - lastHeartbeatSentAt) < 1000) { // 1 second window
|
||||
latency = now - lastHeartbeatSentAt;
|
||||
lastHeartbeatSentAt = 0; // Reset for next calculation
|
||||
}
|
||||
|
||||
// Update the responding node's information
|
||||
bool hostnameChanged = false;
|
||||
if (doc["hostname"].is<const char*>()) {
|
||||
String newHostname = doc["hostname"].as<const char*>();
|
||||
if (respondingNode->hostname != newHostname) {
|
||||
respondingNode->hostname = newHostname;
|
||||
hostnameChanged = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (doc["uptime"].is<unsigned long>()) {
|
||||
respondingNode->uptime = doc["uptime"];
|
||||
}
|
||||
|
||||
// Update labels if provided
|
||||
bool labelsChanged = false;
|
||||
if (doc["labels"].is<JsonObject>()) {
|
||||
// Check if labels actually changed
|
||||
JsonObject labelsObj = doc["labels"].as<JsonObject>();
|
||||
std::map<String, String> newLabels;
|
||||
for (JsonPair kvp : labelsObj) {
|
||||
const char* key = kvp.key().c_str();
|
||||
const char* value = labelsObj[kvp.key()];
|
||||
newLabels[key] = String(value);
|
||||
}
|
||||
|
||||
// Compare with existing labels
|
||||
if (newLabels != respondingNode->labels) {
|
||||
labelsChanged = true;
|
||||
respondingNode->labels = newLabels;
|
||||
}
|
||||
}
|
||||
|
||||
respondingNode->lastSeen = now;
|
||||
respondingNode->status = NodeInfo::ACTIVE;
|
||||
|
||||
// Update latency if we calculated it (preserve existing value if not)
|
||||
if (latency > 0) {
|
||||
respondingNode->latency = latency;
|
||||
}
|
||||
|
||||
// Check if any fields changed that require broadcasting
|
||||
bool nodeInfoChanged = hostnameChanged || labelsChanged;
|
||||
|
||||
if (nodeInfoChanged) {
|
||||
// Fire cluster/node/update event to trigger broadcast
|
||||
ctx.fire("cluster/node/update", nullptr);
|
||||
}
|
||||
|
||||
LOG_DEBUG("Cluster", String("Updated responding node ") + respondingNode->hostname + " @ " + respondingNodeIP.toString() +
|
||||
" | hostname: " + (hostnameChanged ? "changed" : "unchanged") +
|
||||
" | labels: " + (labelsChanged ? "changed" : "unchanged") +
|
||||
" | latency: " + (latency > 0 ? String(latency) + "ms" : "not calculated"));
|
||||
} else {
|
||||
LOG_WARN("Cluster", String("Received NODE_UPDATE from unknown node: ") + respondingNodeIP.toString());
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterManager::sendNodeInfo(const String& targetHostname, const IPAddress& targetIP) {
|
||||
JsonDocument doc;
|
||||
|
||||
if (ctx.memberList) {
|
||||
auto it = ctx.memberList->find(ctx.hostname);
|
||||
if (it != ctx.memberList->end()) {
|
||||
// Get our node info for the response (we're the responding node)
|
||||
auto& memberList = *ctx.memberList;
|
||||
auto it = memberList.find(ctx.hostname);
|
||||
if (it != memberList.end()) {
|
||||
const NodeInfo& node = it->second;
|
||||
|
||||
// Response contains info about ourselves (the responding node)
|
||||
doc["hostname"] = node.hostname;
|
||||
doc["ip"] = node.ip.toString();
|
||||
doc["uptime"] = node.uptime;
|
||||
|
||||
// Add labels if present
|
||||
if (!node.labels.empty()) {
|
||||
JsonObject labelsObj = doc["labels"].to<JsonObject>();
|
||||
for (const auto& kv : it->second.labels) {
|
||||
labelsObj[kv.first.c_str()] = kv.second;
|
||||
}
|
||||
} else if (!ctx.self.labels.empty()) {
|
||||
JsonObject labelsObj = doc["labels"].to<JsonObject>();
|
||||
for (const auto& kv : ctx.self.labels) {
|
||||
for (const auto& kv : node.labels) {
|
||||
labelsObj[kv.first.c_str()] = kv.second;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Fallback to basic info
|
||||
doc["hostname"] = ctx.hostname;
|
||||
doc["ip"] = ctx.localIP.toString();
|
||||
doc["uptime"] = millis();
|
||||
}
|
||||
|
||||
String json;
|
||||
serializeJson(doc, json);
|
||||
|
||||
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
|
||||
String msg = String(ClusterProtocol::NODE_INFO_MSG) + ":" + ctx.hostname + ":" + json;
|
||||
// Send NODE_UPDATE:targetHostname:{json about responding node}
|
||||
ctx.udp->beginPacket(targetIP, ctx.config.udp_port);
|
||||
String msg = String(ClusterProtocol::NODE_UPDATE_MSG) + ":" + targetHostname + ":" + json;
|
||||
ctx.udp->write(msg.c_str());
|
||||
ctx.udp->endPacket();
|
||||
}
|
||||
|
||||
void ClusterManager::onResponse(const char* msg) {
|
||||
char* hostPtr = const_cast<char*>(msg) + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
|
||||
String nodeHost = String(hostPtr);
|
||||
addOrUpdateNode(nodeHost, ctx.udp->remoteIP());
|
||||
}
|
||||
|
||||
void ClusterManager::onNodeInfo(const char* msg) {
|
||||
char* p = const_cast<char*>(msg) + strlen(ClusterProtocol::NODE_INFO_MSG) + 1;
|
||||
char* hostEnd = strchr(p, ':');
|
||||
if (hostEnd) {
|
||||
*hostEnd = '\0';
|
||||
const char* hostCStr = p;
|
||||
const char* jsonCStr = hostEnd + 1;
|
||||
|
||||
String nodeHost = String(hostCStr);
|
||||
IPAddress senderIP = ctx.udp->remoteIP();
|
||||
|
||||
addOrUpdateNode(nodeHost, senderIP);
|
||||
|
||||
JsonDocument doc;
|
||||
DeserializationError err = deserializeJson(doc, jsonCStr);
|
||||
if (!err) {
|
||||
auto& memberList = *ctx.memberList;
|
||||
auto it = memberList.find(nodeHost);
|
||||
if (it != memberList.end()) {
|
||||
NodeInfo& node = it->second;
|
||||
node.status = NodeInfo::ACTIVE;
|
||||
unsigned long now = millis();
|
||||
node.lastSeen = now;
|
||||
if (lastHeartbeatSentAt != 0) {
|
||||
node.latency = now - lastHeartbeatSentAt;
|
||||
}
|
||||
|
||||
node.labels.clear();
|
||||
if (doc["labels"].is<JsonObject>()) {
|
||||
JsonObject labelsObj = doc["labels"].as<JsonObject>();
|
||||
for (JsonPair kvp : labelsObj) {
|
||||
const char* key = kvp.key().c_str();
|
||||
const char* value = labelsObj[kvp.key()];
|
||||
node.labels[key] = value;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOG_WARN("Cluster", String("Failed to parse NODE_INFO JSON from ") + senderIP.toString());
|
||||
}
|
||||
}
|
||||
LOG_DEBUG("Cluster", String("Sent NODE_UPDATE response to ") + targetHostname + " @ " + targetIP.toString());
|
||||
}
|
||||
|
||||
void ClusterManager::onClusterEvent(const char* msg) {
|
||||
@@ -269,26 +346,36 @@ void ClusterManager::onRawMessage(const char* msg) {
|
||||
|
||||
void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
|
||||
auto& memberList = *ctx.memberList;
|
||||
|
||||
bool memberlistChanged = false;
|
||||
|
||||
// O(1) lookup instead of O(n) search
|
||||
auto it = memberList.find(nodeHost);
|
||||
if (it != memberList.end()) {
|
||||
// Update existing node
|
||||
it->second.ip = nodeIP;
|
||||
// Update existing node - preserve all existing field values
|
||||
if (it->second.ip != nodeIP) {
|
||||
it->second.ip = nodeIP;
|
||||
memberlistChanged = true;
|
||||
}
|
||||
it->second.lastSeen = millis();
|
||||
// Note: Other fields like latency, uptime, labels, etc. are preserved
|
||||
//fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
|
||||
} else {
|
||||
// Add new node
|
||||
NodeInfo newNode;
|
||||
newNode.hostname = nodeHost;
|
||||
newNode.ip = nodeIP;
|
||||
newNode.lastSeen = millis();
|
||||
updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
|
||||
memberList[nodeHost] = newNode;
|
||||
memberlistChanged = true;
|
||||
LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " | Status: " + statusToStr(newNode.status) + " | last update: 0");
|
||||
//fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
|
||||
return;
|
||||
}
|
||||
|
||||
// Add new node
|
||||
NodeInfo newNode;
|
||||
newNode.hostname = nodeHost;
|
||||
newNode.ip = nodeIP;
|
||||
newNode.lastSeen = millis();
|
||||
updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms);
|
||||
memberList[nodeHost] = newNode;
|
||||
LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " | Status: " + statusToStr(newNode.status) + " | last update: 0");
|
||||
//fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task
|
||||
|
||||
// Fire event if memberlist changed
|
||||
if (memberlistChanged) {
|
||||
ctx.fire("cluster/memberlist/changed", nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
// unused http client to fetch complete node info
|
||||
@@ -406,16 +493,19 @@ void ClusterManager::heartbeatTaskCallback() {
|
||||
NodeInfo& node = it->second;
|
||||
node.lastSeen = millis();
|
||||
node.status = NodeInfo::ACTIVE;
|
||||
node.uptime = millis(); // Update uptime
|
||||
updateLocalNodeResources();
|
||||
addOrUpdateNode(ctx.hostname, ctx.localIP);
|
||||
}
|
||||
|
||||
// Broadcast heartbeat so peers can respond with their node info
|
||||
// Broadcast heartbeat - peers will respond with NODE_UPDATE
|
||||
lastHeartbeatSentAt = millis();
|
||||
ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
|
||||
String hb = String(ClusterProtocol::HEARTBEAT_MSG) + ":" + ctx.hostname;
|
||||
ctx.udp->write(hb.c_str());
|
||||
ctx.udp->endPacket();
|
||||
|
||||
LOG_DEBUG("Cluster", String("Sent heartbeat: ") + ctx.hostname);
|
||||
}
|
||||
|
||||
void ClusterManager::updateAllMembersInfoTaskCallback() {
|
||||
@@ -423,6 +513,40 @@ void ClusterManager::updateAllMembersInfoTaskCallback() {
|
||||
// No-op to reduce network and memory usage
|
||||
}
|
||||
|
||||
void ClusterManager::broadcastNodeUpdate() {
|
||||
// Broadcast our current node info as NODE_UPDATE to all cluster members
|
||||
auto& memberList = *ctx.memberList;
|
||||
auto it = memberList.find(ctx.hostname);
|
||||
if (it == memberList.end()) {
|
||||
return;
|
||||
}
|
||||
|
||||
const NodeInfo& node = it->second;
|
||||
|
||||
JsonDocument doc;
|
||||
doc["hostname"] = node.hostname;
|
||||
doc["uptime"] = node.uptime;
|
||||
|
||||
// Add labels if present
|
||||
if (!node.labels.empty()) {
|
||||
JsonObject labelsObj = doc["labels"].to<JsonObject>();
|
||||
for (const auto& kv : node.labels) {
|
||||
labelsObj[kv.first.c_str()] = kv.second;
|
||||
}
|
||||
}
|
||||
|
||||
String json;
|
||||
serializeJson(doc, json);
|
||||
|
||||
// Broadcast to all cluster members
|
||||
ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
|
||||
String msg = String(ClusterProtocol::NODE_UPDATE_MSG) + ":" + ctx.hostname + ":" + json;
|
||||
ctx.udp->write(msg.c_str());
|
||||
ctx.udp->endPacket();
|
||||
|
||||
LOG_DEBUG("Cluster", String("Broadcasted NODE_UPDATE for ") + ctx.hostname);
|
||||
}
|
||||
|
||||
void ClusterManager::updateAllNodeStatuses() {
|
||||
auto& memberList = *ctx.memberList;
|
||||
unsigned long now = millis();
|
||||
@@ -435,17 +559,24 @@ void ClusterManager::updateAllNodeStatuses() {
|
||||
void ClusterManager::removeDeadNodes() {
|
||||
auto& memberList = *ctx.memberList;
|
||||
unsigned long now = millis();
|
||||
|
||||
bool memberlistChanged = false;
|
||||
|
||||
// Use iterator to safely remove elements from map
|
||||
for (auto it = memberList.begin(); it != memberList.end(); ) {
|
||||
unsigned long diff = now - it->second.lastSeen;
|
||||
if (it->second.status == NodeInfo::DEAD && diff > ctx.config.node_dead_threshold_ms) {
|
||||
LOG_INFO("Cluster", "Removing node: " + it->second.hostname);
|
||||
it = memberList.erase(it);
|
||||
memberlistChanged = true;
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
||||
// Fire event if memberlist changed
|
||||
if (memberlistChanged) {
|
||||
ctx.fire("cluster/memberlist/changed", nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterManager::printMemberList() {
|
||||
|
||||
@@ -128,5 +128,5 @@ void NetworkManager::setupWiFi() {
|
||||
}
|
||||
|
||||
// Notify listeners that the node is (re)discovered
|
||||
ctx.fire("node_discovered", &ctx.self);
|
||||
ctx.fire("node/discovered", &ctx.self);
|
||||
}
|
||||
|
||||
@@ -255,12 +255,9 @@ void NodeService::handleGetConfigRequest(AsyncWebServerRequest* request) {
|
||||
|
||||
// Cluster Configuration
|
||||
JsonObject clusterObj = doc["cluster"].to<JsonObject>();
|
||||
clusterObj["discovery_interval_ms"] = ctx.config.discovery_interval_ms;
|
||||
clusterObj["heartbeat_interval_ms"] = ctx.config.heartbeat_interval_ms;
|
||||
clusterObj["cluster_listen_interval_ms"] = ctx.config.cluster_listen_interval_ms;
|
||||
clusterObj["status_update_interval_ms"] = ctx.config.status_update_interval_ms;
|
||||
clusterObj["member_info_update_interval_ms"] = ctx.config.member_info_update_interval_ms;
|
||||
clusterObj["print_interval_ms"] = ctx.config.print_interval_ms;
|
||||
|
||||
// Node Status Thresholds
|
||||
JsonObject thresholdsObj = doc["thresholds"].to<JsonObject>();
|
||||
|
||||
@@ -32,12 +32,9 @@ void Config::setDefaults() {
|
||||
api_server_port = DEFAULT_API_SERVER_PORT;
|
||||
|
||||
// Cluster Configuration
|
||||
discovery_interval_ms = DEFAULT_DISCOVERY_INTERVAL_MS; // TODO retire this in favor of heartbeat_interval_ms
|
||||
cluster_listen_interval_ms = DEFAULT_CLUSTER_LISTEN_INTERVAL_MS;
|
||||
heartbeat_interval_ms = DEFAULT_HEARTBEAT_INTERVAL_MS;
|
||||
status_update_interval_ms = DEFAULT_STATUS_UPDATE_INTERVAL_MS;
|
||||
member_info_update_interval_ms = DEFAULT_MEMBER_INFO_UPDATE_INTERVAL_MS; // TODO retire this in favor of heartbeat_interval_ms
|
||||
print_interval_ms = DEFAULT_PRINT_INTERVAL_MS;
|
||||
|
||||
// Node Status Thresholds
|
||||
node_active_threshold_ms = DEFAULT_NODE_ACTIVE_THRESHOLD_MS;
|
||||
@@ -86,12 +83,9 @@ bool Config::saveToFile(const String& filename) {
|
||||
doc["network"]["api_server_port"] = api_server_port;
|
||||
|
||||
// Cluster Configuration
|
||||
doc["cluster"]["discovery_interval_ms"] = discovery_interval_ms;
|
||||
doc["cluster"]["heartbeat_interval_ms"] = heartbeat_interval_ms;
|
||||
doc["cluster"]["cluster_listen_interval_ms"] = cluster_listen_interval_ms;
|
||||
doc["cluster"]["status_update_interval_ms"] = status_update_interval_ms;
|
||||
doc["cluster"]["member_info_update_interval_ms"] = member_info_update_interval_ms;
|
||||
doc["cluster"]["print_interval_ms"] = print_interval_ms;
|
||||
|
||||
// Node Status Thresholds
|
||||
doc["thresholds"]["node_active_threshold_ms"] = node_active_threshold_ms;
|
||||
@@ -166,12 +160,9 @@ bool Config::loadFromFile(const String& filename) {
|
||||
api_server_port = doc["network"]["api_server_port"] | DEFAULT_API_SERVER_PORT;
|
||||
|
||||
// Load Cluster Configuration with defaults
|
||||
discovery_interval_ms = doc["cluster"]["discovery_interval_ms"] | DEFAULT_DISCOVERY_INTERVAL_MS;
|
||||
heartbeat_interval_ms = doc["cluster"]["heartbeat_interval_ms"] | DEFAULT_HEARTBEAT_INTERVAL_MS;
|
||||
cluster_listen_interval_ms = doc["cluster"]["cluster_listen_interval_ms"] | DEFAULT_CLUSTER_LISTEN_INTERVAL_MS;
|
||||
status_update_interval_ms = doc["cluster"]["status_update_interval_ms"] | DEFAULT_STATUS_UPDATE_INTERVAL_MS;
|
||||
member_info_update_interval_ms = doc["cluster"]["member_info_update_interval_ms"] | DEFAULT_MEMBER_INFO_UPDATE_INTERVAL_MS;
|
||||
print_interval_ms = doc["cluster"]["print_interval_ms"] | DEFAULT_PRINT_INTERVAL_MS;
|
||||
|
||||
// Load Node Status Thresholds with defaults
|
||||
node_active_threshold_ms = doc["thresholds"]["node_active_threshold_ms"] | DEFAULT_NODE_ACTIVE_THRESHOLD_MS;
|
||||
|
||||
Reference in New Issue
Block a user