feat: introduce udp state machine
This commit is contained in:
@@ -7,6 +7,8 @@
|
|||||||
#include <ArduinoJson.h>
|
#include <ArduinoJson.h>
|
||||||
#include <ESP8266HTTPClient.h>
|
#include <ESP8266HTTPClient.h>
|
||||||
#include <map>
|
#include <map>
|
||||||
|
#include <vector>
|
||||||
|
#include <functional>
|
||||||
|
|
||||||
class ClusterManager {
|
class ClusterManager {
|
||||||
public:
|
public:
|
||||||
@@ -26,4 +28,22 @@ public:
|
|||||||
private:
|
private:
|
||||||
NodeContext& ctx;
|
NodeContext& ctx;
|
||||||
TaskManager& taskManager;
|
TaskManager& taskManager;
|
||||||
|
enum class ListenState { WAITING_FOR_PACKET, MESSAGE_RECEIVED, DISPATCHING, DONE };
|
||||||
|
struct MessageHandler {
|
||||||
|
bool (*predicate)(const char*);
|
||||||
|
std::function<void(const char*)> handle;
|
||||||
|
const char* name;
|
||||||
|
};
|
||||||
|
void initMessageHandlers();
|
||||||
|
void handleIncomingMessage(const char* incoming);
|
||||||
|
static bool isDiscoveryMsg(const char* msg);
|
||||||
|
static bool isHeartbeatMsg(const char* msg);
|
||||||
|
static bool isResponseMsg(const char* msg);
|
||||||
|
static bool isNodeInfoMsg(const char* msg);
|
||||||
|
void onDiscovery(const char* msg);
|
||||||
|
void onHeartbeat(const char* msg);
|
||||||
|
void onResponse(const char* msg);
|
||||||
|
void onNodeInfo(const char* msg);
|
||||||
|
ListenState listenState = ListenState::WAITING_FOR_PACKET;
|
||||||
|
std::vector<MessageHandler> messageHandlers;
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx
|
|||||||
});
|
});
|
||||||
// Register tasks
|
// Register tasks
|
||||||
registerTasks();
|
registerTasks();
|
||||||
|
initMessageHandlers();
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterManager::registerTasks() {
|
void ClusterManager::registerTasks() {
|
||||||
@@ -29,28 +30,80 @@ void ClusterManager::sendDiscovery() {
|
|||||||
ctx.udp->endPacket();
|
ctx.udp->endPacket();
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO the various if statements here are a mess, we need to clean them up
|
|
||||||
// TODO we should use a state machine to handle the different types of messages
|
|
||||||
// TODO we should use a class to handle the different types of messages using predicate functions
|
|
||||||
void ClusterManager::listenForDiscovery() {
|
void ClusterManager::listenForDiscovery() {
|
||||||
|
switch (listenState) {
|
||||||
|
case ListenState::WAITING_FOR_PACKET: {
|
||||||
int packetSize = ctx.udp->parsePacket();
|
int packetSize = ctx.udp->parsePacket();
|
||||||
if (packetSize) {
|
if (packetSize) {
|
||||||
|
listenState = ListenState::MESSAGE_RECEIVED;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case ListenState::MESSAGE_RECEIVED: {
|
||||||
char incoming[ClusterProtocol::UDP_BUF_SIZE];
|
char incoming[ClusterProtocol::UDP_BUF_SIZE];
|
||||||
int len = ctx.udp->read(incoming, ClusterProtocol::UDP_BUF_SIZE);
|
int len = ctx.udp->read(incoming, ClusterProtocol::UDP_BUF_SIZE);
|
||||||
if (len > 0) {
|
if (len > 0) {
|
||||||
incoming[len] = 0;
|
incoming[len] = 0;
|
||||||
|
listenState = ListenState::DISPATCHING;
|
||||||
|
handleIncomingMessage(incoming);
|
||||||
|
} else {
|
||||||
|
listenState = ListenState::DONE;
|
||||||
}
|
}
|
||||||
//LOG_DEBUG(ctx, "UDP", "Packet received: " + String(incoming));
|
break;
|
||||||
if (strcmp(incoming, ClusterProtocol::DISCOVERY_MSG) == 0) {
|
}
|
||||||
//LOG_DEBUG(ctx, "UDP", "Discovery request from: " + ctx.udp->remoteIP().toString());
|
case ListenState::DISPATCHING: {
|
||||||
|
// handled synchronously
|
||||||
|
listenState = ListenState::DONE;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case ListenState::DONE: {
|
||||||
|
listenState = ListenState::WAITING_FOR_PACKET;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void ClusterManager::initMessageHandlers() {
|
||||||
|
messageHandlers.clear();
|
||||||
|
messageHandlers.push_back({ &ClusterManager::isDiscoveryMsg, [this](const char* msg){ this->onDiscovery(msg); }, "DISCOVERY" });
|
||||||
|
messageHandlers.push_back({ &ClusterManager::isHeartbeatMsg, [this](const char* msg){ this->onHeartbeat(msg); }, "HEARTBEAT" });
|
||||||
|
messageHandlers.push_back({ &ClusterManager::isResponseMsg, [this](const char* msg){ this->onResponse(msg); }, "RESPONSE" });
|
||||||
|
messageHandlers.push_back({ &ClusterManager::isNodeInfoMsg, [this](const char* msg){ this->onNodeInfo(msg); }, "NODE_INFO" });
|
||||||
|
}
|
||||||
|
|
||||||
|
void ClusterManager::handleIncomingMessage(const char* incoming) {
|
||||||
|
for (const auto& h : messageHandlers) {
|
||||||
|
if (h.predicate(incoming)) {
|
||||||
|
h.handle(incoming);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ClusterManager::isDiscoveryMsg(const char* msg) {
|
||||||
|
return strcmp(msg, ClusterProtocol::DISCOVERY_MSG) == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ClusterManager::isHeartbeatMsg(const char* msg) {
|
||||||
|
return strncmp(msg, ClusterProtocol::HEARTBEAT_MSG, strlen(ClusterProtocol::HEARTBEAT_MSG)) == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ClusterManager::isResponseMsg(const char* msg) {
|
||||||
|
return strncmp(msg, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ClusterManager::isNodeInfoMsg(const char* msg) {
|
||||||
|
return strncmp(msg, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ClusterManager::onDiscovery(const char* /*msg*/) {
|
||||||
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
|
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
|
||||||
String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname;
|
String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname;
|
||||||
ctx.udp->write(response.c_str());
|
ctx.udp->write(response.c_str());
|
||||||
ctx.udp->endPacket();
|
ctx.udp->endPacket();
|
||||||
//LOG_DEBUG(ctx, "UDP", "Sent response with hostname: " + ctx.hostname);
|
}
|
||||||
} else if (strncmp(incoming, ClusterProtocol::HEARTBEAT_MSG, strlen(ClusterProtocol::HEARTBEAT_MSG)) == 0) {
|
|
||||||
// Reply directly to heartbeat sender with our node info JSON
|
void ClusterManager::onHeartbeat(const char* /*msg*/) {
|
||||||
// Format: CLUSTER_NODE_INFO:<hostname>:<json>
|
|
||||||
JsonDocument doc;
|
JsonDocument doc;
|
||||||
doc["freeHeap"] = ESP.getFreeHeap();
|
doc["freeHeap"] = ESP.getFreeHeap();
|
||||||
doc["chipId"] = ESP.getChipId();
|
doc["chipId"] = ESP.getChipId();
|
||||||
@@ -58,7 +111,6 @@ void ClusterManager::listenForDiscovery() {
|
|||||||
doc["cpuFreqMHz"] = ESP.getCpuFreqMHz();
|
doc["cpuFreqMHz"] = ESP.getCpuFreqMHz();
|
||||||
doc["flashChipSize"] = ESP.getFlashChipSize();
|
doc["flashChipSize"] = ESP.getFlashChipSize();
|
||||||
|
|
||||||
// Include labels if available
|
|
||||||
if (ctx.memberList) {
|
if (ctx.memberList) {
|
||||||
auto it = ctx.memberList->find(ctx.hostname);
|
auto it = ctx.memberList->find(ctx.hostname);
|
||||||
if (it != ctx.memberList->end()) {
|
if (it != ctx.memberList->end()) {
|
||||||
@@ -81,13 +133,16 @@ void ClusterManager::listenForDiscovery() {
|
|||||||
String msg = String(ClusterProtocol::NODE_INFO_MSG) + ":" + ctx.hostname + ":" + json;
|
String msg = String(ClusterProtocol::NODE_INFO_MSG) + ":" + ctx.hostname + ":" + json;
|
||||||
ctx.udp->write(msg.c_str());
|
ctx.udp->write(msg.c_str());
|
||||||
ctx.udp->endPacket();
|
ctx.udp->endPacket();
|
||||||
} else if (strncmp(incoming, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0) {
|
}
|
||||||
char* hostPtr = incoming + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
|
|
||||||
|
void ClusterManager::onResponse(const char* msg) {
|
||||||
|
char* hostPtr = const_cast<char*>(msg) + strlen(ClusterProtocol::RESPONSE_MSG) + 1;
|
||||||
String nodeHost = String(hostPtr);
|
String nodeHost = String(hostPtr);
|
||||||
addOrUpdateNode(nodeHost, ctx.udp->remoteIP());
|
addOrUpdateNode(nodeHost, ctx.udp->remoteIP());
|
||||||
} else if (strncmp(incoming, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0) {
|
}
|
||||||
// Parse: CLUSTER_NODE_INFO:<hostname>:<json>
|
|
||||||
char* p = incoming + strlen(ClusterProtocol::NODE_INFO_MSG) + 1;
|
void ClusterManager::onNodeInfo(const char* msg) {
|
||||||
|
char* p = const_cast<char*>(msg) + strlen(ClusterProtocol::NODE_INFO_MSG) + 1;
|
||||||
char* hostEnd = strchr(p, ':');
|
char* hostEnd = strchr(p, ':');
|
||||||
if (hostEnd) {
|
if (hostEnd) {
|
||||||
*hostEnd = '\0';
|
*hostEnd = '\0';
|
||||||
@@ -97,10 +152,8 @@ void ClusterManager::listenForDiscovery() {
|
|||||||
String nodeHost = String(hostCStr);
|
String nodeHost = String(hostCStr);
|
||||||
IPAddress senderIP = ctx.udp->remoteIP();
|
IPAddress senderIP = ctx.udp->remoteIP();
|
||||||
|
|
||||||
// Ensure node exists/updated basic info
|
|
||||||
addOrUpdateNode(nodeHost, senderIP);
|
addOrUpdateNode(nodeHost, senderIP);
|
||||||
|
|
||||||
// Parse JSON
|
|
||||||
JsonDocument doc;
|
JsonDocument doc;
|
||||||
DeserializationError err = deserializeJson(doc, jsonCStr);
|
DeserializationError err = deserializeJson(doc, jsonCStr);
|
||||||
if (!err) {
|
if (!err) {
|
||||||
@@ -119,7 +172,6 @@ void ClusterManager::listenForDiscovery() {
|
|||||||
node.status = NodeInfo::ACTIVE;
|
node.status = NodeInfo::ACTIVE;
|
||||||
node.lastSeen = millis();
|
node.lastSeen = millis();
|
||||||
|
|
||||||
// Labels
|
|
||||||
node.labels.clear();
|
node.labels.clear();
|
||||||
if (doc["labels"].is<JsonObject>()) {
|
if (doc["labels"].is<JsonObject>()) {
|
||||||
JsonObject labelsObj = doc["labels"].as<JsonObject>();
|
JsonObject labelsObj = doc["labels"].as<JsonObject>();
|
||||||
@@ -134,8 +186,6 @@ void ClusterManager::listenForDiscovery() {
|
|||||||
LOG_DEBUG("Cluster", String("Failed to parse NODE_INFO JSON from ") + senderIP.toString());
|
LOG_DEBUG("Cluster", String("Failed to parse NODE_INFO JSON from ") + senderIP.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
|
void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
|
||||||
|
|||||||
Reference in New Issue
Block a user