feat: measure latency

This commit is contained in:
2025-09-25 21:54:25 +02:00
parent 356ec3d381
commit 096cf12704
3 changed files with 11 additions and 5 deletions

View File

@@ -43,5 +43,6 @@ private:
void onHeartbeat(const char* msg); void onHeartbeat(const char* msg);
void onResponse(const char* msg); void onResponse(const char* msg);
void onNodeInfo(const char* msg); void onNodeInfo(const char* msg);
unsigned long lastHeartbeatSentAt = 0;
std::vector<MessageHandler> messageHandlers; std::vector<MessageHandler> messageHandlers;
}; };

View File

@@ -17,7 +17,7 @@ struct NodeInfo {
uint32_t cpuFreqMHz = 0; uint32_t cpuFreqMHz = 0;
uint32_t flashChipSize = 0; uint32_t flashChipSize = 0;
} resources; } resources;
unsigned long latency = 0; // ms since lastSeen unsigned long latency = 0; // ms from heartbeat broadcast to NODE_INFO receipt
std::vector<EndpointInfo> endpoints; // List of registered endpoints std::vector<EndpointInfo> endpoints; // List of registered endpoints
std::map<String, String> labels; // Arbitrary node labels (key -> value) std::map<String, String> labels; // Arbitrary node labels (key -> value)
}; };

View File

@@ -14,12 +14,12 @@ ClusterManager::ClusterManager(NodeContext& ctx, TaskManager& taskMgr) : ctx(ctx
} }
void ClusterManager::registerTasks() { void ClusterManager::registerTasks() {
taskManager.registerTask("discovery_send", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); }); taskManager.registerTask("cluster_discovery", ctx.config.discovery_interval_ms, [this]() { sendDiscovery(); });
taskManager.registerTask("cluster_listen", ctx.config.discovery_interval_ms / 10, [this]() { listen(); }); taskManager.registerTask("cluster_listen", ctx.config.discovery_interval_ms / 10, [this]() { listen(); });
taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); }); taskManager.registerTask("status_update", ctx.config.status_update_interval_ms, [this]() { updateAllNodeStatuses(); removeDeadNodes(); });
taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); }); taskManager.registerTask("print_members", ctx.config.print_interval_ms, [this]() { printMemberList(); });
taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); }); taskManager.registerTask("heartbeat", ctx.config.heartbeat_interval_ms, [this]() { heartbeatTaskCallback(); });
taskManager.registerTask("update_members_info", ctx.config.member_info_update_interval_ms, [this]() { updateAllMembersInfoTaskCallback(); }); taskManager.registerTask("cluster_update_members_info", ctx.config.member_info_update_interval_ms, [this]() { updateAllMembersInfoTaskCallback(); });
LOG_INFO("ClusterManager", "Registered all cluster tasks"); LOG_INFO("ClusterManager", "Registered all cluster tasks");
} }
@@ -152,7 +152,11 @@ void ClusterManager::onNodeInfo(const char* msg) {
node.resources.cpuFreqMHz = doc["cpuFreqMHz"] | node.resources.cpuFreqMHz; node.resources.cpuFreqMHz = doc["cpuFreqMHz"] | node.resources.cpuFreqMHz;
node.resources.flashChipSize = doc["flashChipSize"] | node.resources.flashChipSize; node.resources.flashChipSize = doc["flashChipSize"] | node.resources.flashChipSize;
node.status = NodeInfo::ACTIVE; node.status = NodeInfo::ACTIVE;
node.lastSeen = millis(); unsigned long now = millis();
node.lastSeen = now;
if (lastHeartbeatSentAt != 0) {
node.latency = now - lastHeartbeatSentAt;
}
node.labels.clear(); node.labels.clear();
if (doc["labels"].is<JsonObject>()) { if (doc["labels"].is<JsonObject>()) {
@@ -313,6 +317,7 @@ void ClusterManager::heartbeatTaskCallback() {
} }
// Broadcast heartbeat so peers can respond with their node info // Broadcast heartbeat so peers can respond with their node info
lastHeartbeatSentAt = millis();
ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port); ctx.udp->beginPacket("255.255.255.255", ctx.config.udp_port);
String hb = String(ClusterProtocol::HEARTBEAT_MSG) + ":" + ctx.hostname; String hb = String(ClusterProtocol::HEARTBEAT_MSG) + ":" + ctx.hostname;
ctx.udp->write(hb.c_str()); ctx.udp->write(hb.c_str());