Merge pull request 'feature/status-update-optimization' (#3) from feature/status-update-optimization into main

Reviewed-on: #3
This commit is contained in:
2025-10-26 12:51:55 +01:00
6 changed files with 601 additions and 71 deletions

View File

@@ -49,6 +49,9 @@ func (nd *NodeDiscovery) Shutdown(ctx context.Context) error {
return nil
}
// MessageHandler processes a specific UDP message type
type MessageHandler func(payload string, remoteAddr *net.UDPAddr)
// handleUDPMessage processes incoming UDP messages
func (nd *NodeDiscovery) handleUDPMessage(message string, remoteAddr *net.UDPAddr) {
nd.logger.WithFields(log.Fields{
@@ -58,13 +61,39 @@ func (nd *NodeDiscovery) handleUDPMessage(message string, remoteAddr *net.UDPAdd
message = strings.TrimSpace(message)
if strings.HasPrefix(message, "CLUSTER_HEARTBEAT:") {
hostname := strings.TrimPrefix(message, "CLUSTER_HEARTBEAT:")
nd.updateNodeFromHeartbeat(remoteAddr.IP.String(), remoteAddr.Port, hostname)
} else if strings.HasPrefix(message, "NODE_UPDATE:") {
nd.handleNodeUpdate(remoteAddr.IP.String(), message)
} else if !strings.HasPrefix(message, "RAW:") {
nd.logger.WithField("message", message).Debug("Received unknown UDP message")
// Extract topic by splitting on first ":"
parts := strings.SplitN(message, ":", 2)
if len(parts) < 2 {
nd.logger.WithField("message", message).Debug("Invalid message format - missing ':' separator")
return
}
topic := parts[0]
payload := parts[1]
// Handler map for different message types
handlers := map[string]MessageHandler{
"cluster/heartbeat": func(payload string, remoteAddr *net.UDPAddr) {
nd.updateNodeFromHeartbeat(remoteAddr.IP.String(), remoteAddr.Port, payload)
},
"node/update": func(payload string, remoteAddr *net.UDPAddr) {
// Reconstruct full message for handleNodeUpdate which expects "node/update:hostname:{json}"
fullMessage := "node/update:" + payload
nd.handleNodeUpdate(remoteAddr.IP.String(), fullMessage)
},
"raw": func(payload string, remoteAddr *net.UDPAddr) {
nd.logger.WithField("message", "raw:"+payload).Debug("Received raw message")
},
"cluster/event": func(payload string, remoteAddr *net.UDPAddr) {
nd.logger.WithField("message", "cluster/event:"+payload).Debug("Received cluster/event message")
},
}
// Look up and execute handler
if handler, exists := handlers[topic]; exists {
handler(payload, remoteAddr)
} else {
nd.logger.WithField("topic", topic).Debug("Received unknown UDP message type")
}
}
@@ -138,9 +167,9 @@ func (nd *NodeDiscovery) updateNodeFromHeartbeat(sourceIP string, sourcePort int
}
}
// handleNodeUpdate processes NODE_UPDATE messages
// handleNodeUpdate processes NODE_UPDATE and node/update messages
func (nd *NodeDiscovery) handleNodeUpdate(sourceIP, message string) {
// Message format: "NODE_UPDATE:hostname:{json}"
// Message format: "NODE_UPDATE:hostname:{json}" or "node/update:hostname:{json}"
parts := strings.SplitN(message, ":", 3)
if len(parts) < 3 {
nd.logger.WithField("message", message).Warn("Invalid NODE_UPDATE message format")

View File

@@ -57,7 +57,7 @@ func NewNodeDiscovery(udpPort string) *NodeDiscovery {
return &NodeDiscovery{
udpPort: udpPort,
discoveredNodes: make(map[string]*NodeInfo),
staleThreshold: 10 * time.Second, // TODO make configurable
staleThreshold: 10 * time.Second, // Heartbeat timeout - mark nodes inactive after 10 seconds
logger: log.New(),
}
}

View File

@@ -366,16 +366,19 @@ func (hs *HTTPServer) setPrimaryNode(w http.ResponseWriter, r *http.Request) {
// GET /api/cluster/members
func (hs *HTTPServer) getClusterMembers(w http.ResponseWriter, r *http.Request) {
log.Debug("Fetching cluster members via API")
result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) {
return client.GetClusterStatus()
})
if err != nil {
log.WithError(err).Error("Error fetching cluster members")
log.WithError(err).Debug("Failed to fetch cluster members")
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch cluster members", "message": "%s"}`, err.Error()), http.StatusBadGateway)
return
}
log.Debug("Successfully fetched cluster members via API")
json.NewEncoder(w).Encode(result)
}
@@ -417,42 +420,52 @@ func (hs *HTTPServer) getTaskStatus(w http.ResponseWriter, r *http.Request) {
ip := r.URL.Query().Get("ip")
if ip != "" {
log.WithField("node_ip", ip).Debug("Fetching task status from specific node")
client := hs.getSporeClient(ip)
result, err := client.GetTaskStatus()
if err != nil {
log.WithError(err).Error("Error fetching task status from specific node")
log.WithFields(log.Fields{
"node_ip": ip,
"error": err.Error(),
}).Debug("Failed to fetch task status from specific node")
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch task status from node", "message": "%s"}`, err.Error()), http.StatusInternalServerError)
return
}
log.WithField("node_ip", ip).Debug("Successfully fetched task status from specific node")
json.NewEncoder(w).Encode(result)
return
}
log.Debug("Fetching task status via failover")
result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) {
return client.GetTaskStatus()
})
if err != nil {
log.WithError(err).Error("Error fetching task status")
log.WithError(err).Debug("Failed to fetch task status via failover")
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch task status", "message": "%s"}`, err.Error()), http.StatusBadGateway)
return
}
log.Debug("Successfully fetched task status via failover")
json.NewEncoder(w).Encode(result)
}
// GET /api/node/status
func (hs *HTTPServer) getNodeStatus(w http.ResponseWriter, r *http.Request) {
log.Debug("Fetching node system status via failover")
result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) {
return client.GetSystemStatus()
})
if err != nil {
log.WithError(err).Error("Error fetching system status")
log.WithError(err).Debug("Failed to fetch system status via failover")
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch system status", "message": "%s"}`, err.Error()), http.StatusBadGateway)
return
}
log.Debug("Successfully fetched system status via failover")
json.NewEncoder(w).Encode(result)
}
@@ -461,14 +474,20 @@ func (hs *HTTPServer) getNodeStatusByIP(w http.ResponseWriter, r *http.Request)
vars := mux.Vars(r)
nodeIP := vars["ip"]
log.WithField("node_ip", nodeIP).Debug("Fetching system status from specific node")
client := hs.getSporeClient(nodeIP)
result, err := client.GetSystemStatus()
if err != nil {
log.WithError(err).Error("Error fetching status from specific node")
log.WithFields(log.Fields{
"node_ip": nodeIP,
"error": err.Error(),
}).Debug("Failed to fetch status from specific node")
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch status from node %s", "message": "%s"}`, nodeIP, err.Error()), http.StatusInternalServerError)
return
}
log.WithField("node_ip", nodeIP).Debug("Successfully fetched status from specific node")
json.NewEncoder(w).Encode(result)
}
@@ -477,27 +496,34 @@ func (hs *HTTPServer) getNodeEndpoints(w http.ResponseWriter, r *http.Request) {
ip := r.URL.Query().Get("ip")
if ip != "" {
log.WithField("node_ip", ip).Debug("Fetching endpoints from specific node")
client := hs.getSporeClient(ip)
result, err := client.GetCapabilities()
if err != nil {
log.WithError(err).Error("Error fetching endpoints from specific node")
log.WithFields(log.Fields{
"node_ip": ip,
"error": err.Error(),
}).Debug("Failed to fetch endpoints from specific node")
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch endpoints from node", "message": "%s"}`, err.Error()), http.StatusInternalServerError)
return
}
log.WithField("node_ip", ip).Debug("Successfully fetched endpoints from specific node")
json.NewEncoder(w).Encode(result)
return
}
log.Debug("Fetching capabilities via failover")
result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) {
return client.GetCapabilities()
})
if err != nil {
log.WithError(err).Error("Error fetching capabilities")
log.WithError(err).Debug("Failed to fetch capabilities via failover")
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch capabilities", "message": "%s"}`, err.Error()), http.StatusBadGateway)
return
}
log.Debug("Successfully fetched capabilities via failover")
json.NewEncoder(w).Encode(result)
}
@@ -849,18 +875,21 @@ type ClusterNodeVersionsResponse struct {
// GET /api/cluster/node/versions
func (hs *HTTPServer) getClusterNodeVersions(w http.ResponseWriter, r *http.Request) {
log.Debug("Fetching cluster node versions")
result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) {
return client.GetClusterStatus()
})
if err != nil {
log.WithError(err).Error("Error fetching cluster members for versions")
log.WithError(err).Debug("Failed to fetch cluster members for versions")
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch cluster members", "message": "%s"}`, err.Error()), http.StatusBadGateway)
return
}
clusterStatus, ok := result.(*client.ClusterStatusResponse)
if !ok {
log.Debug("Invalid cluster status response type")
http.Error(w, `{"error": "Invalid cluster status response"}`, http.StatusInternalServerError)
return
}
@@ -880,6 +909,8 @@ func (hs *HTTPServer) getClusterNodeVersions(w http.ResponseWriter, r *http.Requ
})
}
log.WithField("node_count", len(nodeVersions)).Debug("Successfully fetched cluster node versions")
response := ClusterNodeVersionsResponse{
Nodes: nodeVersions,
}
@@ -956,12 +987,25 @@ func (hs *HTTPServer) nodeMatchesLabels(nodeLabels, rolloutLabels map[string]str
// processRollout handles the actual rollout process in the background
func (hs *HTTPServer) processRollout(rolloutID string, nodes []NodeInfo, firmwareInfo FirmwareInfo) {
log.WithField("rollout_id", rolloutID).Info("Starting background rollout process")
log.WithFields(log.Fields{
"rollout_id": rolloutID,
"firmware": fmt.Sprintf("%s/%s", firmwareInfo.Name, firmwareInfo.Version),
"node_count": len(nodes),
}).Debug("Starting background rollout process")
// Download firmware from registry
log.WithFields(log.Fields{
"rollout_id": rolloutID,
"firmware": fmt.Sprintf("%s/%s", firmwareInfo.Name, firmwareInfo.Version),
}).Debug("Downloading firmware from registry for rollout")
firmwareData, err := hs.registryClient.DownloadFirmware(firmwareInfo.Name, firmwareInfo.Version)
if err != nil {
log.WithError(err).Error("Failed to download firmware for rollout")
log.WithFields(log.Fields{
"rollout_id": rolloutID,
"firmware": fmt.Sprintf("%s/%s", firmwareInfo.Name, firmwareInfo.Version),
"error": err.Error(),
}).Error("Failed to download firmware for rollout")
return
}
@@ -970,7 +1014,7 @@ func (hs *HTTPServer) processRollout(rolloutID string, nodes []NodeInfo, firmwar
"firmware": fmt.Sprintf("%s/%s", firmwareInfo.Name, firmwareInfo.Version),
"size": len(firmwareData),
"total_nodes": len(nodes),
}).Info("Downloaded firmware for rollout")
}).Debug("Successfully downloaded firmware for rollout")
// Process nodes in parallel using goroutines
var wg sync.WaitGroup
@@ -984,9 +1028,14 @@ func (hs *HTTPServer) processRollout(rolloutID string, nodes []NodeInfo, firmwar
"rollout_id": rolloutID,
"node_ip": node.IP,
"progress": fmt.Sprintf("%d/%d", nodeIndex+1, len(nodes)),
}).Info("Processing node in rollout")
}).Debug("Processing node in rollout")
// Update version label on the node before upload
log.WithFields(log.Fields{
"rollout_id": rolloutID,
"node_ip": node.IP,
}).Debug("Getting SPORE client for node")
client := hs.getSporeClient(node.IP)
// Create updated labels with the new version

View File

@@ -23,26 +23,34 @@ var upgrader = websocket.Upgrader{
// WebSocketServer manages WebSocket connections and broadcasts
type WebSocketServer struct {
nodeDiscovery *discovery.NodeDiscovery
sporeClients map[string]*client.SporeClient
clients map[*websocket.Conn]bool
mutex sync.RWMutex
writeMutex sync.Mutex // Mutex to serialize writes to WebSocket connections
logger *log.Logger
nodeDiscovery *discovery.NodeDiscovery
sporeClients map[string]*client.SporeClient
clients map[*websocket.Conn]bool
mutex sync.RWMutex
writeMutex sync.Mutex // Mutex to serialize writes to WebSocket connections
logger *log.Logger
clusterInfoTicker *time.Ticker
clusterInfoStopCh chan bool
clusterInfoInterval time.Duration
}
// NewWebSocketServer creates a new WebSocket server
func NewWebSocketServer(nodeDiscovery *discovery.NodeDiscovery) *WebSocketServer {
wss := &WebSocketServer{
nodeDiscovery: nodeDiscovery,
sporeClients: make(map[string]*client.SporeClient),
clients: make(map[*websocket.Conn]bool),
logger: log.New(),
nodeDiscovery: nodeDiscovery,
sporeClients: make(map[string]*client.SporeClient),
clients: make(map[*websocket.Conn]bool),
logger: log.New(),
clusterInfoStopCh: make(chan bool),
clusterInfoInterval: 5 * time.Second, // Fetch cluster info every 5 seconds
}
// Register callback for node updates
nodeDiscovery.AddCallback(wss.handleNodeUpdate)
// Start periodic cluster info fetching
go wss.startPeriodicClusterInfoFetching()
return wss
}
@@ -126,13 +134,13 @@ func (wss *WebSocketServer) sendCurrentClusterState(conn *websocket.Conn) {
}
message := struct {
Type string `json:"type"`
Topic string `json:"topic"`
Members []client.ClusterMember `json:"members"`
PrimaryNode string `json:"primaryNode"`
TotalNodes int `json:"totalNodes"`
Timestamp string `json:"timestamp"`
}{
Type: "cluster_update",
Topic: "cluster/update",
Members: clusterData,
PrimaryNode: wss.nodeDiscovery.GetPrimaryNode(),
TotalNodes: len(nodes),
@@ -151,17 +159,48 @@ func (wss *WebSocketServer) sendCurrentClusterState(conn *websocket.Conn) {
}
}
// startPeriodicClusterInfoFetching starts a goroutine that periodically fetches cluster info
func (wss *WebSocketServer) startPeriodicClusterInfoFetching() {
wss.clusterInfoTicker = time.NewTicker(wss.clusterInfoInterval)
defer wss.clusterInfoTicker.Stop()
wss.logger.WithField("interval", wss.clusterInfoInterval).Info("Starting periodic cluster info fetching")
for {
select {
case <-wss.clusterInfoTicker.C:
wss.fetchAndBroadcastClusterInfo()
case <-wss.clusterInfoStopCh:
wss.logger.Info("Stopping periodic cluster info fetching")
return
}
}
}
// fetchAndBroadcastClusterInfo fetches cluster info and broadcasts it to clients
func (wss *WebSocketServer) fetchAndBroadcastClusterInfo() {
// Only fetch if we have clients connected
wss.mutex.RLock()
clientCount := len(wss.clients)
wss.mutex.RUnlock()
if clientCount == 0 {
return
}
wss.logger.Debug("Periodically fetching cluster info")
wss.broadcastClusterUpdate()
}
// handleNodeUpdate is called when node information changes
func (wss *WebSocketServer) handleNodeUpdate(nodeIP, action string) {
wss.logger.WithFields(log.Fields{
"node_ip": nodeIP,
"action": action,
}).Debug("Node update received, broadcasting to WebSocket clients")
}).Debug("Node update received, broadcasting node discovery event")
// Broadcast cluster update to all clients
wss.broadcastClusterUpdate()
// Also broadcast node discovery event
// Only broadcast node discovery event, not cluster update
// Cluster updates are now handled by periodic fetching
wss.broadcastNodeDiscovery(nodeIP, action)
}
@@ -188,13 +227,13 @@ func (wss *WebSocketServer) broadcastClusterUpdate() {
}
message := struct {
Type string `json:"type"`
Topic string `json:"topic"`
Members []client.ClusterMember `json:"members"`
PrimaryNode string `json:"primaryNode"`
TotalNodes int `json:"totalNodes"`
Timestamp string `json:"timestamp"`
}{
Type: "cluster_update",
Topic: "cluster/update",
Members: clusterData,
PrimaryNode: wss.nodeDiscovery.GetPrimaryNode(),
TotalNodes: len(wss.nodeDiscovery.GetNodes()),
@@ -248,12 +287,12 @@ func (wss *WebSocketServer) broadcastNodeDiscovery(nodeIP, action string) {
}
message := struct {
Type string `json:"type"`
Topic string `json:"topic"`
Action string `json:"action"`
NodeIP string `json:"nodeIp"`
Timestamp string `json:"timestamp"`
}{
Type: "node_discovery",
Topic: "node/discovery",
Action: action,
NodeIP: nodeIP,
Timestamp: time.Now().Format(time.RFC3339),
@@ -291,14 +330,14 @@ func (wss *WebSocketServer) BroadcastFirmwareUploadStatus(nodeIP, status, filena
}
message := struct {
Type string `json:"type"`
Topic string `json:"topic"`
NodeIP string `json:"nodeIp"`
Status string `json:"status"`
Filename string `json:"filename"`
FileSize int `json:"fileSize"`
Timestamp string `json:"timestamp"`
}{
Type: "firmware_upload_status",
Topic: "firmware/upload/status",
NodeIP: nodeIP,
Status: status,
Filename: filename,
@@ -346,7 +385,7 @@ func (wss *WebSocketServer) BroadcastRolloutProgress(rolloutID, nodeIP, status s
}
message := struct {
Type string `json:"type"`
Topic string `json:"topic"`
RolloutID string `json:"rolloutId"`
NodeIP string `json:"nodeIp"`
Status string `json:"status"`
@@ -355,7 +394,7 @@ func (wss *WebSocketServer) BroadcastRolloutProgress(rolloutID, nodeIP, status s
Progress int `json:"progress"`
Timestamp string `json:"timestamp"`
}{
Type: "rollout_progress",
Topic: "rollout/progress",
RolloutID: rolloutID,
NodeIP: nodeIP,
Status: status,
@@ -429,20 +468,38 @@ func (wss *WebSocketServer) calculateProgress(current, total int, status string)
func (wss *WebSocketServer) getCurrentClusterMembers() ([]client.ClusterMember, error) {
nodes := wss.nodeDiscovery.GetNodes()
if len(nodes) == 0 {
wss.logger.Debug("No nodes available for cluster member retrieval")
return []client.ClusterMember{}, nil
}
// Try to get real cluster data from primary node
primaryNode := wss.nodeDiscovery.GetPrimaryNode()
if primaryNode != "" {
wss.logger.WithFields(log.Fields{
"primary_node": primaryNode,
"total_nodes": len(nodes),
}).Debug("Fetching cluster members from primary node")
client := wss.getSporeClient(primaryNode)
clusterStatus, err := client.GetClusterStatus()
if err == nil {
// Update local node data with API information
wss.logger.WithFields(log.Fields{
"primary_node": primaryNode,
"member_count": len(clusterStatus.Members),
}).Debug("Successfully fetched cluster members from primary node")
// Update local node data with API information but preserve heartbeat status
wss.updateLocalNodesWithAPI(clusterStatus.Members)
return clusterStatus.Members, nil
// Return merged data with heartbeat-based status override
return wss.mergeAPIWithHeartbeatStatus(clusterStatus.Members), nil
}
wss.logger.WithError(err).Error("Failed to get cluster status from primary node")
wss.logger.WithFields(log.Fields{
"primary_node": primaryNode,
"error": err.Error(),
}).Debug("Failed to get cluster status from primary node, using fallback")
} else {
wss.logger.Debug("No primary node available, using fallback cluster members")
}
// Fallback to local data if API fails
@@ -451,20 +508,62 @@ func (wss *WebSocketServer) getCurrentClusterMembers() ([]client.ClusterMember,
// updateLocalNodesWithAPI updates local node data with information from API
func (wss *WebSocketServer) updateLocalNodesWithAPI(apiMembers []client.ClusterMember) {
// This would update the local node discovery with fresh API data
// For now, we'll just log that we received the data
wss.logger.WithField("members", len(apiMembers)).Debug("Updating local nodes with API data")
for _, member := range apiMembers {
if len(member.Labels) > 0 {
wss.logger.WithFields(log.Fields{
"ip": member.IP,
"labels": member.Labels,
}).Debug("API member labels")
}
// Update local node with API data, but preserve heartbeat-based status
wss.updateNodeWithAPIData(member)
}
}
// updateNodeWithAPIData updates a single node with API data while preserving heartbeat status
func (wss *WebSocketServer) updateNodeWithAPIData(apiMember client.ClusterMember) {
nodes := wss.nodeDiscovery.GetNodes()
if localNode, exists := nodes[apiMember.IP]; exists {
// Update additional data from API but preserve heartbeat-based status
localNode.Labels = apiMember.Labels
localNode.Resources = apiMember.Resources
localNode.Latency = apiMember.Latency
// Only update hostname if it's different and not empty
if apiMember.Hostname != "" && apiMember.Hostname != localNode.Hostname {
localNode.Hostname = apiMember.Hostname
}
wss.logger.WithFields(log.Fields{
"ip": apiMember.IP,
"labels": apiMember.Labels,
"status": localNode.Status, // Keep heartbeat-based status
}).Debug("Updated node with API data, preserved heartbeat status")
}
}
// mergeAPIWithHeartbeatStatus merges API member data with heartbeat-based status
func (wss *WebSocketServer) mergeAPIWithHeartbeatStatus(apiMembers []client.ClusterMember) []client.ClusterMember {
localNodes := wss.nodeDiscovery.GetNodes()
mergedMembers := make([]client.ClusterMember, 0, len(apiMembers))
for _, apiMember := range apiMembers {
mergedMember := apiMember
// Override status with heartbeat-based status if we have local data
if localNode, exists := localNodes[apiMember.IP]; exists {
mergedMember.Status = string(localNode.Status)
mergedMember.LastSeen = localNode.LastSeen.Unix()
wss.logger.WithFields(log.Fields{
"ip": apiMember.IP,
"api_status": apiMember.Status,
"heartbeat_status": localNode.Status,
}).Debug("Overriding API status with heartbeat status")
}
mergedMembers = append(mergedMembers, mergedMember)
}
return mergedMembers
}
// getFallbackClusterMembers returns local node data as fallback
func (wss *WebSocketServer) getFallbackClusterMembers() []client.ClusterMember {
nodes := wss.nodeDiscovery.GetNodes()
@@ -507,6 +606,9 @@ func (wss *WebSocketServer) GetClientCount() int {
func (wss *WebSocketServer) Shutdown(ctx context.Context) error {
wss.logger.Info("Shutting down WebSocket server")
// Stop periodic cluster info fetching
close(wss.clusterInfoStopCh)
wss.mutex.Lock()
clients := make([]*websocket.Conn, 0, len(wss.clients))
for client := range wss.clients {

View File

@@ -117,21 +117,43 @@ type FirmwareUpdateResponse struct {
func (c *SporeClient) GetClusterStatus() (*ClusterStatusResponse, error) {
url := fmt.Sprintf("%s/api/cluster/members", c.BaseURL)
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"endpoint": "/api/cluster/members",
}).Debug("Fetching cluster status from SPORE node")
resp, err := c.HTTPClient.Get(url)
if err != nil {
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"error": err.Error(),
}).Debug("Failed to fetch cluster status from SPORE node")
return nil, fmt.Errorf("failed to get cluster status: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"status_code": resp.StatusCode,
}).Debug("Cluster status request returned non-OK status")
return nil, fmt.Errorf("cluster status request failed with status %d", resp.StatusCode)
}
var clusterStatus ClusterStatusResponse
if err := json.NewDecoder(resp.Body).Decode(&clusterStatus); err != nil {
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"error": err.Error(),
}).Debug("Failed to decode cluster status response")
return nil, fmt.Errorf("failed to decode cluster status response: %w", err)
}
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"member_count": len(clusterStatus.Members),
}).Debug("Successfully fetched cluster status from SPORE node")
return &clusterStatus, nil
}
@@ -139,21 +161,44 @@ func (c *SporeClient) GetClusterStatus() (*ClusterStatusResponse, error) {
func (c *SporeClient) GetTaskStatus() (*TaskStatusResponse, error) {
url := fmt.Sprintf("%s/api/tasks/status", c.BaseURL)
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"endpoint": "/api/tasks/status",
}).Debug("Fetching task status from SPORE node")
resp, err := c.HTTPClient.Get(url)
if err != nil {
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"error": err.Error(),
}).Debug("Failed to fetch task status from SPORE node")
return nil, fmt.Errorf("failed to get task status: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"status_code": resp.StatusCode,
}).Debug("Task status request returned non-OK status")
return nil, fmt.Errorf("task status request failed with status %d", resp.StatusCode)
}
var taskStatus TaskStatusResponse
if err := json.NewDecoder(resp.Body).Decode(&taskStatus); err != nil {
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"error": err.Error(),
}).Debug("Failed to decode task status response")
return nil, fmt.Errorf("failed to decode task status response: %w", err)
}
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"total_tasks": taskStatus.Summary.TotalTasks,
"active_tasks": taskStatus.Summary.ActiveTasks,
}).Debug("Successfully fetched task status from SPORE node")
return &taskStatus, nil
}
@@ -161,21 +206,44 @@ func (c *SporeClient) GetTaskStatus() (*TaskStatusResponse, error) {
func (c *SporeClient) GetSystemStatus() (*SystemStatusResponse, error) {
url := fmt.Sprintf("%s/api/node/status", c.BaseURL)
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"endpoint": "/api/node/status",
}).Debug("Fetching system status from SPORE node")
resp, err := c.HTTPClient.Get(url)
if err != nil {
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"error": err.Error(),
}).Debug("Failed to fetch system status from SPORE node")
return nil, fmt.Errorf("failed to get system status: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"status_code": resp.StatusCode,
}).Debug("System status request returned non-OK status")
return nil, fmt.Errorf("system status request failed with status %d", resp.StatusCode)
}
var systemStatus SystemStatusResponse
if err := json.NewDecoder(resp.Body).Decode(&systemStatus); err != nil {
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"error": err.Error(),
}).Debug("Failed to decode system status response")
return nil, fmt.Errorf("failed to decode system status response: %w", err)
}
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"free_heap": systemStatus.FreeHeap,
"chip_id": systemStatus.ChipID,
}).Debug("Successfully fetched system status from SPORE node")
return &systemStatus, nil
}
@@ -183,21 +251,43 @@ func (c *SporeClient) GetSystemStatus() (*SystemStatusResponse, error) {
func (c *SporeClient) GetCapabilities() (*CapabilitiesResponse, error) {
url := fmt.Sprintf("%s/api/node/endpoints", c.BaseURL)
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"endpoint": "/api/node/endpoints",
}).Debug("Fetching capabilities from SPORE node")
resp, err := c.HTTPClient.Get(url)
if err != nil {
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"error": err.Error(),
}).Debug("Failed to fetch capabilities from SPORE node")
return nil, fmt.Errorf("failed to get capabilities: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"status_code": resp.StatusCode,
}).Debug("Capabilities request returned non-OK status")
return nil, fmt.Errorf("capabilities request failed with status %d", resp.StatusCode)
}
var capabilities CapabilitiesResponse
if err := json.NewDecoder(resp.Body).Decode(&capabilities); err != nil {
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"error": err.Error(),
}).Debug("Failed to decode capabilities response")
return nil, fmt.Errorf("failed to decode capabilities response: %w", err)
}
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"endpoint_count": len(capabilities.Endpoints),
}).Debug("Successfully fetched capabilities from SPORE node")
return &capabilities, nil
}
@@ -205,16 +295,30 @@ func (c *SporeClient) GetCapabilities() (*CapabilitiesResponse, error) {
func (c *SporeClient) UpdateFirmware(firmwareData []byte, filename string) (*FirmwareUpdateResponse, error) {
url := fmt.Sprintf("%s/api/node/update", c.BaseURL)
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"endpoint": "/api/node/update",
"filename": filename,
"data_size": len(firmwareData),
}).Debug("Preparing firmware upload to SPORE node")
// Create multipart form
var requestBody bytes.Buffer
contentType := createMultipartForm(&requestBody, firmwareData, filename)
if contentType == "" {
log.WithFields(log.Fields{
"node_url": c.BaseURL,
}).Debug("Failed to create multipart form for firmware upload")
return nil, fmt.Errorf("failed to create multipart form")
}
req, err := http.NewRequest("POST", url, &requestBody)
if err != nil {
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"error": err.Error(),
}).Debug("Failed to create firmware update request")
return nil, fmt.Errorf("failed to create firmware update request: %w", err)
}
@@ -226,9 +330,10 @@ func (c *SporeClient) UpdateFirmware(firmwareData []byte, filename string) (*Fir
}
log.WithFields(log.Fields{
"node_ip": c.BaseURL,
"status": "sending_firmware",
}).Debug("Sending firmware to SPORE device")
"node_url": c.BaseURL,
"filename": filename,
"data_size": len(firmwareData),
}).Debug("Uploading firmware to SPORE node")
resp, err := firmwareClient.Do(req)
if err != nil {
@@ -277,9 +382,19 @@ func (c *SporeClient) UpdateFirmware(firmwareData []byte, filename string) (*Fir
func (c *SporeClient) UpdateNodeLabels(labels map[string]string) error {
targetURL := fmt.Sprintf("%s/api/node/config", c.BaseURL)
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"endpoint": "/api/node/config",
"labels": labels,
}).Debug("Updating node labels on SPORE node")
// Convert labels to JSON
labelsJSON, err := json.Marshal(labels)
if err != nil {
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"error": err.Error(),
}).Debug("Failed to marshal labels")
return fmt.Errorf("failed to marshal labels: %w", err)
}
@@ -289,6 +404,10 @@ func (c *SporeClient) UpdateNodeLabels(labels map[string]string) error {
req, err := http.NewRequest("POST", targetURL, strings.NewReader(data.Encode()))
if err != nil {
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"error": err.Error(),
}).Debug("Failed to create labels update request")
return fmt.Errorf("failed to create labels update request: %w", err)
}
@@ -296,19 +415,28 @@ func (c *SporeClient) UpdateNodeLabels(labels map[string]string) error {
resp, err := c.HTTPClient.Do(req)
if err != nil {
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"error": err.Error(),
}).Debug("Failed to update node labels")
return fmt.Errorf("failed to update node labels: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"status_code": resp.StatusCode,
"error_body": string(body),
}).Debug("Node labels update returned non-OK status")
return fmt.Errorf("node labels update failed with status %d: %s", resp.StatusCode, string(body))
}
log.WithFields(log.Fields{
"node_ip": c.BaseURL,
"labels": labels,
}).Info("Node labels updated successfully")
"node_url": c.BaseURL,
"labels": labels,
}).Debug("Successfully updated node labels on SPORE node")
return nil
}
@@ -318,17 +446,43 @@ func (c *SporeClient) ProxyCall(method, uri string, params map[string]interface{
// Build target URL
targetURL := fmt.Sprintf("%s%s", c.BaseURL, uri)
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"method": method,
"endpoint": uri,
"param_count": len(params),
}).Debug("Making proxy call to SPORE node")
// Parse parameters and build request
req, err := c.buildProxyRequest(method, targetURL, params)
if err != nil {
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"method": method,
"endpoint": uri,
"error": err.Error(),
}).Debug("Failed to build proxy request")
return nil, fmt.Errorf("failed to build proxy request: %w", err)
}
resp, err := c.HTTPClient.Do(req)
if err != nil {
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"method": method,
"endpoint": uri,
"error": err.Error(),
}).Debug("Proxy call failed")
return nil, fmt.Errorf("proxy call failed: %w", err)
}
log.WithFields(log.Fields{
"node_url": c.BaseURL,
"method": method,
"endpoint": uri,
"status_code": resp.StatusCode,
}).Debug("Proxy call completed successfully")
return resp, nil
}

View File

@@ -69,21 +69,42 @@ func (c *RegistryClient) FindFirmwareByNameAndVersion(name, version string) (*Fi
func (c *RegistryClient) GetHealth() (map[string]interface{}, error) {
url := fmt.Sprintf("%s/health", c.BaseURL)
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"endpoint": "/health",
}).Debug("Checking registry health")
resp, err := c.HTTPClient.Get(url)
if err != nil {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"error": err.Error(),
}).Debug("Failed to check registry health")
return nil, fmt.Errorf("failed to get registry health: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"status_code": resp.StatusCode,
}).Debug("Registry health check returned non-OK status")
return nil, fmt.Errorf("registry health check failed with status %d", resp.StatusCode)
}
var health map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&health); err != nil {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"error": err.Error(),
}).Debug("Failed to decode health response")
return nil, fmt.Errorf("failed to decode health response: %w", err)
}
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
}).Debug("Successfully checked registry health")
return health, nil
}
@@ -91,6 +112,13 @@ func (c *RegistryClient) GetHealth() (map[string]interface{}, error) {
func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile io.Reader) (map[string]interface{}, error) {
url := fmt.Sprintf("%s/firmware", c.BaseURL)
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"endpoint": "/firmware",
"name": metadata.Name,
"version": metadata.Version,
}).Debug("Uploading firmware to registry")
// Create multipart form data
body := &bytes.Buffer{}
writer := multipart.NewWriter(body)
@@ -98,11 +126,19 @@ func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile
// Add metadata
metadataJSON, err := json.Marshal(metadata)
if err != nil {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"error": err.Error(),
}).Debug("Failed to marshal firmware metadata")
return nil, fmt.Errorf("failed to marshal metadata: %w", err)
}
metadataPart, err := writer.CreateFormField("metadata")
if err != nil {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"error": err.Error(),
}).Debug("Failed to create metadata field")
return nil, fmt.Errorf("failed to create metadata field: %w", err)
}
metadataPart.Write(metadataJSON)
@@ -110,10 +146,18 @@ func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile
// Add firmware file
firmwarePart, err := writer.CreateFormFile("firmware", fmt.Sprintf("%s-%s.bin", metadata.Name, metadata.Version))
if err != nil {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"error": err.Error(),
}).Debug("Failed to create firmware field")
return nil, fmt.Errorf("failed to create firmware field: %w", err)
}
if _, err := io.Copy(firmwarePart, firmwareFile); err != nil {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"error": err.Error(),
}).Debug("Failed to copy firmware data")
return nil, fmt.Errorf("failed to copy firmware data: %w", err)
}
@@ -121,6 +165,10 @@ func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile
req, err := http.NewRequest("POST", url, body)
if err != nil {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"error": err.Error(),
}).Debug("Failed to create upload request")
return nil, fmt.Errorf("failed to create request: %w", err)
}
@@ -128,20 +176,43 @@ func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile
resp, err := c.HTTPClient.Do(req)
if err != nil {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"name": metadata.Name,
"version": metadata.Version,
"error": err.Error(),
}).Debug("Failed to upload firmware to registry")
return nil, fmt.Errorf("failed to upload firmware: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
body, _ := io.ReadAll(resp.Body)
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"name": metadata.Name,
"version": metadata.Version,
"status_code": resp.StatusCode,
"error_body": string(body),
}).Debug("Firmware upload returned non-OK status")
return nil, fmt.Errorf("firmware upload failed with status %d: %s", resp.StatusCode, string(body))
}
var result map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"error": err.Error(),
}).Debug("Failed to decode upload response")
return nil, fmt.Errorf("failed to decode upload response: %w", err)
}
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"name": metadata.Name,
"version": metadata.Version,
}).Debug("Successfully uploaded firmware to registry")
return result, nil
}
@@ -149,13 +220,32 @@ func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile
func (c *RegistryClient) UpdateFirmwareMetadata(name, version string, metadata FirmwareMetadata) (map[string]interface{}, error) {
url := fmt.Sprintf("%s/firmware/%s/%s", c.BaseURL, name, version)
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"endpoint": fmt.Sprintf("/firmware/%s/%s", name, version),
"name": name,
"version": version,
}).Debug("Updating firmware metadata in registry")
metadataJSON, err := json.Marshal(metadata)
if err != nil {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"name": name,
"version": version,
"error": err.Error(),
}).Debug("Failed to marshal metadata")
return nil, fmt.Errorf("failed to marshal metadata: %w", err)
}
req, err := http.NewRequest("PUT", url, bytes.NewBuffer(metadataJSON))
if err != nil {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"name": name,
"version": version,
"error": err.Error(),
}).Debug("Failed to create update request")
return nil, fmt.Errorf("failed to create request: %w", err)
}
@@ -163,20 +253,45 @@ func (c *RegistryClient) UpdateFirmwareMetadata(name, version string, metadata F
resp, err := c.HTTPClient.Do(req)
if err != nil {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"name": name,
"version": version,
"error": err.Error(),
}).Debug("Failed to update firmware metadata in registry")
return nil, fmt.Errorf("failed to update firmware metadata: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"name": name,
"version": version,
"status_code": resp.StatusCode,
"error_body": string(body),
}).Debug("Firmware metadata update returned non-OK status")
return nil, fmt.Errorf("firmware metadata update failed with status %d: %s", resp.StatusCode, string(body))
}
var result map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"name": name,
"version": version,
"error": err.Error(),
}).Debug("Failed to decode update response")
return nil, fmt.Errorf("failed to decode update response: %w", err)
}
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"name": name,
"version": version,
}).Debug("Successfully updated firmware metadata in registry")
return result, nil
}
@@ -221,21 +336,43 @@ func (c *RegistryClient) firmwareMatchesLabels(firmwareLabels, rolloutLabels map
func (c *RegistryClient) ListFirmware() ([]GroupedFirmware, error) {
url := fmt.Sprintf("%s/firmware", c.BaseURL)
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"endpoint": "/firmware",
}).Debug("Fetching firmware list from registry")
resp, err := c.HTTPClient.Get(url)
if err != nil {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"error": err.Error(),
}).Debug("Failed to fetch firmware list from registry")
return nil, fmt.Errorf("failed to get firmware list: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"status_code": resp.StatusCode,
}).Debug("Firmware list request returned non-OK status")
return nil, fmt.Errorf("firmware list request failed with status %d", resp.StatusCode)
}
var firmwareList []GroupedFirmware
if err := json.NewDecoder(resp.Body).Decode(&firmwareList); err != nil {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"error": err.Error(),
}).Debug("Failed to decode firmware list response")
return nil, fmt.Errorf("failed to decode firmware list response: %w", err)
}
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"firmware_count": len(firmwareList),
}).Debug("Successfully fetched firmware list from registry")
return firmwareList, nil
}
@@ -243,26 +380,52 @@ func (c *RegistryClient) ListFirmware() ([]GroupedFirmware, error) {
func (c *RegistryClient) DownloadFirmware(name, version string) ([]byte, error) {
url := fmt.Sprintf("%s/firmware/%s/%s", c.BaseURL, name, version)
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"endpoint": fmt.Sprintf("/firmware/%s/%s", name, version),
"name": name,
"version": version,
}).Debug("Downloading firmware from registry")
resp, err := c.HTTPClient.Get(url)
if err != nil {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"name": name,
"version": version,
"error": err.Error(),
}).Debug("Failed to download firmware from registry")
return nil, fmt.Errorf("failed to download firmware: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"name": name,
"version": version,
"status_code": resp.StatusCode,
}).Debug("Firmware download request returned non-OK status")
return nil, fmt.Errorf("firmware download request failed with status %d", resp.StatusCode)
}
data, err := io.ReadAll(resp.Body)
if err != nil {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"name": name,
"version": version,
"error": err.Error(),
}).Debug("Failed to read firmware data from registry")
return nil, fmt.Errorf("failed to read firmware data: %w", err)
}
log.WithFields(log.Fields{
"name": name,
"version": version,
"size": len(data),
}).Info("Downloaded firmware from registry")
"registry_url": c.BaseURL,
"name": name,
"version": version,
"size": len(data),
}).Debug("Successfully downloaded firmware from registry")
return data, nil
}
@@ -271,31 +434,64 @@ func (c *RegistryClient) DownloadFirmware(name, version string) ([]byte, error)
func (c *RegistryClient) DeleteFirmware(name, version string) (map[string]interface{}, error) {
url := fmt.Sprintf("%s/firmware/%s/%s", c.BaseURL, name, version)
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"endpoint": fmt.Sprintf("/firmware/%s/%s", name, version),
"name": name,
"version": version,
}).Debug("Deleting firmware from registry")
req, err := http.NewRequest(http.MethodDelete, url, nil)
if err != nil {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"name": name,
"version": version,
"error": err.Error(),
}).Debug("Failed to create delete request")
return nil, fmt.Errorf("failed to create delete request: %w", err)
}
resp, err := c.HTTPClient.Do(req)
if err != nil {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"name": name,
"version": version,
"error": err.Error(),
}).Debug("Failed to delete firmware from registry")
return nil, fmt.Errorf("failed to delete firmware: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"name": name,
"version": version,
"status_code": resp.StatusCode,
"error_body": string(body),
}).Debug("Firmware delete returned non-OK status")
return nil, fmt.Errorf("firmware delete request failed with status %d: %s", resp.StatusCode, string(body))
}
var result map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
log.WithFields(log.Fields{
"registry_url": c.BaseURL,
"name": name,
"version": version,
"error": err.Error(),
}).Debug("Failed to decode delete response")
return nil, fmt.Errorf("failed to decode delete response: %w", err)
}
log.WithFields(log.Fields{
"name": name,
"version": version,
}).Info("Deleted firmware from registry")
"registry_url": c.BaseURL,
"name": name,
"version": version,
}).Debug("Successfully deleted firmware from registry")
return result, nil
}