From 55c3aebb3f290f944142b63f5ec1328bb891cb09 Mon Sep 17 00:00:00 2001 From: 0x1d Date: Sun, 26 Oct 2025 18:09:42 +0100 Subject: [PATCH] feat: handle cluster events --- internal/discovery/discovery.go | 55 ++++++++++++++++++++++++++++++++- internal/discovery/types.go | 20 +++++++----- internal/server/server.go | 3 ++ internal/websocket/websocket.go | 46 +++++++++++++++++++++++++++ 4 files changed, 116 insertions(+), 8 deletions(-) diff --git a/internal/discovery/discovery.go b/internal/discovery/discovery.go index f7e29c2..60ddc14 100644 --- a/internal/discovery/discovery.go +++ b/internal/discovery/discovery.go @@ -2,6 +2,7 @@ package discovery import ( "context" + "encoding/json" "fmt" "net" "strconv" @@ -85,7 +86,10 @@ func (nd *NodeDiscovery) handleUDPMessage(message string, remoteAddr *net.UDPAdd nd.logger.WithField("message", "raw:"+payload).Debug("Received raw message") }, "cluster/event": func(payload string, remoteAddr *net.UDPAddr) { - nd.logger.WithField("message", "cluster/event:"+payload).Debug("Received cluster/event message") + nd.handleClusterEvent(payload, remoteAddr) + }, + "cluster/broadcast": func(payload string, remoteAddr *net.UDPAddr) { + nd.handleClusterBroadcast(payload, remoteAddr) }, } @@ -373,6 +377,55 @@ func (nd *NodeDiscovery) AddCallback(callback NodeUpdateCallback) { nd.callbacks = append(nd.callbacks, callback) } +// SetClusterEventCallback sets the callback for cluster events +func (nd *NodeDiscovery) SetClusterEventCallback(callback ClusterEventBroadcaster) { + nd.mutex.Lock() + defer nd.mutex.Unlock() + nd.clusterEventCallback = callback +} + +// handleClusterEvent processes cluster/event messages +func (nd *NodeDiscovery) handleClusterEvent(payload string, remoteAddr *net.UDPAddr) { + nd.logger.WithFields(log.Fields{ + "payload": payload, + "from": remoteAddr.String(), + }).Debug("Received cluster/event message") + + // Forward to websocket if callback is set + if nd.clusterEventCallback != nil { + nd.clusterEventCallback.BroadcastClusterEvent("cluster/event", payload) + } +} + +// handleClusterBroadcast processes cluster/broadcast messages +func (nd *NodeDiscovery) handleClusterBroadcast(payload string, remoteAddr *net.UDPAddr) { + nd.logger.WithFields(log.Fields{ + "payload": payload, + "from": remoteAddr.String(), + }).Debug("Received cluster/broadcast message") + + // Parse the payload JSON to extract nested event and data + var payloadData struct { + Event string `json:"event"` + Data interface{} `json:"data"` + } + + if err := json.Unmarshal([]byte(payload), &payloadData); err != nil { + nd.logger.WithError(err).Error("Failed to parse cluster/broadcast payload") + return + } + + nd.logger.WithFields(log.Fields{ + "event": payloadData.Event, + "from": remoteAddr.String(), + }).Debug("Parsed cluster/broadcast payload") + + // Forward to websocket if callback is set, mapping event to topic and data to data + if nd.clusterEventCallback != nil { + nd.clusterEventCallback.BroadcastClusterEvent(payloadData.Event, payloadData.Data) + } +} + // GetClusterStatus returns current cluster status func (nd *NodeDiscovery) GetClusterStatus() ClusterStatus { nd.mutex.RLock() diff --git a/internal/discovery/types.go b/internal/discovery/types.go index de06afa..9cdff5d 100644 --- a/internal/discovery/types.go +++ b/internal/discovery/types.go @@ -41,15 +41,21 @@ type ClusterStatus struct { // NodeUpdateCallback is called when node information changes type NodeUpdateCallback func(nodeIP string, action string) +// ClusterEventBroadcaster interface for broadcasting cluster events +type ClusterEventBroadcaster interface { + BroadcastClusterEvent(topic string, data interface{}) +} + // NodeDiscovery manages UDP-based node discovery type NodeDiscovery struct { - udpPort string - discoveredNodes map[string]*NodeInfo - primaryNode string - mutex sync.RWMutex - callbacks []NodeUpdateCallback - staleThreshold time.Duration - logger *log.Logger + udpPort string + discoveredNodes map[string]*NodeInfo + primaryNode string + mutex sync.RWMutex + callbacks []NodeUpdateCallback + clusterEventCallback ClusterEventBroadcaster + staleThreshold time.Duration + logger *log.Logger } // NewNodeDiscovery creates a new node discovery instance diff --git a/internal/server/server.go b/internal/server/server.go index f70bed0..a34420c 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -38,6 +38,9 @@ func NewHTTPServer(port string, nodeDiscovery *discovery.NodeDiscovery) *HTTPSer // Initialize registry client registryClient := registry.NewRegistryClient("http://localhost:3002") + // Register WebSocket server as cluster event broadcaster + nodeDiscovery.SetClusterEventCallback(wsServer) + hs := &HTTPServer{ port: port, router: mux.NewRouter(), diff --git a/internal/websocket/websocket.go b/internal/websocket/websocket.go index 5f099e4..020fba7 100644 --- a/internal/websocket/websocket.go +++ b/internal/websocket/websocket.go @@ -602,6 +602,52 @@ func (wss *WebSocketServer) GetClientCount() int { return len(wss.clients) } +// BroadcastClusterEvent sends cluster events to all connected clients +func (wss *WebSocketServer) BroadcastClusterEvent(topic string, data interface{}) { + wss.mutex.RLock() + clients := make([]*websocket.Conn, 0, len(wss.clients)) + for client := range wss.clients { + clients = append(clients, client) + } + wss.mutex.RUnlock() + + if len(clients) == 0 { + return + } + + message := struct { + Topic string `json:"topic"` + Data interface{} `json:"data"` + Timestamp string `json:"timestamp"` + }{ + Topic: topic, + Data: data, + Timestamp: time.Now().Format(time.RFC3339), + } + + messageData, err := json.Marshal(message) + if err != nil { + wss.logger.WithError(err).Error("Failed to marshal cluster event") + return + } + + wss.logger.WithFields(log.Fields{ + "topic": topic, + "clients": len(clients), + }).Debug("Broadcasting cluster event to WebSocket clients") + + // Send to all clients with write synchronization + wss.writeMutex.Lock() + defer wss.writeMutex.Unlock() + + for _, client := range clients { + client.SetWriteDeadline(time.Now().Add(5 * time.Second)) + if err := client.WriteMessage(websocket.TextMessage, messageData); err != nil { + wss.logger.WithError(err).Error("Failed to send cluster event to client") + } + } +} + // Shutdown gracefully shuts down the WebSocket server func (wss *WebSocketServer) Shutdown(ctx context.Context) error { wss.logger.Info("Shutting down WebSocket server")