feat: periodic polling of cluster status instead of on every node's heartbeat

This commit is contained in:
2025-10-26 11:58:09 +01:00
parent 8aa8b908e6
commit c4b1a2d853
2 changed files with 112 additions and 26 deletions

View File

@@ -57,7 +57,7 @@ func NewNodeDiscovery(udpPort string) *NodeDiscovery {
return &NodeDiscovery{ return &NodeDiscovery{
udpPort: udpPort, udpPort: udpPort,
discoveredNodes: make(map[string]*NodeInfo), discoveredNodes: make(map[string]*NodeInfo),
staleThreshold: 10 * time.Second, // TODO make configurable staleThreshold: 10 * time.Second, // Heartbeat timeout - mark nodes inactive after 10 seconds
logger: log.New(), logger: log.New(),
} }
} }

View File

@@ -29,6 +29,9 @@ type WebSocketServer struct {
mutex sync.RWMutex mutex sync.RWMutex
writeMutex sync.Mutex // Mutex to serialize writes to WebSocket connections writeMutex sync.Mutex // Mutex to serialize writes to WebSocket connections
logger *log.Logger logger *log.Logger
clusterInfoTicker *time.Ticker
clusterInfoStopCh chan bool
clusterInfoInterval time.Duration
} }
// NewWebSocketServer creates a new WebSocket server // NewWebSocketServer creates a new WebSocket server
@@ -38,11 +41,16 @@ func NewWebSocketServer(nodeDiscovery *discovery.NodeDiscovery) *WebSocketServer
sporeClients: make(map[string]*client.SporeClient), sporeClients: make(map[string]*client.SporeClient),
clients: make(map[*websocket.Conn]bool), clients: make(map[*websocket.Conn]bool),
logger: log.New(), logger: log.New(),
clusterInfoStopCh: make(chan bool),
clusterInfoInterval: 5 * time.Second, // Fetch cluster info every 5 seconds
} }
// Register callback for node updates // Register callback for node updates
nodeDiscovery.AddCallback(wss.handleNodeUpdate) nodeDiscovery.AddCallback(wss.handleNodeUpdate)
// Start periodic cluster info fetching
go wss.startPeriodicClusterInfoFetching()
return wss return wss
} }
@@ -151,17 +159,48 @@ func (wss *WebSocketServer) sendCurrentClusterState(conn *websocket.Conn) {
} }
} }
// startPeriodicClusterInfoFetching runs the periodic cluster info loop.
//
// It creates a ticker firing every clusterInfoInterval, stores it on the
// server, and calls fetchAndBroadcastClusterInfo on each tick. The call blocks
// until clusterInfoStopCh delivers a value or is closed, so it is meant to be
// run in its own goroutine. The ticker is stopped when the loop exits.
func (wss *WebSocketServer) startPeriodicClusterInfoFetching() {
	interval := wss.clusterInfoInterval

	wss.clusterInfoTicker = time.NewTicker(interval)
	defer wss.clusterInfoTicker.Stop()

	wss.logger.WithField("interval", interval).Info("Starting periodic cluster info fetching")

	for {
		select {
		case <-wss.clusterInfoStopCh:
			// Stop signal: leave the loop; the deferred Stop releases the ticker.
			wss.logger.Info("Stopping periodic cluster info fetching")
			return
		case <-wss.clusterInfoTicker.C:
			wss.fetchAndBroadcastClusterInfo()
		}
	}
}
// fetchAndBroadcastClusterInfo triggers a cluster update broadcast, but only
// when at least one WebSocket client is connected.
//
// The client map is inspected under the read lock, which is released before
// broadcastClusterUpdate is called.
func (wss *WebSocketServer) fetchAndBroadcastClusterInfo() {
	wss.mutex.RLock()
	noClients := len(wss.clients) == 0
	wss.mutex.RUnlock()

	// Nobody is listening, so skip the work for this tick.
	if noClients {
		return
	}

	wss.logger.Debug("Periodically fetching cluster info")
	wss.broadcastClusterUpdate()
}
// handleNodeUpdate is called when node information changes // handleNodeUpdate is called when node information changes
func (wss *WebSocketServer) handleNodeUpdate(nodeIP, action string) { func (wss *WebSocketServer) handleNodeUpdate(nodeIP, action string) {
wss.logger.WithFields(log.Fields{ wss.logger.WithFields(log.Fields{
"node_ip": nodeIP, "node_ip": nodeIP,
"action": action, "action": action,
}).Debug("Node update received, broadcasting to WebSocket clients") }).Debug("Node update received, broadcasting node discovery event")
// Broadcast cluster update to all clients // Only broadcast node discovery event, not cluster update
wss.broadcastClusterUpdate() // Cluster updates are now handled by periodic fetching
// Also broadcast node discovery event
wss.broadcastNodeDiscovery(nodeIP, action) wss.broadcastNodeDiscovery(nodeIP, action)
} }
@@ -449,9 +488,11 @@ func (wss *WebSocketServer) getCurrentClusterMembers() ([]client.ClusterMember,
"member_count": len(clusterStatus.Members), "member_count": len(clusterStatus.Members),
}).Debug("Successfully fetched cluster members from primary node") }).Debug("Successfully fetched cluster members from primary node")
// Update local node data with API information // Update local node data with API information but preserve heartbeat status
wss.updateLocalNodesWithAPI(clusterStatus.Members) wss.updateLocalNodesWithAPI(clusterStatus.Members)
return clusterStatus.Members, nil
// Return merged data with heartbeat-based status override
return wss.mergeAPIWithHeartbeatStatus(clusterStatus.Members), nil
} }
wss.logger.WithFields(log.Fields{ wss.logger.WithFields(log.Fields{
"primary_node": primaryNode, "primary_node": primaryNode,
@@ -467,18 +508,60 @@ func (wss *WebSocketServer) getCurrentClusterMembers() ([]client.ClusterMember,
// updateLocalNodesWithAPI updates local node data with information from API // updateLocalNodesWithAPI updates local node data with information from API
func (wss *WebSocketServer) updateLocalNodesWithAPI(apiMembers []client.ClusterMember) { func (wss *WebSocketServer) updateLocalNodesWithAPI(apiMembers []client.ClusterMember) {
// This would update the local node discovery with fresh API data
// For now, we'll just log that we received the data
wss.logger.WithField("members", len(apiMembers)).Debug("Updating local nodes with API data") wss.logger.WithField("members", len(apiMembers)).Debug("Updating local nodes with API data")
for _, member := range apiMembers { for _, member := range apiMembers {
if len(member.Labels) > 0 { // Update local node with API data, but preserve heartbeat-based status
wss.updateNodeWithAPIData(member)
}
}
// updateNodeWithAPIData updates a single node with API data while preserving heartbeat status
func (wss *WebSocketServer) updateNodeWithAPIData(apiMember client.ClusterMember) {
nodes := wss.nodeDiscovery.GetNodes()
if localNode, exists := nodes[apiMember.IP]; exists {
// Update additional data from API but preserve heartbeat-based status
localNode.Labels = apiMember.Labels
localNode.Resources = apiMember.Resources
localNode.Latency = apiMember.Latency
// Only update hostname if it's different and not empty
if apiMember.Hostname != "" && apiMember.Hostname != localNode.Hostname {
localNode.Hostname = apiMember.Hostname
}
wss.logger.WithFields(log.Fields{ wss.logger.WithFields(log.Fields{
"ip": member.IP, "ip": apiMember.IP,
"labels": member.Labels, "labels": apiMember.Labels,
}).Debug("API member labels") "status": localNode.Status, // Keep heartbeat-based status
}).Debug("Updated node with API data, preserved heartbeat status")
} }
} }
// mergeAPIWithHeartbeatStatus merges API member data with heartbeat-based status
func (wss *WebSocketServer) mergeAPIWithHeartbeatStatus(apiMembers []client.ClusterMember) []client.ClusterMember {
localNodes := wss.nodeDiscovery.GetNodes()
mergedMembers := make([]client.ClusterMember, 0, len(apiMembers))
for _, apiMember := range apiMembers {
mergedMember := apiMember
// Override status with heartbeat-based status if we have local data
if localNode, exists := localNodes[apiMember.IP]; exists {
mergedMember.Status = string(localNode.Status)
mergedMember.LastSeen = localNode.LastSeen.Unix()
wss.logger.WithFields(log.Fields{
"ip": apiMember.IP,
"api_status": apiMember.Status,
"heartbeat_status": localNode.Status,
}).Debug("Overriding API status with heartbeat status")
}
mergedMembers = append(mergedMembers, mergedMember)
}
return mergedMembers
} }
// getFallbackClusterMembers returns local node data as fallback // getFallbackClusterMembers returns local node data as fallback
@@ -523,6 +606,9 @@ func (wss *WebSocketServer) GetClientCount() int {
func (wss *WebSocketServer) Shutdown(ctx context.Context) error { func (wss *WebSocketServer) Shutdown(ctx context.Context) error {
wss.logger.Info("Shutting down WebSocket server") wss.logger.Info("Shutting down WebSocket server")
// Stop periodic cluster info fetching
close(wss.clusterInfoStopCh)
wss.mutex.Lock() wss.mutex.Lock()
clients := make([]*websocket.Conn, 0, len(wss.clients)) clients := make([]*websocket.Conn, 0, len(wss.clients))
for client := range wss.clients { for client := range wss.clients {