diff --git a/internal/discovery/discovery.go b/internal/discovery/discovery.go index c57cd43..f7e29c2 100644 --- a/internal/discovery/discovery.go +++ b/internal/discovery/discovery.go @@ -49,6 +49,9 @@ func (nd *NodeDiscovery) Shutdown(ctx context.Context) error { return nil } +// MessageHandler processes a specific UDP message type +type MessageHandler func(payload string, remoteAddr *net.UDPAddr) + // handleUDPMessage processes incoming UDP messages func (nd *NodeDiscovery) handleUDPMessage(message string, remoteAddr *net.UDPAddr) { nd.logger.WithFields(log.Fields{ @@ -58,13 +61,39 @@ func (nd *NodeDiscovery) handleUDPMessage(message string, remoteAddr *net.UDPAdd message = strings.TrimSpace(message) - if strings.HasPrefix(message, "CLUSTER_HEARTBEAT:") { - hostname := strings.TrimPrefix(message, "CLUSTER_HEARTBEAT:") - nd.updateNodeFromHeartbeat(remoteAddr.IP.String(), remoteAddr.Port, hostname) - } else if strings.HasPrefix(message, "NODE_UPDATE:") { - nd.handleNodeUpdate(remoteAddr.IP.String(), message) - } else if !strings.HasPrefix(message, "RAW:") { - nd.logger.WithField("message", message).Debug("Received unknown UDP message") + // Extract topic by splitting on first ":" + parts := strings.SplitN(message, ":", 2) + if len(parts) < 2 { + nd.logger.WithField("message", message).Debug("Invalid message format - missing ':' separator") + return + } + + topic := parts[0] + payload := parts[1] + + // Handler map for different message types + handlers := map[string]MessageHandler{ + "cluster/heartbeat": func(payload string, remoteAddr *net.UDPAddr) { + nd.updateNodeFromHeartbeat(remoteAddr.IP.String(), remoteAddr.Port, payload) + }, + "node/update": func(payload string, remoteAddr *net.UDPAddr) { + // Reconstruct full message for handleNodeUpdate which expects "node/update:hostname:{json}" + fullMessage := "node/update:" + payload + nd.handleNodeUpdate(remoteAddr.IP.String(), fullMessage) + }, + "raw": func(payload string, remoteAddr *net.UDPAddr) { + nd.logger.WithField("message", "raw:"+payload).Debug("Received raw message") + }, + "cluster/event": func(payload string, remoteAddr *net.UDPAddr) { + nd.logger.WithField("message", "cluster/event:"+payload).Debug("Received cluster/event message") + }, + } + + // Look up and execute handler + if handler, exists := handlers[topic]; exists { + handler(payload, remoteAddr) + } else { + nd.logger.WithField("topic", topic).Debug("Received unknown UDP message type") } } @@ -138,9 +167,9 @@ func (nd *NodeDiscovery) updateNodeFromHeartbeat(sourceIP string, sourcePort int } } -// handleNodeUpdate processes NODE_UPDATE messages +// handleNodeUpdate processes NODE_UPDATE and node/update messages func (nd *NodeDiscovery) handleNodeUpdate(sourceIP, message string) { - // Message format: "NODE_UPDATE:hostname:{json}" + // Message format: "NODE_UPDATE:hostname:{json}" or "node/update:hostname:{json}" parts := strings.SplitN(message, ":", 3) if len(parts) < 3 { nd.logger.WithField("message", message).Warn("Invalid NODE_UPDATE message format") diff --git a/internal/discovery/types.go b/internal/discovery/types.go index fc2b9d9..de06afa 100644 --- a/internal/discovery/types.go +++ b/internal/discovery/types.go @@ -57,7 +57,7 @@ func NewNodeDiscovery(udpPort string) *NodeDiscovery { return &NodeDiscovery{ udpPort: udpPort, discoveredNodes: make(map[string]*NodeInfo), - staleThreshold: 10 * time.Second, // TODO make configurable + staleThreshold: 10 * time.Second, // Heartbeat timeout - mark nodes inactive after 10 seconds logger: log.New(), } } diff --git a/internal/server/server.go b/internal/server/server.go index ab031f1..f70bed0 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -366,16 +366,19 @@ func (hs *HTTPServer) setPrimaryNode(w http.ResponseWriter, r *http.Request) { // GET /api/cluster/members func (hs *HTTPServer) getClusterMembers(w http.ResponseWriter, r *http.Request) { + log.Debug("Fetching cluster members via API") + result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) { return client.GetClusterStatus() }) if err != nil { - log.WithError(err).Error("Error fetching cluster members") + log.WithError(err).Debug("Failed to fetch cluster members") http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch cluster members", "message": "%s"}`, err.Error()), http.StatusBadGateway) return } + log.Debug("Successfully fetched cluster members via API") json.NewEncoder(w).Encode(result) } @@ -417,42 +420,52 @@ func (hs *HTTPServer) getTaskStatus(w http.ResponseWriter, r *http.Request) { ip := r.URL.Query().Get("ip") if ip != "" { + log.WithField("node_ip", ip).Debug("Fetching task status from specific node") client := hs.getSporeClient(ip) result, err := client.GetTaskStatus() if err != nil { - log.WithError(err).Error("Error fetching task status from specific node") + log.WithFields(log.Fields{ + "node_ip": ip, + "error": err.Error(), + }).Debug("Failed to fetch task status from specific node") http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch task status from node", "message": "%s"}`, err.Error()), http.StatusInternalServerError) return } + log.WithField("node_ip", ip).Debug("Successfully fetched task status from specific node") json.NewEncoder(w).Encode(result) return } + log.Debug("Fetching task status via failover") result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) { return client.GetTaskStatus() }) if err != nil { - log.WithError(err).Error("Error fetching task status") + log.WithError(err).Debug("Failed to fetch task status via failover") http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch task status", "message": "%s"}`, err.Error()), http.StatusBadGateway) return } + log.Debug("Successfully fetched task status via failover") json.NewEncoder(w).Encode(result) } // GET /api/node/status func (hs *HTTPServer) getNodeStatus(w http.ResponseWriter, r *http.Request) { + log.Debug("Fetching node system status via failover") + result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) { return client.GetSystemStatus() }) if err != nil { - log.WithError(err).Error("Error fetching system status") + log.WithError(err).Debug("Failed to fetch system status via failover") http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch system status", "message": "%s"}`, err.Error()), http.StatusBadGateway) return } + log.Debug("Successfully fetched system status via failover") json.NewEncoder(w).Encode(result) } @@ -461,14 +474,20 @@ func (hs *HTTPServer) getNodeStatusByIP(w http.ResponseWriter, r *http.Request) vars := mux.Vars(r) nodeIP := vars["ip"] + log.WithField("node_ip", nodeIP).Debug("Fetching system status from specific node") + client := hs.getSporeClient(nodeIP) result, err := client.GetSystemStatus() if err != nil { - log.WithError(err).Error("Error fetching status from specific node") + log.WithFields(log.Fields{ + "node_ip": nodeIP, + "error": err.Error(), + }).Debug("Failed to fetch status from specific node") http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch status from node %s", "message": "%s"}`, nodeIP, err.Error()), http.StatusInternalServerError) return } + log.WithField("node_ip", nodeIP).Debug("Successfully fetched status from specific node") json.NewEncoder(w).Encode(result) } @@ -477,27 +496,34 @@ func (hs *HTTPServer) getNodeEndpoints(w http.ResponseWriter, r *http.Request) { ip := r.URL.Query().Get("ip") if ip != "" { + log.WithField("node_ip", ip).Debug("Fetching endpoints from specific node") client := hs.getSporeClient(ip) result, err := client.GetCapabilities() if err != nil { - log.WithError(err).Error("Error fetching endpoints from specific node") + log.WithFields(log.Fields{ + "node_ip": ip, + "error": err.Error(), + }).Debug("Failed to fetch endpoints from specific node") http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch endpoints from node", "message": "%s"}`, err.Error()), http.StatusInternalServerError) return } + log.WithField("node_ip", ip).Debug("Successfully fetched endpoints from specific node") json.NewEncoder(w).Encode(result) return } + log.Debug("Fetching capabilities via failover") result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) { return client.GetCapabilities() }) if err != nil { - log.WithError(err).Error("Error fetching capabilities") + log.WithError(err).Debug("Failed to fetch capabilities via failover") http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch capabilities", "message": "%s"}`, err.Error()), http.StatusBadGateway) return } + log.Debug("Successfully fetched capabilities via failover") json.NewEncoder(w).Encode(result) } @@ -849,18 +875,21 @@ type ClusterNodeVersionsResponse struct { // GET /api/cluster/node/versions func (hs *HTTPServer) getClusterNodeVersions(w http.ResponseWriter, r *http.Request) { + log.Debug("Fetching cluster node versions") + result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) { return client.GetClusterStatus() }) if err != nil { - log.WithError(err).Error("Error fetching cluster members for versions") + log.WithError(err).Debug("Failed to fetch cluster members for versions") http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch cluster members", "message": "%s"}`, err.Error()), http.StatusBadGateway) return } clusterStatus, ok := result.(*client.ClusterStatusResponse) if !ok { + log.Debug("Invalid cluster status response type") http.Error(w, `{"error": "Invalid cluster status response"}`, http.StatusInternalServerError) return } @@ -880,6 +909,8 @@ func (hs *HTTPServer) getClusterNodeVersions(w http.ResponseWriter, r *http.Requ }) } + log.WithField("node_count", len(nodeVersions)).Debug("Successfully fetched cluster node versions") + response := ClusterNodeVersionsResponse{ Nodes: nodeVersions, } @@ -956,12 +987,25 @@ func (hs *HTTPServer) nodeMatchesLabels(nodeLabels, rolloutLabels map[string]str // processRollout handles the actual rollout process in the background func (hs *HTTPServer) processRollout(rolloutID string, nodes []NodeInfo, firmwareInfo FirmwareInfo) { - log.WithField("rollout_id", rolloutID).Info("Starting background rollout process") + log.WithFields(log.Fields{ + "rollout_id": rolloutID, + "firmware": fmt.Sprintf("%s/%s", firmwareInfo.Name, firmwareInfo.Version), + "node_count": len(nodes), + }).Debug("Starting background rollout process") // Download firmware from registry + log.WithFields(log.Fields{ + "rollout_id": rolloutID, + "firmware": fmt.Sprintf("%s/%s", firmwareInfo.Name, firmwareInfo.Version), + }).Debug("Downloading firmware from registry for rollout") + firmwareData, err := hs.registryClient.DownloadFirmware(firmwareInfo.Name, firmwareInfo.Version) if err != nil { - log.WithError(err).Error("Failed to download firmware for rollout") + log.WithFields(log.Fields{ + "rollout_id": rolloutID, + "firmware": fmt.Sprintf("%s/%s", firmwareInfo.Name, firmwareInfo.Version), + "error": err.Error(), + }).Error("Failed to download firmware for rollout") return } @@ -970,7 +1014,7 @@ func (hs *HTTPServer) processRollout(rolloutID string, nodes []NodeInfo, firmwar "firmware": fmt.Sprintf("%s/%s", firmwareInfo.Name, firmwareInfo.Version), "size": len(firmwareData), "total_nodes": len(nodes), - }).Info("Downloaded firmware for rollout") + }).Debug("Successfully downloaded firmware for rollout") // Process nodes in parallel using goroutines var wg sync.WaitGroup @@ -984,9 +1028,14 @@ func (hs *HTTPServer) processRollout(rolloutID string, nodes []NodeInfo, firmwar "rollout_id": rolloutID, "node_ip": node.IP, "progress": fmt.Sprintf("%d/%d", nodeIndex+1, len(nodes)), - }).Info("Processing node in rollout") + }).Debug("Processing node in rollout") // Update version label on the node before upload + log.WithFields(log.Fields{ + "rollout_id": rolloutID, + "node_ip": node.IP, + }).Debug("Getting SPORE client for node") + client := hs.getSporeClient(node.IP) // Create updated labels with the new version diff --git a/internal/websocket/websocket.go b/internal/websocket/websocket.go index 4fa7f8d..5f099e4 100644 --- a/internal/websocket/websocket.go +++ b/internal/websocket/websocket.go @@ -23,26 +23,34 @@ var upgrader = websocket.Upgrader{ // WebSocketServer manages WebSocket connections and broadcasts type WebSocketServer struct { - nodeDiscovery *discovery.NodeDiscovery - sporeClients map[string]*client.SporeClient - clients map[*websocket.Conn]bool - mutex sync.RWMutex - writeMutex sync.Mutex // Mutex to serialize writes to WebSocket connections - logger *log.Logger + nodeDiscovery *discovery.NodeDiscovery + sporeClients map[string]*client.SporeClient + clients map[*websocket.Conn]bool + mutex sync.RWMutex + writeMutex sync.Mutex // Mutex to serialize writes to WebSocket connections + logger *log.Logger + clusterInfoTicker *time.Ticker + clusterInfoStopCh chan bool + clusterInfoInterval time.Duration } // NewWebSocketServer creates a new WebSocket server func NewWebSocketServer(nodeDiscovery *discovery.NodeDiscovery) *WebSocketServer { wss := &WebSocketServer{ - nodeDiscovery: nodeDiscovery, - sporeClients: make(map[string]*client.SporeClient), - clients: make(map[*websocket.Conn]bool), - logger: log.New(), + nodeDiscovery: nodeDiscovery, + sporeClients: make(map[string]*client.SporeClient), + clients: make(map[*websocket.Conn]bool), + logger: log.New(), + clusterInfoStopCh: make(chan bool), + clusterInfoInterval: 5 * time.Second, // Fetch cluster info every 5 seconds } // Register callback for node updates nodeDiscovery.AddCallback(wss.handleNodeUpdate) + // Start periodic cluster info fetching + go wss.startPeriodicClusterInfoFetching() + return wss } @@ -126,13 +134,13 @@ func (wss *WebSocketServer) sendCurrentClusterState(conn *websocket.Conn) { } message := struct { - Type string `json:"type"` + Topic string `json:"topic"` Members []client.ClusterMember `json:"members"` PrimaryNode string `json:"primaryNode"` TotalNodes int `json:"totalNodes"` Timestamp string `json:"timestamp"` }{ - Type: "cluster_update", + Topic: "cluster/update", Members: clusterData, PrimaryNode: wss.nodeDiscovery.GetPrimaryNode(), TotalNodes: len(nodes), @@ -151,17 +159,48 @@ func (wss *WebSocketServer) sendCurrentClusterState(conn *websocket.Conn) { } } +// startPeriodicClusterInfoFetching starts a goroutine that periodically fetches cluster info +func (wss *WebSocketServer) startPeriodicClusterInfoFetching() { + wss.clusterInfoTicker = time.NewTicker(wss.clusterInfoInterval) + defer wss.clusterInfoTicker.Stop() + + wss.logger.WithField("interval", wss.clusterInfoInterval).Info("Starting periodic cluster info fetching") + + for { + select { + case <-wss.clusterInfoTicker.C: + wss.fetchAndBroadcastClusterInfo() + case <-wss.clusterInfoStopCh: + wss.logger.Info("Stopping periodic cluster info fetching") + return + } + } +} + +// fetchAndBroadcastClusterInfo fetches cluster info and broadcasts it to clients +func (wss *WebSocketServer) fetchAndBroadcastClusterInfo() { + // Only fetch if we have clients connected + wss.mutex.RLock() + clientCount := len(wss.clients) + wss.mutex.RUnlock() + + if clientCount == 0 { + return + } + + wss.logger.Debug("Periodically fetching cluster info") + wss.broadcastClusterUpdate() +} + // handleNodeUpdate is called when node information changes func (wss *WebSocketServer) handleNodeUpdate(nodeIP, action string) { wss.logger.WithFields(log.Fields{ "node_ip": nodeIP, "action": action, - }).Debug("Node update received, broadcasting to WebSocket clients") + }).Debug("Node update received, broadcasting node discovery event") - // Broadcast cluster update to all clients - wss.broadcastClusterUpdate() - - // Also broadcast node discovery event + // Only broadcast node discovery event, not cluster update + // Cluster updates are now handled by periodic fetching wss.broadcastNodeDiscovery(nodeIP, action) } @@ -188,13 +227,13 @@ func (wss *WebSocketServer) broadcastClusterUpdate() { } message := struct { - Type string `json:"type"` + Topic string `json:"topic"` Members []client.ClusterMember `json:"members"` PrimaryNode string `json:"primaryNode"` TotalNodes int `json:"totalNodes"` Timestamp string `json:"timestamp"` }{ - Type: "cluster_update", + Topic: "cluster/update", Members: clusterData, PrimaryNode: wss.nodeDiscovery.GetPrimaryNode(), TotalNodes: len(wss.nodeDiscovery.GetNodes()), @@ -248,12 +287,12 @@ func (wss *WebSocketServer) broadcastNodeDiscovery(nodeIP, action string) { } message := struct { - Type string `json:"type"` + Topic string `json:"topic"` Action string `json:"action"` NodeIP string `json:"nodeIp"` Timestamp string `json:"timestamp"` }{ - Type: "node_discovery", + Topic: "node/discovery", Action: action, NodeIP: nodeIP, Timestamp: time.Now().Format(time.RFC3339), @@ -291,14 +330,14 @@ func (wss *WebSocketServer) BroadcastFirmwareUploadStatus(nodeIP, status, filena } message := struct { - Type string `json:"type"` + Topic string `json:"topic"` NodeIP string `json:"nodeIp"` Status string `json:"status"` Filename string `json:"filename"` FileSize int `json:"fileSize"` Timestamp string `json:"timestamp"` }{ - Type: "firmware_upload_status", + Topic: "firmware/upload/status", NodeIP: nodeIP, Status: status, Filename: filename, @@ -346,7 +385,7 @@ func (wss *WebSocketServer) BroadcastRolloutProgress(rolloutID, nodeIP, status s } message := struct { - Type string `json:"type"` + Topic string `json:"topic"` RolloutID string `json:"rolloutId"` NodeIP string `json:"nodeIp"` Status string `json:"status"` @@ -355,7 +394,7 @@ func (wss *WebSocketServer) BroadcastRolloutProgress(rolloutID, nodeIP, status s Progress int `json:"progress"` Timestamp string `json:"timestamp"` }{ - Type: "rollout_progress", + Topic: "rollout/progress", RolloutID: rolloutID, NodeIP: nodeIP, Status: status, @@ -429,20 +468,38 @@ func (wss *WebSocketServer) calculateProgress(current, total int, status string) func (wss *WebSocketServer) getCurrentClusterMembers() ([]client.ClusterMember, error) { nodes := wss.nodeDiscovery.GetNodes() if len(nodes) == 0 { + wss.logger.Debug("No nodes available for cluster member retrieval") return []client.ClusterMember{}, nil } // Try to get real cluster data from primary node primaryNode := wss.nodeDiscovery.GetPrimaryNode() if primaryNode != "" { + wss.logger.WithFields(log.Fields{ + "primary_node": primaryNode, + "total_nodes": len(nodes), + }).Debug("Fetching cluster members from primary node") + client := wss.getSporeClient(primaryNode) clusterStatus, err := client.GetClusterStatus() if err == nil { - // Update local node data with API information + wss.logger.WithFields(log.Fields{ + "primary_node": primaryNode, + "member_count": len(clusterStatus.Members), + }).Debug("Successfully fetched cluster members from primary node") + + // Update local node data with API information but preserve heartbeat status wss.updateLocalNodesWithAPI(clusterStatus.Members) - return clusterStatus.Members, nil + + // Return merged data with heartbeat-based status override + return wss.mergeAPIWithHeartbeatStatus(clusterStatus.Members), nil } - wss.logger.WithError(err).Error("Failed to get cluster status from primary node") + wss.logger.WithFields(log.Fields{ + "primary_node": primaryNode, + "error": err.Error(), + }).Debug("Failed to get cluster status from primary node, using fallback") + } else { + wss.logger.Debug("No primary node available, using fallback cluster members") } // Fallback to local data if API fails @@ -451,20 +508,62 @@ func (wss *WebSocketServer) getCurrentClusterMembers() ([]client.ClusterMember, // updateLocalNodesWithAPI updates local node data with information from API func (wss *WebSocketServer) updateLocalNodesWithAPI(apiMembers []client.ClusterMember) { - // This would update the local node discovery with fresh API data - // For now, we'll just log that we received the data wss.logger.WithField("members", len(apiMembers)).Debug("Updating local nodes with API data") for _, member := range apiMembers { - if len(member.Labels) > 0 { - wss.logger.WithFields(log.Fields{ - "ip": member.IP, - "labels": member.Labels, - }).Debug("API member labels") - } + // Update local node with API data, but preserve heartbeat-based status + wss.updateNodeWithAPIData(member) } } +// updateNodeWithAPIData updates a single node with API data while preserving heartbeat status +func (wss *WebSocketServer) updateNodeWithAPIData(apiMember client.ClusterMember) { + nodes := wss.nodeDiscovery.GetNodes() + if localNode, exists := nodes[apiMember.IP]; exists { + // Update additional data from API but preserve heartbeat-based status + localNode.Labels = apiMember.Labels + localNode.Resources = apiMember.Resources + localNode.Latency = apiMember.Latency + + // Only update hostname if it's different and not empty + if apiMember.Hostname != "" && apiMember.Hostname != localNode.Hostname { + localNode.Hostname = apiMember.Hostname + } + + wss.logger.WithFields(log.Fields{ + "ip": apiMember.IP, + "labels": apiMember.Labels, + "status": localNode.Status, // Keep heartbeat-based status + }).Debug("Updated node with API data, preserved heartbeat status") + } +} + +// mergeAPIWithHeartbeatStatus merges API member data with heartbeat-based status +func (wss *WebSocketServer) mergeAPIWithHeartbeatStatus(apiMembers []client.ClusterMember) []client.ClusterMember { + localNodes := wss.nodeDiscovery.GetNodes() + mergedMembers := make([]client.ClusterMember, 0, len(apiMembers)) + + for _, apiMember := range apiMembers { + mergedMember := apiMember + + // Override status with heartbeat-based status if we have local data + if localNode, exists := localNodes[apiMember.IP]; exists { + mergedMember.Status = string(localNode.Status) + mergedMember.LastSeen = localNode.LastSeen.Unix() + + wss.logger.WithFields(log.Fields{ + "ip": apiMember.IP, + "api_status": apiMember.Status, + "heartbeat_status": localNode.Status, + }).Debug("Overriding API status with heartbeat status") + } + + mergedMembers = append(mergedMembers, mergedMember) + } + + return mergedMembers +} + // getFallbackClusterMembers returns local node data as fallback func (wss *WebSocketServer) getFallbackClusterMembers() []client.ClusterMember { nodes := wss.nodeDiscovery.GetNodes() @@ -507,6 +606,9 @@ func (wss *WebSocketServer) GetClientCount() int { func (wss *WebSocketServer) Shutdown(ctx context.Context) error { wss.logger.Info("Shutting down WebSocket server") + // Stop periodic cluster info fetching + close(wss.clusterInfoStopCh) + wss.mutex.Lock() clients := make([]*websocket.Conn, 0, len(wss.clients)) for client := range wss.clients { diff --git a/pkg/client/client.go b/pkg/client/client.go index 83ad04c..703347a 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -117,21 +117,43 @@ type FirmwareUpdateResponse struct { func (c *SporeClient) GetClusterStatus() (*ClusterStatusResponse, error) { url := fmt.Sprintf("%s/api/cluster/members", c.BaseURL) + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "endpoint": "/api/cluster/members", + }).Debug("Fetching cluster status from SPORE node") + resp, err := c.HTTPClient.Get(url) if err != nil { + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "error": err.Error(), + }).Debug("Failed to fetch cluster status from SPORE node") return nil, fmt.Errorf("failed to get cluster status: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "status_code": resp.StatusCode, + }).Debug("Cluster status request returned non-OK status") return nil, fmt.Errorf("cluster status request failed with status %d", resp.StatusCode) } var clusterStatus ClusterStatusResponse if err := json.NewDecoder(resp.Body).Decode(&clusterStatus); err != nil { + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "error": err.Error(), + }).Debug("Failed to decode cluster status response") return nil, fmt.Errorf("failed to decode cluster status response: %w", err) } + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "member_count": len(clusterStatus.Members), + }).Debug("Successfully fetched cluster status from SPORE node") + return &clusterStatus, nil } @@ -139,21 +161,44 @@ func (c *SporeClient) GetClusterStatus() (*ClusterStatusResponse, error) { func (c *SporeClient) GetTaskStatus() (*TaskStatusResponse, error) { url := fmt.Sprintf("%s/api/tasks/status", c.BaseURL) + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "endpoint": "/api/tasks/status", + }).Debug("Fetching task status from SPORE node") + resp, err := c.HTTPClient.Get(url) if err != nil { + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "error": err.Error(), + }).Debug("Failed to fetch task status from SPORE node") return nil, fmt.Errorf("failed to get task status: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "status_code": resp.StatusCode, + }).Debug("Task status request returned non-OK status") return nil, fmt.Errorf("task status request failed with status %d", resp.StatusCode) } var taskStatus TaskStatusResponse if err := json.NewDecoder(resp.Body).Decode(&taskStatus); err != nil { + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "error": err.Error(), + }).Debug("Failed to decode task status response") return nil, fmt.Errorf("failed to decode task status response: %w", err) } + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "total_tasks": taskStatus.Summary.TotalTasks, + "active_tasks": taskStatus.Summary.ActiveTasks, + }).Debug("Successfully fetched task status from SPORE node") + return &taskStatus, nil } @@ -161,21 +206,44 @@ func (c *SporeClient) GetTaskStatus() (*TaskStatusResponse, error) { func (c *SporeClient) GetSystemStatus() (*SystemStatusResponse, error) { url := fmt.Sprintf("%s/api/node/status", c.BaseURL) + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "endpoint": "/api/node/status", + }).Debug("Fetching system status from SPORE node") + resp, err := c.HTTPClient.Get(url) if err != nil { + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "error": err.Error(), + }).Debug("Failed to fetch system status from SPORE node") return nil, fmt.Errorf("failed to get system status: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "status_code": resp.StatusCode, + }).Debug("System status request returned non-OK status") return nil, fmt.Errorf("system status request failed with status %d", resp.StatusCode) } var systemStatus SystemStatusResponse if err := json.NewDecoder(resp.Body).Decode(&systemStatus); err != nil { + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "error": err.Error(), + }).Debug("Failed to decode system status response") return nil, fmt.Errorf("failed to decode system status response: %w", err) } + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "free_heap": systemStatus.FreeHeap, + "chip_id": systemStatus.ChipID, + }).Debug("Successfully fetched system status from SPORE node") + return &systemStatus, nil } @@ -183,21 +251,43 @@ func (c *SporeClient) GetSystemStatus() (*SystemStatusResponse, error) { func (c *SporeClient) GetCapabilities() (*CapabilitiesResponse, error) { url := fmt.Sprintf("%s/api/node/endpoints", c.BaseURL) + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "endpoint": "/api/node/endpoints", + }).Debug("Fetching capabilities from SPORE node") + resp, err := c.HTTPClient.Get(url) if err != nil { + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "error": err.Error(), + }).Debug("Failed to fetch capabilities from SPORE node") return nil, fmt.Errorf("failed to get capabilities: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "status_code": resp.StatusCode, + }).Debug("Capabilities request returned non-OK status") return nil, fmt.Errorf("capabilities request failed with status %d", resp.StatusCode) } var capabilities CapabilitiesResponse if err := json.NewDecoder(resp.Body).Decode(&capabilities); err != nil { + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "error": err.Error(), + }).Debug("Failed to decode capabilities response") return nil, fmt.Errorf("failed to decode capabilities response: %w", err) } + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "endpoint_count": len(capabilities.Endpoints), + }).Debug("Successfully fetched capabilities from SPORE node") + return &capabilities, nil } @@ -205,16 +295,30 @@ func (c *SporeClient) GetCapabilities() (*CapabilitiesResponse, error) { func (c *SporeClient) UpdateFirmware(firmwareData []byte, filename string) (*FirmwareUpdateResponse, error) { url := fmt.Sprintf("%s/api/node/update", c.BaseURL) + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "endpoint": "/api/node/update", + "filename": filename, + "data_size": len(firmwareData), + }).Debug("Preparing firmware upload to SPORE node") + // Create multipart form var requestBody bytes.Buffer contentType := createMultipartForm(&requestBody, firmwareData, filename) if contentType == "" { + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + }).Debug("Failed to create multipart form for firmware upload") return nil, fmt.Errorf("failed to create multipart form") } req, err := http.NewRequest("POST", url, &requestBody) if err != nil { + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "error": err.Error(), + }).Debug("Failed to create firmware update request") return nil, fmt.Errorf("failed to create firmware update request: %w", err) } @@ -226,9 +330,10 @@ func (c *SporeClient) UpdateFirmware(firmwareData []byte, filename string) (*Fir } log.WithFields(log.Fields{ - "node_ip": c.BaseURL, - "status": "sending_firmware", - }).Debug("Sending firmware to SPORE device") + "node_url": c.BaseURL, + "filename": filename, + "data_size": len(firmwareData), + }).Debug("Uploading firmware to SPORE node") resp, err := firmwareClient.Do(req) if err != nil { @@ -277,9 +382,19 @@ func (c *SporeClient) UpdateFirmware(firmwareData []byte, filename string) (*Fir func (c *SporeClient) UpdateNodeLabels(labels map[string]string) error { targetURL := fmt.Sprintf("%s/api/node/config", c.BaseURL) + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "endpoint": "/api/node/config", + "labels": labels, + }).Debug("Updating node labels on SPORE node") + // Convert labels to JSON labelsJSON, err := json.Marshal(labels) if err != nil { + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "error": err.Error(), + }).Debug("Failed to marshal labels") return fmt.Errorf("failed to marshal labels: %w", err) } @@ -289,6 +404,10 @@ func (c *SporeClient) UpdateNodeLabels(labels map[string]string) error { req, err := http.NewRequest("POST", targetURL, strings.NewReader(data.Encode())) if err != nil { + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "error": err.Error(), + }).Debug("Failed to create labels update request") return fmt.Errorf("failed to create labels update request: %w", err) } @@ -296,19 +415,28 @@ func (c *SporeClient) UpdateNodeLabels(labels map[string]string) error { resp, err := c.HTTPClient.Do(req) if err != nil { + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "error": err.Error(), + }).Debug("Failed to update node labels") return fmt.Errorf("failed to update node labels: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "status_code": resp.StatusCode, + "error_body": string(body), + }).Debug("Node labels update returned non-OK status") return fmt.Errorf("node labels update failed with status %d: %s", resp.StatusCode, string(body)) } log.WithFields(log.Fields{ - "node_ip": c.BaseURL, - "labels": labels, - }).Info("Node labels updated successfully") + "node_url": c.BaseURL, + "labels": labels, + }).Debug("Successfully updated node labels on SPORE node") return nil } @@ -318,17 +446,43 @@ func (c *SporeClient) ProxyCall(method, uri string, params map[string]interface{ // Build target URL targetURL := fmt.Sprintf("%s%s", c.BaseURL, uri) + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "method": method, + "endpoint": uri, + "param_count": len(params), + }).Debug("Making proxy call to SPORE node") + // Parse parameters and build request req, err := c.buildProxyRequest(method, targetURL, params) if err != nil { + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "method": method, + "endpoint": uri, + "error": err.Error(), + }).Debug("Failed to build proxy request") return nil, fmt.Errorf("failed to build proxy request: %w", err) } resp, err := c.HTTPClient.Do(req) if err != nil { + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "method": method, + "endpoint": uri, + "error": err.Error(), + }).Debug("Proxy call failed") return nil, fmt.Errorf("proxy call failed: %w", err) } + log.WithFields(log.Fields{ + "node_url": c.BaseURL, + "method": method, + "endpoint": uri, + "status_code": resp.StatusCode, + }).Debug("Proxy call completed successfully") + return resp, nil } diff --git a/pkg/registry/registry.go b/pkg/registry/registry.go index 41516ed..a092f34 100644 --- a/pkg/registry/registry.go +++ b/pkg/registry/registry.go @@ -69,21 +69,42 @@ func (c *RegistryClient) FindFirmwareByNameAndVersion(name, version string) (*Fi func (c *RegistryClient) GetHealth() (map[string]interface{}, error) { url := fmt.Sprintf("%s/health", c.BaseURL) + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "endpoint": "/health", + }).Debug("Checking registry health") + resp, err := c.HTTPClient.Get(url) if err != nil { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "error": err.Error(), + }).Debug("Failed to check registry health") return nil, fmt.Errorf("failed to get registry health: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "status_code": resp.StatusCode, + }).Debug("Registry health check returned non-OK status") return nil, fmt.Errorf("registry health check failed with status %d", resp.StatusCode) } var health map[string]interface{} if err := json.NewDecoder(resp.Body).Decode(&health); err != nil { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "error": err.Error(), + }).Debug("Failed to decode health response") return nil, fmt.Errorf("failed to decode health response: %w", err) } + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + }).Debug("Successfully checked registry health") + return health, nil } @@ -91,6 +112,13 @@ func (c *RegistryClient) GetHealth() (map[string]interface{}, error) { func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile io.Reader) (map[string]interface{}, error) { url := fmt.Sprintf("%s/firmware", c.BaseURL) + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "endpoint": "/firmware", + "name": metadata.Name, + "version": metadata.Version, + }).Debug("Uploading firmware to registry") + // Create multipart form data body := &bytes.Buffer{} writer := multipart.NewWriter(body) @@ -98,11 +126,19 @@ func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile // Add metadata metadataJSON, err := json.Marshal(metadata) if err != nil { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "error": err.Error(), + }).Debug("Failed to marshal firmware metadata") return nil, fmt.Errorf("failed to marshal metadata: %w", err) } metadataPart, err := writer.CreateFormField("metadata") if err != nil { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "error": err.Error(), + }).Debug("Failed to create metadata field") return nil, fmt.Errorf("failed to create metadata field: %w", err) } metadataPart.Write(metadataJSON) @@ -110,10 +146,18 @@ func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile // Add firmware file firmwarePart, err := writer.CreateFormFile("firmware", fmt.Sprintf("%s-%s.bin", metadata.Name, metadata.Version)) if err != nil { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "error": err.Error(), + }).Debug("Failed to create firmware field") return nil, fmt.Errorf("failed to create firmware field: %w", err) } if _, err := io.Copy(firmwarePart, firmwareFile); err != nil { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "error": err.Error(), + }).Debug("Failed to copy firmware data") return nil, fmt.Errorf("failed to copy firmware data: %w", err) } @@ -121,6 +165,10 @@ func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile req, err := http.NewRequest("POST", url, body) if err != nil { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "error": err.Error(), + }).Debug("Failed to create upload request") return nil, fmt.Errorf("failed to create request: %w", err) } @@ -128,20 +176,43 @@ func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile resp, err := c.HTTPClient.Do(req) if err != nil { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "name": metadata.Name, + "version": metadata.Version, + "error": err.Error(), + }).Debug("Failed to upload firmware to registry") return nil, fmt.Errorf("failed to upload firmware: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { body, _ := io.ReadAll(resp.Body) + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "name": metadata.Name, + "version": metadata.Version, + "status_code": resp.StatusCode, + "error_body": string(body), + }).Debug("Firmware upload returned non-OK status") return nil, fmt.Errorf("firmware upload failed with status %d: %s", resp.StatusCode, string(body)) } var result map[string]interface{} if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "error": err.Error(), + }).Debug("Failed to decode upload response") return nil, fmt.Errorf("failed to decode upload response: %w", err) } + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "name": metadata.Name, + "version": metadata.Version, + }).Debug("Successfully uploaded firmware to registry") + return result, nil } @@ -149,13 +220,32 @@ func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile func (c *RegistryClient) UpdateFirmwareMetadata(name, version string, metadata FirmwareMetadata) (map[string]interface{}, error) { url := fmt.Sprintf("%s/firmware/%s/%s", c.BaseURL, name, version) + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "endpoint": fmt.Sprintf("/firmware/%s/%s", name, version), + "name": name, + "version": version, + }).Debug("Updating firmware metadata in registry") + metadataJSON, err := json.Marshal(metadata) if err != nil { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "name": name, + "version": version, + "error": err.Error(), + }).Debug("Failed to marshal metadata") return nil, fmt.Errorf("failed to marshal metadata: %w", err) } req, err := http.NewRequest("PUT", url, bytes.NewBuffer(metadataJSON)) if err != nil { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "name": name, + "version": version, + "error": err.Error(), + }).Debug("Failed to create update request") return nil, fmt.Errorf("failed to create request: %w", err) } @@ -163,20 +253,45 @@ func (c *RegistryClient) UpdateFirmwareMetadata(name, version string, metadata F resp, err := c.HTTPClient.Do(req) if err != nil { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "name": name, + "version": version, + "error": err.Error(), + }).Debug("Failed to update firmware metadata in registry") return nil, fmt.Errorf("failed to update firmware metadata: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "name": name, + "version": version, + "status_code": resp.StatusCode, + "error_body": string(body), + }).Debug("Firmware metadata update returned non-OK status") return nil, fmt.Errorf("firmware metadata update failed with status %d: %s", resp.StatusCode, string(body)) } var result map[string]interface{} if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "name": name, + "version": version, + "error": err.Error(), + }).Debug("Failed to decode update response") return nil, fmt.Errorf("failed to decode update response: %w", err) } + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "name": name, + "version": version, + }).Debug("Successfully updated firmware metadata in registry") + return result, nil } @@ -221,21 +336,43 @@ func (c *RegistryClient) firmwareMatchesLabels(firmwareLabels, rolloutLabels map func (c *RegistryClient) ListFirmware() ([]GroupedFirmware, error) { url := fmt.Sprintf("%s/firmware", c.BaseURL) + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "endpoint": "/firmware", + }).Debug("Fetching firmware list from registry") + resp, err := c.HTTPClient.Get(url) if err != nil { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "error": err.Error(), + }).Debug("Failed to fetch firmware list from registry") return nil, fmt.Errorf("failed to get firmware list: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "status_code": resp.StatusCode, + }).Debug("Firmware list request returned non-OK status") return nil, fmt.Errorf("firmware list request failed with status %d", resp.StatusCode) } var firmwareList []GroupedFirmware if err := json.NewDecoder(resp.Body).Decode(&firmwareList); err != nil { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "error": err.Error(), + }).Debug("Failed to decode firmware list response") return nil, fmt.Errorf("failed to decode firmware list response: %w", err) } + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "firmware_count": len(firmwareList), + }).Debug("Successfully fetched firmware list from registry") + return firmwareList, nil } @@ -243,26 +380,52 @@ func (c *RegistryClient) ListFirmware() ([]GroupedFirmware, error) { func (c *RegistryClient) DownloadFirmware(name, version string) ([]byte, error) { url := fmt.Sprintf("%s/firmware/%s/%s", c.BaseURL, name, version) + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "endpoint": fmt.Sprintf("/firmware/%s/%s", name, version), + "name": name, + "version": version, + }).Debug("Downloading firmware from registry") + resp, err := c.HTTPClient.Get(url) if err != nil { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "name": name, + "version": version, + "error": err.Error(), + }).Debug("Failed to download firmware from registry") return nil, fmt.Errorf("failed to download firmware: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "name": name, + "version": version, + "status_code": resp.StatusCode, + }).Debug("Firmware download request returned non-OK status") return nil, fmt.Errorf("firmware download request failed with status %d", resp.StatusCode) } data, err := io.ReadAll(resp.Body) if err != nil { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "name": name, + "version": version, + "error": err.Error(), + }).Debug("Failed to read firmware data from registry") return nil, fmt.Errorf("failed to read firmware data: %w", err) } log.WithFields(log.Fields{ - "name": name, - "version": version, - "size": len(data), - }).Info("Downloaded firmware from registry") + "registry_url": c.BaseURL, + "name": name, + "version": version, + "size": len(data), + }).Debug("Successfully downloaded firmware from registry") return data, nil } @@ -271,31 +434,64 @@ func (c *RegistryClient) DownloadFirmware(name, version string) ([]byte, error) func (c *RegistryClient) DeleteFirmware(name, version string) (map[string]interface{}, error) { url := fmt.Sprintf("%s/firmware/%s/%s", c.BaseURL, name, version) + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "endpoint": fmt.Sprintf("/firmware/%s/%s", name, version), + "name": name, + "version": version, + }).Debug("Deleting firmware from registry") + req, err := http.NewRequest(http.MethodDelete, url, nil) if err != nil { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "name": name, + "version": version, + "error": err.Error(), + }).Debug("Failed to create delete request") return nil, fmt.Errorf("failed to create delete request: %w", err) } resp, err := c.HTTPClient.Do(req) if err != nil { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "name": name, + "version": version, + "error": err.Error(), + }).Debug("Failed to delete firmware from registry") return nil, fmt.Errorf("failed to delete firmware: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "name": name, + "version": version, + "status_code": resp.StatusCode, + "error_body": string(body), + }).Debug("Firmware delete returned non-OK status") return nil, fmt.Errorf("firmware delete request failed with status %d: %s", resp.StatusCode, string(body)) } var result map[string]interface{} if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + log.WithFields(log.Fields{ + "registry_url": c.BaseURL, + "name": name, + "version": version, + "error": err.Error(), + }).Debug("Failed to decode delete response") return nil, fmt.Errorf("failed to decode delete response: %w", err) } log.WithFields(log.Fields{ - "name": name, - "version": version, - }).Info("Deleted firmware from registry") + "registry_url": c.BaseURL, + "name": name, + "version": version, + }).Debug("Successfully deleted firmware from registry") return result, nil }