diff --git a/include/spore/core/ClusterManager.h b/include/spore/core/ClusterManager.h index 451df78..cd994f0 100644 --- a/include/spore/core/ClusterManager.h +++ b/include/spore/core/ClusterManager.h @@ -43,5 +43,6 @@ private: void onHeartbeat(const char* msg); void onResponse(const char* msg); void onNodeInfo(const char* msg); + unsigned long lastHeartbeatSentAt = 0; std::vector messageHandlers; }; diff --git a/include/spore/types/NodeInfo.h b/include/spore/types/NodeInfo.h index 66ae6cb..e6c73cb 100644 --- a/include/spore/types/NodeInfo.h +++ b/include/spore/types/NodeInfo.h @@ -17,7 +17,7 @@ struct NodeInfo { uint32_t cpuFreqMHz = 0; uint32_t flashChipSize = 0; } resources; - unsigned long latency = 0; // ms since lastSeen + unsigned long latency = 0; // ms from heartbeat broadcast to NODE_INFO receipt std::vector endpoints; // List of registered endpoints std::map labels; // Arbitrary node labels (key -> value) }; diff --git a/src/spore/core/ClusterManager.cpp b/src/spore/core/ClusterManager.cpp index c449ae4..f413086 100644 --- a/src/spore/core/ClusterManager.cpp +++ b/src/spore/core/ClusterManager.cpp @@ -14,12 +14,12 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx } void ClusterManager::registerTasks() { - taskManager.registerTask("discovery_send", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); }); + taskManager.registerTask("cluster_discovery", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); }); taskManager.registerTask("cluster_listen", ctx.config.discovery_interval_ms / 10, [this]() { listen(); }); taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); }); taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); }); taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); }); - taskManager.registerTask("update_members_info", ctx.config.member_info_update_interval_ms, [this]() { updateAllMembersInfoTaskCallback(); }); + taskManager.registerTask("cluster_update_members_info", ctx.config.member_info_update_interval_ms, [this]() { updateAllMembersInfoTaskCallback(); }); LOG_INFO("ClusterManager", "Registered all cluster tasks"); } @@ -152,7 +152,11 @@ void ClusterManager::onNodeInfo(const char* msg) { node.resources.cpuFreqMHz = doc["cpuFreqMHz"] | node.resources.cpuFreqMHz; node.resources.flashChipSize = doc["flashChipSize"] | node.resources.flashChipSize; node.status = NodeInfo::ACTIVE; - node.lastSeen = millis(); + unsigned long now = millis(); + node.lastSeen = now; + if (lastHeartbeatSentAt != 0) { + node.latency = now - lastHeartbeatSentAt; + } node.labels.clear(); if (doc["labels"].is()) { @@ -188,7 +192,7 @@ void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) { newNode.hostname = nodeHost; newNode.ip = nodeIP; newNode.lastSeen = millis(); - updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms); + updateNodeStatus(newNode, newNode.lastSeen, ctx.config.node_inactive_threshold_ms, ctx.config.node_dead_threshold_ms); memberList[nodeHost] = newNode; LOG_INFO("Cluster", "Added node: " + nodeHost + " @ " + newNode.ip.toString() + " | Status: " + statusToStr(newNode.status) + " | last update: 0"); //fetchNodeInfo(nodeIP); // Do not fetch here, handled by periodic task @@ -313,6 +317,7 @@ void ClusterManager::heartbeatTaskCallback() { } // Broadcast heartbeat so peers can respond with their node info + lastHeartbeatSentAt = millis(); ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port); String hb = String(ClusterProtocol::HEARTBEAT_MSG) + ":" + ctx.hostname; ctx.udp->write(hb.c_str());