diff --git a/include/spore/core/ClusterManager.h b/include/spore/core/ClusterManager.h index b0decd5..c0ca077 100644 --- a/include/spore/core/ClusterManager.h +++ b/include/spore/core/ClusterManager.h @@ -7,6 +7,8 @@ #include #include #include +#include +#include class ClusterManager { public: @@ -26,4 +28,22 @@ public: private: NodeContext& ctx; TaskManager& taskManager; + enum class ListenState { WAITING_FOR_PACKET, MESSAGE_RECEIVED, DISPATCHING, DONE }; + struct MessageHandler { + bool (*predicate)(const char*); + std::function handle; + const char* name; + }; + void initMessageHandlers(); + void handleIncomingMessage(const char* incoming); + static bool isDiscoveryMsg(const char* msg); + static bool isHeartbeatMsg(const char* msg); + static bool isResponseMsg(const char* msg); + static bool isNodeInfoMsg(const char* msg); + void onDiscovery(const char* msg); + void onHeartbeat(const char* msg); + void onResponse(const char* msg); + void onNodeInfo(const char* msg); + ListenState listenState = ListenState::WAITING_FOR_PACKET; + std::vector messageHandlers; }; diff --git a/src/spore/core/ClusterManager.cpp b/src/spore/core/ClusterManager.cpp index 1640088..f6196d7 100644 --- a/src/spore/core/ClusterManager.cpp +++ b/src/spore/core/ClusterManager.cpp @@ -10,6 +10,7 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx }); // Register tasks registerTasks(); + initMessageHandlers(); } void ClusterManager::registerTasks() { @@ -29,111 +30,160 @@ void ClusterManager::sendDiscovery() { ctx.udp->endPacket(); } -// TODO the various if statements here are a mess, we need to clean them up -// TODO we should use a state machine to handle the different types of messages -// TODO we should use a class to handle the different types of messages using predicate functions void ClusterManager::listenForDiscovery() { - int packetSize = ctx.udp->parsePacket(); - if (packetSize) { - char incoming[ClusterProtocol::UDP_BUF_SIZE]; - int len = ctx.udp->read(incoming, ClusterProtocol::UDP_BUF_SIZE); - if (len > 0) { - incoming[len] = 0; + switch (listenState) { + case ListenState::WAITING_FOR_PACKET: { + int packetSize = ctx.udp->parsePacket(); + if (packetSize) { + listenState = ListenState::MESSAGE_RECEIVED; + } + break; } - //LOG_DEBUG(ctx, "UDP", "Packet received: " + String(incoming)); - if (strcmp(incoming, ClusterProtocol::DISCOVERY_MSG) == 0) { - //LOG_DEBUG(ctx, "UDP", "Discovery request from: " + ctx.udp->remoteIP().toString()); - ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port); - String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname; - ctx.udp->write(response.c_str()); - ctx.udp->endPacket(); - //LOG_DEBUG(ctx, "UDP", "Sent response with hostname: " + ctx.hostname); - } else if (strncmp(incoming, ClusterProtocol::HEARTBEAT_MSG, strlen(ClusterProtocol::HEARTBEAT_MSG)) == 0) { - // Reply directly to heartbeat sender with our node info JSON - // Format: CLUSTER_NODE_INFO:: - JsonDocument doc; - doc["freeHeap"] = ESP.getFreeHeap(); - doc["chipId"] = ESP.getChipId(); - doc["sdkVersion"] = ESP.getSdkVersion(); - doc["cpuFreqMHz"] = ESP.getCpuFreqMHz(); - doc["flashChipSize"] = ESP.getFlashChipSize(); + case ListenState::MESSAGE_RECEIVED: { + char incoming[ClusterProtocol::UDP_BUF_SIZE]; + int len = ctx.udp->read(incoming, ClusterProtocol::UDP_BUF_SIZE); + if (len > 0) { + incoming[len] = 0; + listenState = ListenState::DISPATCHING; + handleIncomingMessage(incoming); + } else { + listenState = ListenState::DONE; + } + break; + } + case ListenState::DISPATCHING: { + // handled synchronously + listenState = ListenState::DONE; + break; + } + case ListenState::DONE: { + listenState = ListenState::WAITING_FOR_PACKET; + break; + } + } +} - // Include labels if available - if (ctx.memberList) { - auto it = ctx.memberList->find(ctx.hostname); - if (it != ctx.memberList->end()) { - JsonObject labelsObj = doc["labels"].to(); - for (const auto& kv : it->second.labels) { - labelsObj[kv.first.c_str()] = kv.second; - } - } else if (!ctx.self.labels.empty()) { - JsonObject labelsObj = doc["labels"].to(); - for (const auto& kv : ctx.self.labels) { - labelsObj[kv.first.c_str()] = kv.second; +void ClusterManager::initMessageHandlers() { + messageHandlers.clear(); + messageHandlers.push_back({ &ClusterManager::isDiscoveryMsg, [this](const char* msg){ this->onDiscovery(msg); }, "DISCOVERY" }); + messageHandlers.push_back({ &ClusterManager::isHeartbeatMsg, [this](const char* msg){ this->onHeartbeat(msg); }, "HEARTBEAT" }); + messageHandlers.push_back({ &ClusterManager::isResponseMsg, [this](const char* msg){ this->onResponse(msg); }, "RESPONSE" }); + messageHandlers.push_back({ &ClusterManager::isNodeInfoMsg, [this](const char* msg){ this->onNodeInfo(msg); }, "NODE_INFO" }); +} + +void ClusterManager::handleIncomingMessage(const char* incoming) { + for (const auto& h : messageHandlers) { + if (h.predicate(incoming)) { + h.handle(incoming); + return; + } + } +} + +bool ClusterManager::isDiscoveryMsg(const char* msg) { + return strcmp(msg, ClusterProtocol::DISCOVERY_MSG) == 0; +} + +bool ClusterManager::isHeartbeatMsg(const char* msg) { + return strncmp(msg, ClusterProtocol::HEARTBEAT_MSG, strlen(ClusterProtocol::HEARTBEAT_MSG)) == 0; +} + +bool ClusterManager::isResponseMsg(const char* msg) { + return strncmp(msg, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0; +} + +bool ClusterManager::isNodeInfoMsg(const char* msg) { + return strncmp(msg, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0; +} + +void ClusterManager::onDiscovery(const char* /*msg*/) { + ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port); + String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname; + ctx.udp->write(response.c_str()); + ctx.udp->endPacket(); +} + +void ClusterManager::onHeartbeat(const char* /*msg*/) { + JsonDocument doc; + doc["freeHeap"] = ESP.getFreeHeap(); + doc["chipId"] = ESP.getChipId(); + doc["sdkVersion"] = ESP.getSdkVersion(); + doc["cpuFreqMHz"] = ESP.getCpuFreqMHz(); + doc["flashChipSize"] = ESP.getFlashChipSize(); + + if (ctx.memberList) { + auto it = ctx.memberList->find(ctx.hostname); + if (it != ctx.memberList->end()) { + JsonObject labelsObj = doc["labels"].to(); + for (const auto& kv : it->second.labels) { + labelsObj[kv.first.c_str()] = kv.second; + } + } else if (!ctx.self.labels.empty()) { + JsonObject labelsObj = doc["labels"].to(); + for (const auto& kv : ctx.self.labels) { + labelsObj[kv.first.c_str()] = kv.second; + } + } + } + + String json; + serializeJson(doc, json); + + ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port); + String msg = String(ClusterProtocol::NODE_INFO_MSG) + ":" + ctx.hostname + ":" + json; + ctx.udp->write(msg.c_str()); + ctx.udp->endPacket(); +} + +void ClusterManager::onResponse(const char* msg) { + char* hostPtr = const_cast(msg) + strlen(ClusterProtocol::RESPONSE_MSG) + 1; + String nodeHost = String(hostPtr); + addOrUpdateNode(nodeHost, ctx.udp->remoteIP()); +} + +void ClusterManager::onNodeInfo(const char* msg) { + char* p = const_cast(msg) + strlen(ClusterProtocol::NODE_INFO_MSG) + 1; + char* hostEnd = strchr(p, ':'); + if (hostEnd) { + *hostEnd = '\0'; + const char* hostCStr = p; + const char* jsonCStr = hostEnd + 1; + + String nodeHost = String(hostCStr); + IPAddress senderIP = ctx.udp->remoteIP(); + + addOrUpdateNode(nodeHost, senderIP); + + JsonDocument doc; + DeserializationError err = deserializeJson(doc, jsonCStr); + if (!err) { + auto& memberList = *ctx.memberList; + auto it = memberList.find(nodeHost); + if (it != memberList.end()) { + NodeInfo& node = it->second; + node.resources.freeHeap = doc["freeHeap"] | node.resources.freeHeap; + node.resources.chipId = doc["chipId"] | node.resources.chipId; + { + const char* sdk = doc["sdkVersion"] | node.resources.sdkVersion.c_str(); + node.resources.sdkVersion = sdk ? String(sdk) : node.resources.sdkVersion; + } + node.resources.cpuFreqMHz = doc["cpuFreqMHz"] | node.resources.cpuFreqMHz; + node.resources.flashChipSize = doc["flashChipSize"] | node.resources.flashChipSize; + node.status = NodeInfo::ACTIVE; + node.lastSeen = millis(); + + node.labels.clear(); + if (doc["labels"].is()) { + JsonObject labelsObj = doc["labels"].as(); + for (JsonPair kvp : labelsObj) { + const char* key = kvp.key().c_str(); + const char* value = labelsObj[kvp.key()]; + node.labels[key] = value; } } } - - String json; - serializeJson(doc, json); - - ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port); - String msg = String(ClusterProtocol::NODE_INFO_MSG) + ":" + ctx.hostname + ":" + json; - ctx.udp->write(msg.c_str()); - ctx.udp->endPacket(); - } else if (strncmp(incoming, ClusterProtocol::RESPONSE_MSG, strlen(ClusterProtocol::RESPONSE_MSG)) == 0) { - char* hostPtr = incoming + strlen(ClusterProtocol::RESPONSE_MSG) + 1; - String nodeHost = String(hostPtr); - addOrUpdateNode(nodeHost, ctx.udp->remoteIP()); - } else if (strncmp(incoming, ClusterProtocol::NODE_INFO_MSG, strlen(ClusterProtocol::NODE_INFO_MSG)) == 0) { - // Parse: CLUSTER_NODE_INFO:: - char* p = incoming + strlen(ClusterProtocol::NODE_INFO_MSG) + 1; - char* hostEnd = strchr(p, ':'); - if (hostEnd) { - *hostEnd = '\0'; - const char* hostCStr = p; - const char* jsonCStr = hostEnd + 1; - - String nodeHost = String(hostCStr); - IPAddress senderIP = ctx.udp->remoteIP(); - - // Ensure node exists/updated basic info - addOrUpdateNode(nodeHost, senderIP); - - // Parse JSON - JsonDocument doc; - DeserializationError err = deserializeJson(doc, jsonCStr); - if (!err) { - auto& memberList = *ctx.memberList; - auto it = memberList.find(nodeHost); - if (it != memberList.end()) { - NodeInfo& node = it->second; - node.resources.freeHeap = doc["freeHeap"] | node.resources.freeHeap; - node.resources.chipId = doc["chipId"] | node.resources.chipId; - { - const char* sdk = doc["sdkVersion"] | node.resources.sdkVersion.c_str(); - node.resources.sdkVersion = sdk ? String(sdk) : node.resources.sdkVersion; - } - node.resources.cpuFreqMHz = doc["cpuFreqMHz"] | node.resources.cpuFreqMHz; - node.resources.flashChipSize = doc["flashChipSize"] | node.resources.flashChipSize; - node.status = NodeInfo::ACTIVE; - node.lastSeen = millis(); - - // Labels - node.labels.clear(); - if (doc["labels"].is()) { - JsonObject labelsObj = doc["labels"].as(); - for (JsonPair kvp : labelsObj) { - const char* key = kvp.key().c_str(); - const char* value = labelsObj[kvp.key()]; - node.labels[key] = value; - } - } - } - } else { - LOG_DEBUG("Cluster", String("Failed to parse NODE_INFO JSON from ") + senderIP.toString()); - } - } + } else { + LOG_DEBUG("Cluster", String("Failed to parse NODE_INFO JSON from ") + senderIP.toString()); } } }