From c4b1a2d8537b66cee6bf42cca2de91da9e747378 Mon Sep 17 00:00:00 2001 From: 0x1d Date: Sun, 26 Oct 2025 11:58:09 +0100 Subject: [PATCH] feat: periodic polling of cluster status instead of every nodes heartbeat --- internal/discovery/types.go | 2 +- internal/websocket/websocket.go | 136 ++++++++++++++++++++++++++------ 2 files changed, 112 insertions(+), 26 deletions(-) diff --git a/internal/discovery/types.go b/internal/discovery/types.go index fc2b9d9..de06afa 100644 --- a/internal/discovery/types.go +++ b/internal/discovery/types.go @@ -57,7 +57,7 @@ func NewNodeDiscovery(udpPort string) *NodeDiscovery { return &NodeDiscovery{ udpPort: udpPort, discoveredNodes: make(map[string]*NodeInfo), - staleThreshold: 10 * time.Second, // TODO make configurable + staleThreshold: 10 * time.Second, // Heartbeat timeout - mark nodes inactive after 10 seconds logger: log.New(), } } diff --git a/internal/websocket/websocket.go b/internal/websocket/websocket.go index 733f4f0..e0e98a2 100644 --- a/internal/websocket/websocket.go +++ b/internal/websocket/websocket.go @@ -23,26 +23,34 @@ var upgrader = websocket.Upgrader{ // WebSocketServer manages WebSocket connections and broadcasts type WebSocketServer struct { - nodeDiscovery *discovery.NodeDiscovery - sporeClients map[string]*client.SporeClient - clients map[*websocket.Conn]bool - mutex sync.RWMutex - writeMutex sync.Mutex // Mutex to serialize writes to WebSocket connections - logger *log.Logger + nodeDiscovery *discovery.NodeDiscovery + sporeClients map[string]*client.SporeClient + clients map[*websocket.Conn]bool + mutex sync.RWMutex + writeMutex sync.Mutex // Mutex to serialize writes to WebSocket connections + logger *log.Logger + clusterInfoTicker *time.Ticker + clusterInfoStopCh chan bool + clusterInfoInterval time.Duration } // NewWebSocketServer creates a new WebSocket server func NewWebSocketServer(nodeDiscovery *discovery.NodeDiscovery) *WebSocketServer { wss := &WebSocketServer{ - nodeDiscovery: nodeDiscovery, - sporeClients: make(map[string]*client.SporeClient), - clients: make(map[*websocket.Conn]bool), - logger: log.New(), + nodeDiscovery: nodeDiscovery, + sporeClients: make(map[string]*client.SporeClient), + clients: make(map[*websocket.Conn]bool), + logger: log.New(), + clusterInfoStopCh: make(chan bool), + clusterInfoInterval: 5 * time.Second, // Fetch cluster info every 5 seconds } // Register callback for node updates nodeDiscovery.AddCallback(wss.handleNodeUpdate) + // Start periodic cluster info fetching + go wss.startPeriodicClusterInfoFetching() + return wss } @@ -151,17 +159,48 @@ func (wss *WebSocketServer) sendCurrentClusterState(conn *websocket.Conn) { } } +// startPeriodicClusterInfoFetching starts a goroutine that periodically fetches cluster info +func (wss *WebSocketServer) startPeriodicClusterInfoFetching() { + wss.clusterInfoTicker = time.NewTicker(wss.clusterInfoInterval) + defer wss.clusterInfoTicker.Stop() + + wss.logger.WithField("interval", wss.clusterInfoInterval).Info("Starting periodic cluster info fetching") + + for { + select { + case <-wss.clusterInfoTicker.C: + wss.fetchAndBroadcastClusterInfo() + case <-wss.clusterInfoStopCh: + wss.logger.Info("Stopping periodic cluster info fetching") + return + } + } +} + +// fetchAndBroadcastClusterInfo fetches cluster info and broadcasts it to clients +func (wss *WebSocketServer) fetchAndBroadcastClusterInfo() { + // Only fetch if we have clients connected + wss.mutex.RLock() + clientCount := len(wss.clients) + wss.mutex.RUnlock() + + if clientCount == 0 { + return + } + + wss.logger.Debug("Periodically fetching cluster info") + wss.broadcastClusterUpdate() +} + // handleNodeUpdate is called when node information changes func (wss *WebSocketServer) handleNodeUpdate(nodeIP, action string) { wss.logger.WithFields(log.Fields{ "node_ip": nodeIP, "action": action, - }).Debug("Node update received, broadcasting to WebSocket clients") + }).Debug("Node update received, broadcasting node discovery event") - // Broadcast cluster update to all clients - wss.broadcastClusterUpdate() - - // Also broadcast node discovery event + // Only broadcast node discovery event, not cluster update + // Cluster updates are now handled by periodic fetching wss.broadcastNodeDiscovery(nodeIP, action) } @@ -449,9 +488,11 @@ func (wss *WebSocketServer) getCurrentClusterMembers() ([]client.ClusterMember, "member_count": len(clusterStatus.Members), }).Debug("Successfully fetched cluster members from primary node") - // Update local node data with API information + // Update local node data with API information but preserve heartbeat status wss.updateLocalNodesWithAPI(clusterStatus.Members) - return clusterStatus.Members, nil + + // Return merged data with heartbeat-based status override + return wss.mergeAPIWithHeartbeatStatus(clusterStatus.Members), nil } wss.logger.WithFields(log.Fields{ "primary_node": primaryNode, @@ -467,20 +508,62 @@ func (wss *WebSocketServer) getCurrentClusterMembers() ([]client.ClusterMember, // updateLocalNodesWithAPI updates local node data with information from API func (wss *WebSocketServer) updateLocalNodesWithAPI(apiMembers []client.ClusterMember) { - // This would update the local node discovery with fresh API data - // For now, we'll just log that we received the data wss.logger.WithField("members", len(apiMembers)).Debug("Updating local nodes with API data") for _, member := range apiMembers { - if len(member.Labels) > 0 { - wss.logger.WithFields(log.Fields{ - "ip": member.IP, - "labels": member.Labels, - }).Debug("API member labels") - } + // Update local node with API data, but preserve heartbeat-based status + wss.updateNodeWithAPIData(member) } } +// updateNodeWithAPIData updates a single node with API data while preserving heartbeat status +func (wss *WebSocketServer) updateNodeWithAPIData(apiMember client.ClusterMember) { + nodes := wss.nodeDiscovery.GetNodes() + if localNode, exists := nodes[apiMember.IP]; exists { + // Update additional data from API but preserve heartbeat-based status + localNode.Labels = apiMember.Labels + localNode.Resources = apiMember.Resources + localNode.Latency = apiMember.Latency + + // Only update hostname if it's different and not empty + if apiMember.Hostname != "" && apiMember.Hostname != localNode.Hostname { + localNode.Hostname = apiMember.Hostname + } + + wss.logger.WithFields(log.Fields{ + "ip": apiMember.IP, + "labels": apiMember.Labels, + "status": localNode.Status, // Keep heartbeat-based status + }).Debug("Updated node with API data, preserved heartbeat status") + } +} + +// mergeAPIWithHeartbeatStatus merges API member data with heartbeat-based status +func (wss *WebSocketServer) mergeAPIWithHeartbeatStatus(apiMembers []client.ClusterMember) []client.ClusterMember { + localNodes := wss.nodeDiscovery.GetNodes() + mergedMembers := make([]client.ClusterMember, 0, len(apiMembers)) + + for _, apiMember := range apiMembers { + mergedMember := apiMember + + // Override status with heartbeat-based status if we have local data + if localNode, exists := localNodes[apiMember.IP]; exists { + mergedMember.Status = string(localNode.Status) + mergedMember.LastSeen = localNode.LastSeen.Unix() + + wss.logger.WithFields(log.Fields{ + "ip": apiMember.IP, + "api_status": apiMember.Status, + "heartbeat_status": localNode.Status, + }).Debug("Overriding API status with heartbeat status") + } + + mergedMembers = append(mergedMembers, mergedMember) + } + + return mergedMembers +} + // getFallbackClusterMembers returns local node data as fallback func (wss *WebSocketServer) getFallbackClusterMembers() []client.ClusterMember { nodes := wss.nodeDiscovery.GetNodes() @@ -523,6 +606,9 @@ func (wss *WebSocketServer) GetClientCount() int { func (wss *WebSocketServer) Shutdown(ctx context.Context) error { wss.logger.Info("Shutting down WebSocket server") + // Stop periodic cluster info fetching + close(wss.clusterInfoStopCh) + wss.mutex.Lock() clients := make([]*websocket.Conn, 0, len(wss.clients)) for client := range wss.clients {