Compare commits
5 Commits
feature/mo
...
55c3aebb3f
| Author | SHA1 | Date | |
|---|---|---|---|
| 55c3aebb3f | |||
| f7b694854d | |||
| ccadbe3b83 | |||
| c4b1a2d853 | |||
| 8aa8b908e6 |
@@ -2,6 +2,7 @@ package discovery
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"strconv"
|
"strconv"
|
||||||
@@ -49,6 +50,9 @@ func (nd *NodeDiscovery) Shutdown(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MessageHandler processes a specific UDP message type
|
||||||
|
type MessageHandler func(payload string, remoteAddr *net.UDPAddr)
|
||||||
|
|
||||||
// handleUDPMessage processes incoming UDP messages
|
// handleUDPMessage processes incoming UDP messages
|
||||||
func (nd *NodeDiscovery) handleUDPMessage(message string, remoteAddr *net.UDPAddr) {
|
func (nd *NodeDiscovery) handleUDPMessage(message string, remoteAddr *net.UDPAddr) {
|
||||||
nd.logger.WithFields(log.Fields{
|
nd.logger.WithFields(log.Fields{
|
||||||
@@ -58,13 +62,42 @@ func (nd *NodeDiscovery) handleUDPMessage(message string, remoteAddr *net.UDPAdd
|
|||||||
|
|
||||||
message = strings.TrimSpace(message)
|
message = strings.TrimSpace(message)
|
||||||
|
|
||||||
if strings.HasPrefix(message, "CLUSTER_HEARTBEAT:") {
|
// Extract topic by splitting on first ":"
|
||||||
hostname := strings.TrimPrefix(message, "CLUSTER_HEARTBEAT:")
|
parts := strings.SplitN(message, ":", 2)
|
||||||
nd.updateNodeFromHeartbeat(remoteAddr.IP.String(), remoteAddr.Port, hostname)
|
if len(parts) < 2 {
|
||||||
} else if strings.HasPrefix(message, "NODE_UPDATE:") {
|
nd.logger.WithField("message", message).Debug("Invalid message format - missing ':' separator")
|
||||||
nd.handleNodeUpdate(remoteAddr.IP.String(), message)
|
return
|
||||||
} else if !strings.HasPrefix(message, "RAW:") {
|
}
|
||||||
nd.logger.WithField("message", message).Debug("Received unknown UDP message")
|
|
||||||
|
topic := parts[0]
|
||||||
|
payload := parts[1]
|
||||||
|
|
||||||
|
// Handler map for different message types
|
||||||
|
handlers := map[string]MessageHandler{
|
||||||
|
"cluster/heartbeat": func(payload string, remoteAddr *net.UDPAddr) {
|
||||||
|
nd.updateNodeFromHeartbeat(remoteAddr.IP.String(), remoteAddr.Port, payload)
|
||||||
|
},
|
||||||
|
"node/update": func(payload string, remoteAddr *net.UDPAddr) {
|
||||||
|
// Reconstruct full message for handleNodeUpdate which expects "node/update:hostname:{json}"
|
||||||
|
fullMessage := "node/update:" + payload
|
||||||
|
nd.handleNodeUpdate(remoteAddr.IP.String(), fullMessage)
|
||||||
|
},
|
||||||
|
"raw": func(payload string, remoteAddr *net.UDPAddr) {
|
||||||
|
nd.logger.WithField("message", "raw:"+payload).Debug("Received raw message")
|
||||||
|
},
|
||||||
|
"cluster/event": func(payload string, remoteAddr *net.UDPAddr) {
|
||||||
|
nd.handleClusterEvent(payload, remoteAddr)
|
||||||
|
},
|
||||||
|
"cluster/broadcast": func(payload string, remoteAddr *net.UDPAddr) {
|
||||||
|
nd.handleClusterBroadcast(payload, remoteAddr)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Look up and execute handler
|
||||||
|
if handler, exists := handlers[topic]; exists {
|
||||||
|
handler(payload, remoteAddr)
|
||||||
|
} else {
|
||||||
|
nd.logger.WithField("topic", topic).Debug("Received unknown UDP message type")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -138,9 +171,9 @@ func (nd *NodeDiscovery) updateNodeFromHeartbeat(sourceIP string, sourcePort int
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleNodeUpdate processes NODE_UPDATE messages
|
// handleNodeUpdate processes NODE_UPDATE and node/update messages
|
||||||
func (nd *NodeDiscovery) handleNodeUpdate(sourceIP, message string) {
|
func (nd *NodeDiscovery) handleNodeUpdate(sourceIP, message string) {
|
||||||
// Message format: "NODE_UPDATE:hostname:{json}"
|
// Message format: "NODE_UPDATE:hostname:{json}" or "node/update:hostname:{json}"
|
||||||
parts := strings.SplitN(message, ":", 3)
|
parts := strings.SplitN(message, ":", 3)
|
||||||
if len(parts) < 3 {
|
if len(parts) < 3 {
|
||||||
nd.logger.WithField("message", message).Warn("Invalid NODE_UPDATE message format")
|
nd.logger.WithField("message", message).Warn("Invalid NODE_UPDATE message format")
|
||||||
@@ -344,6 +377,55 @@ func (nd *NodeDiscovery) AddCallback(callback NodeUpdateCallback) {
|
|||||||
nd.callbacks = append(nd.callbacks, callback)
|
nd.callbacks = append(nd.callbacks, callback)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetClusterEventCallback sets the callback for cluster events
|
||||||
|
func (nd *NodeDiscovery) SetClusterEventCallback(callback ClusterEventBroadcaster) {
|
||||||
|
nd.mutex.Lock()
|
||||||
|
defer nd.mutex.Unlock()
|
||||||
|
nd.clusterEventCallback = callback
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleClusterEvent processes cluster/event messages
|
||||||
|
func (nd *NodeDiscovery) handleClusterEvent(payload string, remoteAddr *net.UDPAddr) {
|
||||||
|
nd.logger.WithFields(log.Fields{
|
||||||
|
"payload": payload,
|
||||||
|
"from": remoteAddr.String(),
|
||||||
|
}).Debug("Received cluster/event message")
|
||||||
|
|
||||||
|
// Forward to websocket if callback is set
|
||||||
|
if nd.clusterEventCallback != nil {
|
||||||
|
nd.clusterEventCallback.BroadcastClusterEvent("cluster/event", payload)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleClusterBroadcast processes cluster/broadcast messages
|
||||||
|
func (nd *NodeDiscovery) handleClusterBroadcast(payload string, remoteAddr *net.UDPAddr) {
|
||||||
|
nd.logger.WithFields(log.Fields{
|
||||||
|
"payload": payload,
|
||||||
|
"from": remoteAddr.String(),
|
||||||
|
}).Debug("Received cluster/broadcast message")
|
||||||
|
|
||||||
|
// Parse the payload JSON to extract nested event and data
|
||||||
|
var payloadData struct {
|
||||||
|
Event string `json:"event"`
|
||||||
|
Data interface{} `json:"data"`
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := json.Unmarshal([]byte(payload), &payloadData); err != nil {
|
||||||
|
nd.logger.WithError(err).Error("Failed to parse cluster/broadcast payload")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
nd.logger.WithFields(log.Fields{
|
||||||
|
"event": payloadData.Event,
|
||||||
|
"from": remoteAddr.String(),
|
||||||
|
}).Debug("Parsed cluster/broadcast payload")
|
||||||
|
|
||||||
|
// Forward to websocket if callback is set, mapping event to topic and data to data
|
||||||
|
if nd.clusterEventCallback != nil {
|
||||||
|
nd.clusterEventCallback.BroadcastClusterEvent(payloadData.Event, payloadData.Data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// GetClusterStatus returns current cluster status
|
// GetClusterStatus returns current cluster status
|
||||||
func (nd *NodeDiscovery) GetClusterStatus() ClusterStatus {
|
func (nd *NodeDiscovery) GetClusterStatus() ClusterStatus {
|
||||||
nd.mutex.RLock()
|
nd.mutex.RLock()
|
||||||
|
|||||||
@@ -41,15 +41,21 @@ type ClusterStatus struct {
|
|||||||
// NodeUpdateCallback is called when node information changes
|
// NodeUpdateCallback is called when node information changes
|
||||||
type NodeUpdateCallback func(nodeIP string, action string)
|
type NodeUpdateCallback func(nodeIP string, action string)
|
||||||
|
|
||||||
|
// ClusterEventBroadcaster interface for broadcasting cluster events
|
||||||
|
type ClusterEventBroadcaster interface {
|
||||||
|
BroadcastClusterEvent(topic string, data interface{})
|
||||||
|
}
|
||||||
|
|
||||||
// NodeDiscovery manages UDP-based node discovery
|
// NodeDiscovery manages UDP-based node discovery
|
||||||
type NodeDiscovery struct {
|
type NodeDiscovery struct {
|
||||||
udpPort string
|
udpPort string
|
||||||
discoveredNodes map[string]*NodeInfo
|
discoveredNodes map[string]*NodeInfo
|
||||||
primaryNode string
|
primaryNode string
|
||||||
mutex sync.RWMutex
|
mutex sync.RWMutex
|
||||||
callbacks []NodeUpdateCallback
|
callbacks []NodeUpdateCallback
|
||||||
staleThreshold time.Duration
|
clusterEventCallback ClusterEventBroadcaster
|
||||||
logger *log.Logger
|
staleThreshold time.Duration
|
||||||
|
logger *log.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewNodeDiscovery creates a new node discovery instance
|
// NewNodeDiscovery creates a new node discovery instance
|
||||||
@@ -57,7 +63,7 @@ func NewNodeDiscovery(udpPort string) *NodeDiscovery {
|
|||||||
return &NodeDiscovery{
|
return &NodeDiscovery{
|
||||||
udpPort: udpPort,
|
udpPort: udpPort,
|
||||||
discoveredNodes: make(map[string]*NodeInfo),
|
discoveredNodes: make(map[string]*NodeInfo),
|
||||||
staleThreshold: 10 * time.Second, // TODO make configurable
|
staleThreshold: 10 * time.Second, // Heartbeat timeout - mark nodes inactive after 10 seconds
|
||||||
logger: log.New(),
|
logger: log.New(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -38,6 +38,9 @@ func NewHTTPServer(port string, nodeDiscovery *discovery.NodeDiscovery) *HTTPSer
|
|||||||
// Initialize registry client
|
// Initialize registry client
|
||||||
registryClient := registry.NewRegistryClient("http://localhost:3002")
|
registryClient := registry.NewRegistryClient("http://localhost:3002")
|
||||||
|
|
||||||
|
// Register WebSocket server as cluster event broadcaster
|
||||||
|
nodeDiscovery.SetClusterEventCallback(wsServer)
|
||||||
|
|
||||||
hs := &HTTPServer{
|
hs := &HTTPServer{
|
||||||
port: port,
|
port: port,
|
||||||
router: mux.NewRouter(),
|
router: mux.NewRouter(),
|
||||||
@@ -366,16 +369,19 @@ func (hs *HTTPServer) setPrimaryNode(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
// GET /api/cluster/members
|
// GET /api/cluster/members
|
||||||
func (hs *HTTPServer) getClusterMembers(w http.ResponseWriter, r *http.Request) {
|
func (hs *HTTPServer) getClusterMembers(w http.ResponseWriter, r *http.Request) {
|
||||||
|
log.Debug("Fetching cluster members via API")
|
||||||
|
|
||||||
result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) {
|
result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) {
|
||||||
return client.GetClusterStatus()
|
return client.GetClusterStatus()
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("Error fetching cluster members")
|
log.WithError(err).Debug("Failed to fetch cluster members")
|
||||||
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch cluster members", "message": "%s"}`, err.Error()), http.StatusBadGateway)
|
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch cluster members", "message": "%s"}`, err.Error()), http.StatusBadGateway)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Debug("Successfully fetched cluster members via API")
|
||||||
json.NewEncoder(w).Encode(result)
|
json.NewEncoder(w).Encode(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -417,42 +423,52 @@ func (hs *HTTPServer) getTaskStatus(w http.ResponseWriter, r *http.Request) {
|
|||||||
ip := r.URL.Query().Get("ip")
|
ip := r.URL.Query().Get("ip")
|
||||||
|
|
||||||
if ip != "" {
|
if ip != "" {
|
||||||
|
log.WithField("node_ip", ip).Debug("Fetching task status from specific node")
|
||||||
client := hs.getSporeClient(ip)
|
client := hs.getSporeClient(ip)
|
||||||
result, err := client.GetTaskStatus()
|
result, err := client.GetTaskStatus()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("Error fetching task status from specific node")
|
log.WithFields(log.Fields{
|
||||||
|
"node_ip": ip,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to fetch task status from specific node")
|
||||||
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch task status from node", "message": "%s"}`, err.Error()), http.StatusInternalServerError)
|
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch task status from node", "message": "%s"}`, err.Error()), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
log.WithField("node_ip", ip).Debug("Successfully fetched task status from specific node")
|
||||||
json.NewEncoder(w).Encode(result)
|
json.NewEncoder(w).Encode(result)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Debug("Fetching task status via failover")
|
||||||
result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) {
|
result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) {
|
||||||
return client.GetTaskStatus()
|
return client.GetTaskStatus()
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("Error fetching task status")
|
log.WithError(err).Debug("Failed to fetch task status via failover")
|
||||||
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch task status", "message": "%s"}`, err.Error()), http.StatusBadGateway)
|
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch task status", "message": "%s"}`, err.Error()), http.StatusBadGateway)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Debug("Successfully fetched task status via failover")
|
||||||
json.NewEncoder(w).Encode(result)
|
json.NewEncoder(w).Encode(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GET /api/node/status
|
// GET /api/node/status
|
||||||
func (hs *HTTPServer) getNodeStatus(w http.ResponseWriter, r *http.Request) {
|
func (hs *HTTPServer) getNodeStatus(w http.ResponseWriter, r *http.Request) {
|
||||||
|
log.Debug("Fetching node system status via failover")
|
||||||
|
|
||||||
result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) {
|
result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) {
|
||||||
return client.GetSystemStatus()
|
return client.GetSystemStatus()
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("Error fetching system status")
|
log.WithError(err).Debug("Failed to fetch system status via failover")
|
||||||
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch system status", "message": "%s"}`, err.Error()), http.StatusBadGateway)
|
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch system status", "message": "%s"}`, err.Error()), http.StatusBadGateway)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Debug("Successfully fetched system status via failover")
|
||||||
json.NewEncoder(w).Encode(result)
|
json.NewEncoder(w).Encode(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -461,14 +477,20 @@ func (hs *HTTPServer) getNodeStatusByIP(w http.ResponseWriter, r *http.Request)
|
|||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
nodeIP := vars["ip"]
|
nodeIP := vars["ip"]
|
||||||
|
|
||||||
|
log.WithField("node_ip", nodeIP).Debug("Fetching system status from specific node")
|
||||||
|
|
||||||
client := hs.getSporeClient(nodeIP)
|
client := hs.getSporeClient(nodeIP)
|
||||||
result, err := client.GetSystemStatus()
|
result, err := client.GetSystemStatus()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("Error fetching status from specific node")
|
log.WithFields(log.Fields{
|
||||||
|
"node_ip": nodeIP,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to fetch status from specific node")
|
||||||
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch status from node %s", "message": "%s"}`, nodeIP, err.Error()), http.StatusInternalServerError)
|
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch status from node %s", "message": "%s"}`, nodeIP, err.Error()), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.WithField("node_ip", nodeIP).Debug("Successfully fetched status from specific node")
|
||||||
json.NewEncoder(w).Encode(result)
|
json.NewEncoder(w).Encode(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -477,27 +499,34 @@ func (hs *HTTPServer) getNodeEndpoints(w http.ResponseWriter, r *http.Request) {
|
|||||||
ip := r.URL.Query().Get("ip")
|
ip := r.URL.Query().Get("ip")
|
||||||
|
|
||||||
if ip != "" {
|
if ip != "" {
|
||||||
|
log.WithField("node_ip", ip).Debug("Fetching endpoints from specific node")
|
||||||
client := hs.getSporeClient(ip)
|
client := hs.getSporeClient(ip)
|
||||||
result, err := client.GetCapabilities()
|
result, err := client.GetCapabilities()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("Error fetching endpoints from specific node")
|
log.WithFields(log.Fields{
|
||||||
|
"node_ip": ip,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to fetch endpoints from specific node")
|
||||||
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch endpoints from node", "message": "%s"}`, err.Error()), http.StatusInternalServerError)
|
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch endpoints from node", "message": "%s"}`, err.Error()), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
log.WithField("node_ip", ip).Debug("Successfully fetched endpoints from specific node")
|
||||||
json.NewEncoder(w).Encode(result)
|
json.NewEncoder(w).Encode(result)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Debug("Fetching capabilities via failover")
|
||||||
result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) {
|
result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) {
|
||||||
return client.GetCapabilities()
|
return client.GetCapabilities()
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("Error fetching capabilities")
|
log.WithError(err).Debug("Failed to fetch capabilities via failover")
|
||||||
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch capabilities", "message": "%s"}`, err.Error()), http.StatusBadGateway)
|
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch capabilities", "message": "%s"}`, err.Error()), http.StatusBadGateway)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Debug("Successfully fetched capabilities via failover")
|
||||||
json.NewEncoder(w).Encode(result)
|
json.NewEncoder(w).Encode(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -849,18 +878,21 @@ type ClusterNodeVersionsResponse struct {
|
|||||||
|
|
||||||
// GET /api/cluster/node/versions
|
// GET /api/cluster/node/versions
|
||||||
func (hs *HTTPServer) getClusterNodeVersions(w http.ResponseWriter, r *http.Request) {
|
func (hs *HTTPServer) getClusterNodeVersions(w http.ResponseWriter, r *http.Request) {
|
||||||
|
log.Debug("Fetching cluster node versions")
|
||||||
|
|
||||||
result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) {
|
result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) {
|
||||||
return client.GetClusterStatus()
|
return client.GetClusterStatus()
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("Error fetching cluster members for versions")
|
log.WithError(err).Debug("Failed to fetch cluster members for versions")
|
||||||
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch cluster members", "message": "%s"}`, err.Error()), http.StatusBadGateway)
|
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch cluster members", "message": "%s"}`, err.Error()), http.StatusBadGateway)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
clusterStatus, ok := result.(*client.ClusterStatusResponse)
|
clusterStatus, ok := result.(*client.ClusterStatusResponse)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
log.Debug("Invalid cluster status response type")
|
||||||
http.Error(w, `{"error": "Invalid cluster status response"}`, http.StatusInternalServerError)
|
http.Error(w, `{"error": "Invalid cluster status response"}`, http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -880,6 +912,8 @@ func (hs *HTTPServer) getClusterNodeVersions(w http.ResponseWriter, r *http.Requ
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.WithField("node_count", len(nodeVersions)).Debug("Successfully fetched cluster node versions")
|
||||||
|
|
||||||
response := ClusterNodeVersionsResponse{
|
response := ClusterNodeVersionsResponse{
|
||||||
Nodes: nodeVersions,
|
Nodes: nodeVersions,
|
||||||
}
|
}
|
||||||
@@ -956,12 +990,25 @@ func (hs *HTTPServer) nodeMatchesLabels(nodeLabels, rolloutLabels map[string]str
|
|||||||
|
|
||||||
// processRollout handles the actual rollout process in the background
|
// processRollout handles the actual rollout process in the background
|
||||||
func (hs *HTTPServer) processRollout(rolloutID string, nodes []NodeInfo, firmwareInfo FirmwareInfo) {
|
func (hs *HTTPServer) processRollout(rolloutID string, nodes []NodeInfo, firmwareInfo FirmwareInfo) {
|
||||||
log.WithField("rollout_id", rolloutID).Info("Starting background rollout process")
|
log.WithFields(log.Fields{
|
||||||
|
"rollout_id": rolloutID,
|
||||||
|
"firmware": fmt.Sprintf("%s/%s", firmwareInfo.Name, firmwareInfo.Version),
|
||||||
|
"node_count": len(nodes),
|
||||||
|
}).Debug("Starting background rollout process")
|
||||||
|
|
||||||
// Download firmware from registry
|
// Download firmware from registry
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"rollout_id": rolloutID,
|
||||||
|
"firmware": fmt.Sprintf("%s/%s", firmwareInfo.Name, firmwareInfo.Version),
|
||||||
|
}).Debug("Downloading firmware from registry for rollout")
|
||||||
|
|
||||||
firmwareData, err := hs.registryClient.DownloadFirmware(firmwareInfo.Name, firmwareInfo.Version)
|
firmwareData, err := hs.registryClient.DownloadFirmware(firmwareInfo.Name, firmwareInfo.Version)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("Failed to download firmware for rollout")
|
log.WithFields(log.Fields{
|
||||||
|
"rollout_id": rolloutID,
|
||||||
|
"firmware": fmt.Sprintf("%s/%s", firmwareInfo.Name, firmwareInfo.Version),
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Error("Failed to download firmware for rollout")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -970,7 +1017,7 @@ func (hs *HTTPServer) processRollout(rolloutID string, nodes []NodeInfo, firmwar
|
|||||||
"firmware": fmt.Sprintf("%s/%s", firmwareInfo.Name, firmwareInfo.Version),
|
"firmware": fmt.Sprintf("%s/%s", firmwareInfo.Name, firmwareInfo.Version),
|
||||||
"size": len(firmwareData),
|
"size": len(firmwareData),
|
||||||
"total_nodes": len(nodes),
|
"total_nodes": len(nodes),
|
||||||
}).Info("Downloaded firmware for rollout")
|
}).Debug("Successfully downloaded firmware for rollout")
|
||||||
|
|
||||||
// Process nodes in parallel using goroutines
|
// Process nodes in parallel using goroutines
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
@@ -984,9 +1031,14 @@ func (hs *HTTPServer) processRollout(rolloutID string, nodes []NodeInfo, firmwar
|
|||||||
"rollout_id": rolloutID,
|
"rollout_id": rolloutID,
|
||||||
"node_ip": node.IP,
|
"node_ip": node.IP,
|
||||||
"progress": fmt.Sprintf("%d/%d", nodeIndex+1, len(nodes)),
|
"progress": fmt.Sprintf("%d/%d", nodeIndex+1, len(nodes)),
|
||||||
}).Info("Processing node in rollout")
|
}).Debug("Processing node in rollout")
|
||||||
|
|
||||||
// Update version label on the node before upload
|
// Update version label on the node before upload
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"rollout_id": rolloutID,
|
||||||
|
"node_ip": node.IP,
|
||||||
|
}).Debug("Getting SPORE client for node")
|
||||||
|
|
||||||
client := hs.getSporeClient(node.IP)
|
client := hs.getSporeClient(node.IP)
|
||||||
|
|
||||||
// Create updated labels with the new version
|
// Create updated labels with the new version
|
||||||
|
|||||||
@@ -23,26 +23,34 @@ var upgrader = websocket.Upgrader{
|
|||||||
|
|
||||||
// WebSocketServer manages WebSocket connections and broadcasts
|
// WebSocketServer manages WebSocket connections and broadcasts
|
||||||
type WebSocketServer struct {
|
type WebSocketServer struct {
|
||||||
nodeDiscovery *discovery.NodeDiscovery
|
nodeDiscovery *discovery.NodeDiscovery
|
||||||
sporeClients map[string]*client.SporeClient
|
sporeClients map[string]*client.SporeClient
|
||||||
clients map[*websocket.Conn]bool
|
clients map[*websocket.Conn]bool
|
||||||
mutex sync.RWMutex
|
mutex sync.RWMutex
|
||||||
writeMutex sync.Mutex // Mutex to serialize writes to WebSocket connections
|
writeMutex sync.Mutex // Mutex to serialize writes to WebSocket connections
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
|
clusterInfoTicker *time.Ticker
|
||||||
|
clusterInfoStopCh chan bool
|
||||||
|
clusterInfoInterval time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWebSocketServer creates a new WebSocket server
|
// NewWebSocketServer creates a new WebSocket server
|
||||||
func NewWebSocketServer(nodeDiscovery *discovery.NodeDiscovery) *WebSocketServer {
|
func NewWebSocketServer(nodeDiscovery *discovery.NodeDiscovery) *WebSocketServer {
|
||||||
wss := &WebSocketServer{
|
wss := &WebSocketServer{
|
||||||
nodeDiscovery: nodeDiscovery,
|
nodeDiscovery: nodeDiscovery,
|
||||||
sporeClients: make(map[string]*client.SporeClient),
|
sporeClients: make(map[string]*client.SporeClient),
|
||||||
clients: make(map[*websocket.Conn]bool),
|
clients: make(map[*websocket.Conn]bool),
|
||||||
logger: log.New(),
|
logger: log.New(),
|
||||||
|
clusterInfoStopCh: make(chan bool),
|
||||||
|
clusterInfoInterval: 5 * time.Second, // Fetch cluster info every 5 seconds
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register callback for node updates
|
// Register callback for node updates
|
||||||
nodeDiscovery.AddCallback(wss.handleNodeUpdate)
|
nodeDiscovery.AddCallback(wss.handleNodeUpdate)
|
||||||
|
|
||||||
|
// Start periodic cluster info fetching
|
||||||
|
go wss.startPeriodicClusterInfoFetching()
|
||||||
|
|
||||||
return wss
|
return wss
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -126,13 +134,13 @@ func (wss *WebSocketServer) sendCurrentClusterState(conn *websocket.Conn) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
message := struct {
|
message := struct {
|
||||||
Type string `json:"type"`
|
Topic string `json:"topic"`
|
||||||
Members []client.ClusterMember `json:"members"`
|
Members []client.ClusterMember `json:"members"`
|
||||||
PrimaryNode string `json:"primaryNode"`
|
PrimaryNode string `json:"primaryNode"`
|
||||||
TotalNodes int `json:"totalNodes"`
|
TotalNodes int `json:"totalNodes"`
|
||||||
Timestamp string `json:"timestamp"`
|
Timestamp string `json:"timestamp"`
|
||||||
}{
|
}{
|
||||||
Type: "cluster_update",
|
Topic: "cluster/update",
|
||||||
Members: clusterData,
|
Members: clusterData,
|
||||||
PrimaryNode: wss.nodeDiscovery.GetPrimaryNode(),
|
PrimaryNode: wss.nodeDiscovery.GetPrimaryNode(),
|
||||||
TotalNodes: len(nodes),
|
TotalNodes: len(nodes),
|
||||||
@@ -151,17 +159,48 @@ func (wss *WebSocketServer) sendCurrentClusterState(conn *websocket.Conn) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// startPeriodicClusterInfoFetching starts a goroutine that periodically fetches cluster info
|
||||||
|
func (wss *WebSocketServer) startPeriodicClusterInfoFetching() {
|
||||||
|
wss.clusterInfoTicker = time.NewTicker(wss.clusterInfoInterval)
|
||||||
|
defer wss.clusterInfoTicker.Stop()
|
||||||
|
|
||||||
|
wss.logger.WithField("interval", wss.clusterInfoInterval).Info("Starting periodic cluster info fetching")
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-wss.clusterInfoTicker.C:
|
||||||
|
wss.fetchAndBroadcastClusterInfo()
|
||||||
|
case <-wss.clusterInfoStopCh:
|
||||||
|
wss.logger.Info("Stopping periodic cluster info fetching")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetchAndBroadcastClusterInfo fetches cluster info and broadcasts it to clients
|
||||||
|
func (wss *WebSocketServer) fetchAndBroadcastClusterInfo() {
|
||||||
|
// Only fetch if we have clients connected
|
||||||
|
wss.mutex.RLock()
|
||||||
|
clientCount := len(wss.clients)
|
||||||
|
wss.mutex.RUnlock()
|
||||||
|
|
||||||
|
if clientCount == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
wss.logger.Debug("Periodically fetching cluster info")
|
||||||
|
wss.broadcastClusterUpdate()
|
||||||
|
}
|
||||||
|
|
||||||
// handleNodeUpdate is called when node information changes
|
// handleNodeUpdate is called when node information changes
|
||||||
func (wss *WebSocketServer) handleNodeUpdate(nodeIP, action string) {
|
func (wss *WebSocketServer) handleNodeUpdate(nodeIP, action string) {
|
||||||
wss.logger.WithFields(log.Fields{
|
wss.logger.WithFields(log.Fields{
|
||||||
"node_ip": nodeIP,
|
"node_ip": nodeIP,
|
||||||
"action": action,
|
"action": action,
|
||||||
}).Debug("Node update received, broadcasting to WebSocket clients")
|
}).Debug("Node update received, broadcasting node discovery event")
|
||||||
|
|
||||||
// Broadcast cluster update to all clients
|
// Only broadcast node discovery event, not cluster update
|
||||||
wss.broadcastClusterUpdate()
|
// Cluster updates are now handled by periodic fetching
|
||||||
|
|
||||||
// Also broadcast node discovery event
|
|
||||||
wss.broadcastNodeDiscovery(nodeIP, action)
|
wss.broadcastNodeDiscovery(nodeIP, action)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -188,13 +227,13 @@ func (wss *WebSocketServer) broadcastClusterUpdate() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
message := struct {
|
message := struct {
|
||||||
Type string `json:"type"`
|
Topic string `json:"topic"`
|
||||||
Members []client.ClusterMember `json:"members"`
|
Members []client.ClusterMember `json:"members"`
|
||||||
PrimaryNode string `json:"primaryNode"`
|
PrimaryNode string `json:"primaryNode"`
|
||||||
TotalNodes int `json:"totalNodes"`
|
TotalNodes int `json:"totalNodes"`
|
||||||
Timestamp string `json:"timestamp"`
|
Timestamp string `json:"timestamp"`
|
||||||
}{
|
}{
|
||||||
Type: "cluster_update",
|
Topic: "cluster/update",
|
||||||
Members: clusterData,
|
Members: clusterData,
|
||||||
PrimaryNode: wss.nodeDiscovery.GetPrimaryNode(),
|
PrimaryNode: wss.nodeDiscovery.GetPrimaryNode(),
|
||||||
TotalNodes: len(wss.nodeDiscovery.GetNodes()),
|
TotalNodes: len(wss.nodeDiscovery.GetNodes()),
|
||||||
@@ -248,12 +287,12 @@ func (wss *WebSocketServer) broadcastNodeDiscovery(nodeIP, action string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
message := struct {
|
message := struct {
|
||||||
Type string `json:"type"`
|
Topic string `json:"topic"`
|
||||||
Action string `json:"action"`
|
Action string `json:"action"`
|
||||||
NodeIP string `json:"nodeIp"`
|
NodeIP string `json:"nodeIp"`
|
||||||
Timestamp string `json:"timestamp"`
|
Timestamp string `json:"timestamp"`
|
||||||
}{
|
}{
|
||||||
Type: "node_discovery",
|
Topic: "node/discovery",
|
||||||
Action: action,
|
Action: action,
|
||||||
NodeIP: nodeIP,
|
NodeIP: nodeIP,
|
||||||
Timestamp: time.Now().Format(time.RFC3339),
|
Timestamp: time.Now().Format(time.RFC3339),
|
||||||
@@ -291,14 +330,14 @@ func (wss *WebSocketServer) BroadcastFirmwareUploadStatus(nodeIP, status, filena
|
|||||||
}
|
}
|
||||||
|
|
||||||
message := struct {
|
message := struct {
|
||||||
Type string `json:"type"`
|
Topic string `json:"topic"`
|
||||||
NodeIP string `json:"nodeIp"`
|
NodeIP string `json:"nodeIp"`
|
||||||
Status string `json:"status"`
|
Status string `json:"status"`
|
||||||
Filename string `json:"filename"`
|
Filename string `json:"filename"`
|
||||||
FileSize int `json:"fileSize"`
|
FileSize int `json:"fileSize"`
|
||||||
Timestamp string `json:"timestamp"`
|
Timestamp string `json:"timestamp"`
|
||||||
}{
|
}{
|
||||||
Type: "firmware_upload_status",
|
Topic: "firmware/upload/status",
|
||||||
NodeIP: nodeIP,
|
NodeIP: nodeIP,
|
||||||
Status: status,
|
Status: status,
|
||||||
Filename: filename,
|
Filename: filename,
|
||||||
@@ -346,7 +385,7 @@ func (wss *WebSocketServer) BroadcastRolloutProgress(rolloutID, nodeIP, status s
|
|||||||
}
|
}
|
||||||
|
|
||||||
message := struct {
|
message := struct {
|
||||||
Type string `json:"type"`
|
Topic string `json:"topic"`
|
||||||
RolloutID string `json:"rolloutId"`
|
RolloutID string `json:"rolloutId"`
|
||||||
NodeIP string `json:"nodeIp"`
|
NodeIP string `json:"nodeIp"`
|
||||||
Status string `json:"status"`
|
Status string `json:"status"`
|
||||||
@@ -355,7 +394,7 @@ func (wss *WebSocketServer) BroadcastRolloutProgress(rolloutID, nodeIP, status s
|
|||||||
Progress int `json:"progress"`
|
Progress int `json:"progress"`
|
||||||
Timestamp string `json:"timestamp"`
|
Timestamp string `json:"timestamp"`
|
||||||
}{
|
}{
|
||||||
Type: "rollout_progress",
|
Topic: "rollout/progress",
|
||||||
RolloutID: rolloutID,
|
RolloutID: rolloutID,
|
||||||
NodeIP: nodeIP,
|
NodeIP: nodeIP,
|
||||||
Status: status,
|
Status: status,
|
||||||
@@ -429,20 +468,38 @@ func (wss *WebSocketServer) calculateProgress(current, total int, status string)
|
|||||||
func (wss *WebSocketServer) getCurrentClusterMembers() ([]client.ClusterMember, error) {
|
func (wss *WebSocketServer) getCurrentClusterMembers() ([]client.ClusterMember, error) {
|
||||||
nodes := wss.nodeDiscovery.GetNodes()
|
nodes := wss.nodeDiscovery.GetNodes()
|
||||||
if len(nodes) == 0 {
|
if len(nodes) == 0 {
|
||||||
|
wss.logger.Debug("No nodes available for cluster member retrieval")
|
||||||
return []client.ClusterMember{}, nil
|
return []client.ClusterMember{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try to get real cluster data from primary node
|
// Try to get real cluster data from primary node
|
||||||
primaryNode := wss.nodeDiscovery.GetPrimaryNode()
|
primaryNode := wss.nodeDiscovery.GetPrimaryNode()
|
||||||
if primaryNode != "" {
|
if primaryNode != "" {
|
||||||
|
wss.logger.WithFields(log.Fields{
|
||||||
|
"primary_node": primaryNode,
|
||||||
|
"total_nodes": len(nodes),
|
||||||
|
}).Debug("Fetching cluster members from primary node")
|
||||||
|
|
||||||
client := wss.getSporeClient(primaryNode)
|
client := wss.getSporeClient(primaryNode)
|
||||||
clusterStatus, err := client.GetClusterStatus()
|
clusterStatus, err := client.GetClusterStatus()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// Update local node data with API information
|
wss.logger.WithFields(log.Fields{
|
||||||
|
"primary_node": primaryNode,
|
||||||
|
"member_count": len(clusterStatus.Members),
|
||||||
|
}).Debug("Successfully fetched cluster members from primary node")
|
||||||
|
|
||||||
|
// Update local node data with API information but preserve heartbeat status
|
||||||
wss.updateLocalNodesWithAPI(clusterStatus.Members)
|
wss.updateLocalNodesWithAPI(clusterStatus.Members)
|
||||||
return clusterStatus.Members, nil
|
|
||||||
|
// Return merged data with heartbeat-based status override
|
||||||
|
return wss.mergeAPIWithHeartbeatStatus(clusterStatus.Members), nil
|
||||||
}
|
}
|
||||||
wss.logger.WithError(err).Error("Failed to get cluster status from primary node")
|
wss.logger.WithFields(log.Fields{
|
||||||
|
"primary_node": primaryNode,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to get cluster status from primary node, using fallback")
|
||||||
|
} else {
|
||||||
|
wss.logger.Debug("No primary node available, using fallback cluster members")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fallback to local data if API fails
|
// Fallback to local data if API fails
|
||||||
@@ -451,20 +508,62 @@ func (wss *WebSocketServer) getCurrentClusterMembers() ([]client.ClusterMember,
|
|||||||
|
|
||||||
// updateLocalNodesWithAPI updates local node data with information from API
|
// updateLocalNodesWithAPI updates local node data with information from API
|
||||||
func (wss *WebSocketServer) updateLocalNodesWithAPI(apiMembers []client.ClusterMember) {
|
func (wss *WebSocketServer) updateLocalNodesWithAPI(apiMembers []client.ClusterMember) {
|
||||||
// This would update the local node discovery with fresh API data
|
|
||||||
// For now, we'll just log that we received the data
|
|
||||||
wss.logger.WithField("members", len(apiMembers)).Debug("Updating local nodes with API data")
|
wss.logger.WithField("members", len(apiMembers)).Debug("Updating local nodes with API data")
|
||||||
|
|
||||||
for _, member := range apiMembers {
|
for _, member := range apiMembers {
|
||||||
if len(member.Labels) > 0 {
|
// Update local node with API data, but preserve heartbeat-based status
|
||||||
wss.logger.WithFields(log.Fields{
|
wss.updateNodeWithAPIData(member)
|
||||||
"ip": member.IP,
|
|
||||||
"labels": member.Labels,
|
|
||||||
}).Debug("API member labels")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// updateNodeWithAPIData updates a single node with API data while preserving heartbeat status
|
||||||
|
func (wss *WebSocketServer) updateNodeWithAPIData(apiMember client.ClusterMember) {
|
||||||
|
nodes := wss.nodeDiscovery.GetNodes()
|
||||||
|
if localNode, exists := nodes[apiMember.IP]; exists {
|
||||||
|
// Update additional data from API but preserve heartbeat-based status
|
||||||
|
localNode.Labels = apiMember.Labels
|
||||||
|
localNode.Resources = apiMember.Resources
|
||||||
|
localNode.Latency = apiMember.Latency
|
||||||
|
|
||||||
|
// Only update hostname if it's different and not empty
|
||||||
|
if apiMember.Hostname != "" && apiMember.Hostname != localNode.Hostname {
|
||||||
|
localNode.Hostname = apiMember.Hostname
|
||||||
|
}
|
||||||
|
|
||||||
|
wss.logger.WithFields(log.Fields{
|
||||||
|
"ip": apiMember.IP,
|
||||||
|
"labels": apiMember.Labels,
|
||||||
|
"status": localNode.Status, // Keep heartbeat-based status
|
||||||
|
}).Debug("Updated node with API data, preserved heartbeat status")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// mergeAPIWithHeartbeatStatus merges API member data with heartbeat-based status
|
||||||
|
func (wss *WebSocketServer) mergeAPIWithHeartbeatStatus(apiMembers []client.ClusterMember) []client.ClusterMember {
|
||||||
|
localNodes := wss.nodeDiscovery.GetNodes()
|
||||||
|
mergedMembers := make([]client.ClusterMember, 0, len(apiMembers))
|
||||||
|
|
||||||
|
for _, apiMember := range apiMembers {
|
||||||
|
mergedMember := apiMember
|
||||||
|
|
||||||
|
// Override status with heartbeat-based status if we have local data
|
||||||
|
if localNode, exists := localNodes[apiMember.IP]; exists {
|
||||||
|
mergedMember.Status = string(localNode.Status)
|
||||||
|
mergedMember.LastSeen = localNode.LastSeen.Unix()
|
||||||
|
|
||||||
|
wss.logger.WithFields(log.Fields{
|
||||||
|
"ip": apiMember.IP,
|
||||||
|
"api_status": apiMember.Status,
|
||||||
|
"heartbeat_status": localNode.Status,
|
||||||
|
}).Debug("Overriding API status with heartbeat status")
|
||||||
|
}
|
||||||
|
|
||||||
|
mergedMembers = append(mergedMembers, mergedMember)
|
||||||
|
}
|
||||||
|
|
||||||
|
return mergedMembers
|
||||||
|
}
|
||||||
|
|
||||||
// getFallbackClusterMembers returns local node data as fallback
|
// getFallbackClusterMembers returns local node data as fallback
|
||||||
func (wss *WebSocketServer) getFallbackClusterMembers() []client.ClusterMember {
|
func (wss *WebSocketServer) getFallbackClusterMembers() []client.ClusterMember {
|
||||||
nodes := wss.nodeDiscovery.GetNodes()
|
nodes := wss.nodeDiscovery.GetNodes()
|
||||||
@@ -503,10 +602,59 @@ func (wss *WebSocketServer) GetClientCount() int {
|
|||||||
return len(wss.clients)
|
return len(wss.clients)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BroadcastClusterEvent sends cluster events to all connected clients
|
||||||
|
func (wss *WebSocketServer) BroadcastClusterEvent(topic string, data interface{}) {
|
||||||
|
wss.mutex.RLock()
|
||||||
|
clients := make([]*websocket.Conn, 0, len(wss.clients))
|
||||||
|
for client := range wss.clients {
|
||||||
|
clients = append(clients, client)
|
||||||
|
}
|
||||||
|
wss.mutex.RUnlock()
|
||||||
|
|
||||||
|
if len(clients) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
message := struct {
|
||||||
|
Topic string `json:"topic"`
|
||||||
|
Data interface{} `json:"data"`
|
||||||
|
Timestamp string `json:"timestamp"`
|
||||||
|
}{
|
||||||
|
Topic: topic,
|
||||||
|
Data: data,
|
||||||
|
Timestamp: time.Now().Format(time.RFC3339),
|
||||||
|
}
|
||||||
|
|
||||||
|
messageData, err := json.Marshal(message)
|
||||||
|
if err != nil {
|
||||||
|
wss.logger.WithError(err).Error("Failed to marshal cluster event")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
wss.logger.WithFields(log.Fields{
|
||||||
|
"topic": topic,
|
||||||
|
"clients": len(clients),
|
||||||
|
}).Debug("Broadcasting cluster event to WebSocket clients")
|
||||||
|
|
||||||
|
// Send to all clients with write synchronization
|
||||||
|
wss.writeMutex.Lock()
|
||||||
|
defer wss.writeMutex.Unlock()
|
||||||
|
|
||||||
|
for _, client := range clients {
|
||||||
|
client.SetWriteDeadline(time.Now().Add(5 * time.Second))
|
||||||
|
if err := client.WriteMessage(websocket.TextMessage, messageData); err != nil {
|
||||||
|
wss.logger.WithError(err).Error("Failed to send cluster event to client")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Shutdown gracefully shuts down the WebSocket server
|
// Shutdown gracefully shuts down the WebSocket server
|
||||||
func (wss *WebSocketServer) Shutdown(ctx context.Context) error {
|
func (wss *WebSocketServer) Shutdown(ctx context.Context) error {
|
||||||
wss.logger.Info("Shutting down WebSocket server")
|
wss.logger.Info("Shutting down WebSocket server")
|
||||||
|
|
||||||
|
// Stop periodic cluster info fetching
|
||||||
|
close(wss.clusterInfoStopCh)
|
||||||
|
|
||||||
wss.mutex.Lock()
|
wss.mutex.Lock()
|
||||||
clients := make([]*websocket.Conn, 0, len(wss.clients))
|
clients := make([]*websocket.Conn, 0, len(wss.clients))
|
||||||
for client := range wss.clients {
|
for client := range wss.clients {
|
||||||
|
|||||||
@@ -117,21 +117,43 @@ type FirmwareUpdateResponse struct {
|
|||||||
func (c *SporeClient) GetClusterStatus() (*ClusterStatusResponse, error) {
|
func (c *SporeClient) GetClusterStatus() (*ClusterStatusResponse, error) {
|
||||||
url := fmt.Sprintf("%s/api/cluster/members", c.BaseURL)
|
url := fmt.Sprintf("%s/api/cluster/members", c.BaseURL)
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"endpoint": "/api/cluster/members",
|
||||||
|
}).Debug("Fetching cluster status from SPORE node")
|
||||||
|
|
||||||
resp, err := c.HTTPClient.Get(url)
|
resp, err := c.HTTPClient.Get(url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to fetch cluster status from SPORE node")
|
||||||
return nil, fmt.Errorf("failed to get cluster status: %w", err)
|
return nil, fmt.Errorf("failed to get cluster status: %w", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"status_code": resp.StatusCode,
|
||||||
|
}).Debug("Cluster status request returned non-OK status")
|
||||||
return nil, fmt.Errorf("cluster status request failed with status %d", resp.StatusCode)
|
return nil, fmt.Errorf("cluster status request failed with status %d", resp.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
var clusterStatus ClusterStatusResponse
|
var clusterStatus ClusterStatusResponse
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&clusterStatus); err != nil {
|
if err := json.NewDecoder(resp.Body).Decode(&clusterStatus); err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to decode cluster status response")
|
||||||
return nil, fmt.Errorf("failed to decode cluster status response: %w", err)
|
return nil, fmt.Errorf("failed to decode cluster status response: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"member_count": len(clusterStatus.Members),
|
||||||
|
}).Debug("Successfully fetched cluster status from SPORE node")
|
||||||
|
|
||||||
return &clusterStatus, nil
|
return &clusterStatus, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -139,21 +161,44 @@ func (c *SporeClient) GetClusterStatus() (*ClusterStatusResponse, error) {
|
|||||||
func (c *SporeClient) GetTaskStatus() (*TaskStatusResponse, error) {
|
func (c *SporeClient) GetTaskStatus() (*TaskStatusResponse, error) {
|
||||||
url := fmt.Sprintf("%s/api/tasks/status", c.BaseURL)
|
url := fmt.Sprintf("%s/api/tasks/status", c.BaseURL)
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"endpoint": "/api/tasks/status",
|
||||||
|
}).Debug("Fetching task status from SPORE node")
|
||||||
|
|
||||||
resp, err := c.HTTPClient.Get(url)
|
resp, err := c.HTTPClient.Get(url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to fetch task status from SPORE node")
|
||||||
return nil, fmt.Errorf("failed to get task status: %w", err)
|
return nil, fmt.Errorf("failed to get task status: %w", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"status_code": resp.StatusCode,
|
||||||
|
}).Debug("Task status request returned non-OK status")
|
||||||
return nil, fmt.Errorf("task status request failed with status %d", resp.StatusCode)
|
return nil, fmt.Errorf("task status request failed with status %d", resp.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
var taskStatus TaskStatusResponse
|
var taskStatus TaskStatusResponse
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&taskStatus); err != nil {
|
if err := json.NewDecoder(resp.Body).Decode(&taskStatus); err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to decode task status response")
|
||||||
return nil, fmt.Errorf("failed to decode task status response: %w", err)
|
return nil, fmt.Errorf("failed to decode task status response: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"total_tasks": taskStatus.Summary.TotalTasks,
|
||||||
|
"active_tasks": taskStatus.Summary.ActiveTasks,
|
||||||
|
}).Debug("Successfully fetched task status from SPORE node")
|
||||||
|
|
||||||
return &taskStatus, nil
|
return &taskStatus, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -161,21 +206,44 @@ func (c *SporeClient) GetTaskStatus() (*TaskStatusResponse, error) {
|
|||||||
func (c *SporeClient) GetSystemStatus() (*SystemStatusResponse, error) {
|
func (c *SporeClient) GetSystemStatus() (*SystemStatusResponse, error) {
|
||||||
url := fmt.Sprintf("%s/api/node/status", c.BaseURL)
|
url := fmt.Sprintf("%s/api/node/status", c.BaseURL)
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"endpoint": "/api/node/status",
|
||||||
|
}).Debug("Fetching system status from SPORE node")
|
||||||
|
|
||||||
resp, err := c.HTTPClient.Get(url)
|
resp, err := c.HTTPClient.Get(url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to fetch system status from SPORE node")
|
||||||
return nil, fmt.Errorf("failed to get system status: %w", err)
|
return nil, fmt.Errorf("failed to get system status: %w", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"status_code": resp.StatusCode,
|
||||||
|
}).Debug("System status request returned non-OK status")
|
||||||
return nil, fmt.Errorf("system status request failed with status %d", resp.StatusCode)
|
return nil, fmt.Errorf("system status request failed with status %d", resp.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
var systemStatus SystemStatusResponse
|
var systemStatus SystemStatusResponse
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&systemStatus); err != nil {
|
if err := json.NewDecoder(resp.Body).Decode(&systemStatus); err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to decode system status response")
|
||||||
return nil, fmt.Errorf("failed to decode system status response: %w", err)
|
return nil, fmt.Errorf("failed to decode system status response: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"free_heap": systemStatus.FreeHeap,
|
||||||
|
"chip_id": systemStatus.ChipID,
|
||||||
|
}).Debug("Successfully fetched system status from SPORE node")
|
||||||
|
|
||||||
return &systemStatus, nil
|
return &systemStatus, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -183,21 +251,43 @@ func (c *SporeClient) GetSystemStatus() (*SystemStatusResponse, error) {
|
|||||||
func (c *SporeClient) GetCapabilities() (*CapabilitiesResponse, error) {
|
func (c *SporeClient) GetCapabilities() (*CapabilitiesResponse, error) {
|
||||||
url := fmt.Sprintf("%s/api/node/endpoints", c.BaseURL)
|
url := fmt.Sprintf("%s/api/node/endpoints", c.BaseURL)
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"endpoint": "/api/node/endpoints",
|
||||||
|
}).Debug("Fetching capabilities from SPORE node")
|
||||||
|
|
||||||
resp, err := c.HTTPClient.Get(url)
|
resp, err := c.HTTPClient.Get(url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to fetch capabilities from SPORE node")
|
||||||
return nil, fmt.Errorf("failed to get capabilities: %w", err)
|
return nil, fmt.Errorf("failed to get capabilities: %w", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"status_code": resp.StatusCode,
|
||||||
|
}).Debug("Capabilities request returned non-OK status")
|
||||||
return nil, fmt.Errorf("capabilities request failed with status %d", resp.StatusCode)
|
return nil, fmt.Errorf("capabilities request failed with status %d", resp.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
var capabilities CapabilitiesResponse
|
var capabilities CapabilitiesResponse
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&capabilities); err != nil {
|
if err := json.NewDecoder(resp.Body).Decode(&capabilities); err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to decode capabilities response")
|
||||||
return nil, fmt.Errorf("failed to decode capabilities response: %w", err)
|
return nil, fmt.Errorf("failed to decode capabilities response: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"endpoint_count": len(capabilities.Endpoints),
|
||||||
|
}).Debug("Successfully fetched capabilities from SPORE node")
|
||||||
|
|
||||||
return &capabilities, nil
|
return &capabilities, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -205,16 +295,30 @@ func (c *SporeClient) GetCapabilities() (*CapabilitiesResponse, error) {
|
|||||||
func (c *SporeClient) UpdateFirmware(firmwareData []byte, filename string) (*FirmwareUpdateResponse, error) {
|
func (c *SporeClient) UpdateFirmware(firmwareData []byte, filename string) (*FirmwareUpdateResponse, error) {
|
||||||
url := fmt.Sprintf("%s/api/node/update", c.BaseURL)
|
url := fmt.Sprintf("%s/api/node/update", c.BaseURL)
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"endpoint": "/api/node/update",
|
||||||
|
"filename": filename,
|
||||||
|
"data_size": len(firmwareData),
|
||||||
|
}).Debug("Preparing firmware upload to SPORE node")
|
||||||
|
|
||||||
// Create multipart form
|
// Create multipart form
|
||||||
var requestBody bytes.Buffer
|
var requestBody bytes.Buffer
|
||||||
contentType := createMultipartForm(&requestBody, firmwareData, filename)
|
contentType := createMultipartForm(&requestBody, firmwareData, filename)
|
||||||
|
|
||||||
if contentType == "" {
|
if contentType == "" {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
}).Debug("Failed to create multipart form for firmware upload")
|
||||||
return nil, fmt.Errorf("failed to create multipart form")
|
return nil, fmt.Errorf("failed to create multipart form")
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := http.NewRequest("POST", url, &requestBody)
|
req, err := http.NewRequest("POST", url, &requestBody)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to create firmware update request")
|
||||||
return nil, fmt.Errorf("failed to create firmware update request: %w", err)
|
return nil, fmt.Errorf("failed to create firmware update request: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -226,9 +330,10 @@ func (c *SporeClient) UpdateFirmware(firmwareData []byte, filename string) (*Fir
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"node_ip": c.BaseURL,
|
"node_url": c.BaseURL,
|
||||||
"status": "sending_firmware",
|
"filename": filename,
|
||||||
}).Debug("Sending firmware to SPORE device")
|
"data_size": len(firmwareData),
|
||||||
|
}).Debug("Uploading firmware to SPORE node")
|
||||||
|
|
||||||
resp, err := firmwareClient.Do(req)
|
resp, err := firmwareClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -277,9 +382,19 @@ func (c *SporeClient) UpdateFirmware(firmwareData []byte, filename string) (*Fir
|
|||||||
func (c *SporeClient) UpdateNodeLabels(labels map[string]string) error {
|
func (c *SporeClient) UpdateNodeLabels(labels map[string]string) error {
|
||||||
targetURL := fmt.Sprintf("%s/api/node/config", c.BaseURL)
|
targetURL := fmt.Sprintf("%s/api/node/config", c.BaseURL)
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"endpoint": "/api/node/config",
|
||||||
|
"labels": labels,
|
||||||
|
}).Debug("Updating node labels on SPORE node")
|
||||||
|
|
||||||
// Convert labels to JSON
|
// Convert labels to JSON
|
||||||
labelsJSON, err := json.Marshal(labels)
|
labelsJSON, err := json.Marshal(labels)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to marshal labels")
|
||||||
return fmt.Errorf("failed to marshal labels: %w", err)
|
return fmt.Errorf("failed to marshal labels: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -289,6 +404,10 @@ func (c *SporeClient) UpdateNodeLabels(labels map[string]string) error {
|
|||||||
|
|
||||||
req, err := http.NewRequest("POST", targetURL, strings.NewReader(data.Encode()))
|
req, err := http.NewRequest("POST", targetURL, strings.NewReader(data.Encode()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to create labels update request")
|
||||||
return fmt.Errorf("failed to create labels update request: %w", err)
|
return fmt.Errorf("failed to create labels update request: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -296,19 +415,28 @@ func (c *SporeClient) UpdateNodeLabels(labels map[string]string) error {
|
|||||||
|
|
||||||
resp, err := c.HTTPClient.Do(req)
|
resp, err := c.HTTPClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to update node labels")
|
||||||
return fmt.Errorf("failed to update node labels: %w", err)
|
return fmt.Errorf("failed to update node labels: %w", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
body, _ := io.ReadAll(resp.Body)
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"status_code": resp.StatusCode,
|
||||||
|
"error_body": string(body),
|
||||||
|
}).Debug("Node labels update returned non-OK status")
|
||||||
return fmt.Errorf("node labels update failed with status %d: %s", resp.StatusCode, string(body))
|
return fmt.Errorf("node labels update failed with status %d: %s", resp.StatusCode, string(body))
|
||||||
}
|
}
|
||||||
|
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"node_ip": c.BaseURL,
|
"node_url": c.BaseURL,
|
||||||
"labels": labels,
|
"labels": labels,
|
||||||
}).Info("Node labels updated successfully")
|
}).Debug("Successfully updated node labels on SPORE node")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -318,17 +446,43 @@ func (c *SporeClient) ProxyCall(method, uri string, params map[string]interface{
|
|||||||
// Build target URL
|
// Build target URL
|
||||||
targetURL := fmt.Sprintf("%s%s", c.BaseURL, uri)
|
targetURL := fmt.Sprintf("%s%s", c.BaseURL, uri)
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"method": method,
|
||||||
|
"endpoint": uri,
|
||||||
|
"param_count": len(params),
|
||||||
|
}).Debug("Making proxy call to SPORE node")
|
||||||
|
|
||||||
// Parse parameters and build request
|
// Parse parameters and build request
|
||||||
req, err := c.buildProxyRequest(method, targetURL, params)
|
req, err := c.buildProxyRequest(method, targetURL, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"method": method,
|
||||||
|
"endpoint": uri,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to build proxy request")
|
||||||
return nil, fmt.Errorf("failed to build proxy request: %w", err)
|
return nil, fmt.Errorf("failed to build proxy request: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := c.HTTPClient.Do(req)
|
resp, err := c.HTTPClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"method": method,
|
||||||
|
"endpoint": uri,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Proxy call failed")
|
||||||
return nil, fmt.Errorf("proxy call failed: %w", err)
|
return nil, fmt.Errorf("proxy call failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"node_url": c.BaseURL,
|
||||||
|
"method": method,
|
||||||
|
"endpoint": uri,
|
||||||
|
"status_code": resp.StatusCode,
|
||||||
|
}).Debug("Proxy call completed successfully")
|
||||||
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -69,21 +69,42 @@ func (c *RegistryClient) FindFirmwareByNameAndVersion(name, version string) (*Fi
|
|||||||
func (c *RegistryClient) GetHealth() (map[string]interface{}, error) {
|
func (c *RegistryClient) GetHealth() (map[string]interface{}, error) {
|
||||||
url := fmt.Sprintf("%s/health", c.BaseURL)
|
url := fmt.Sprintf("%s/health", c.BaseURL)
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"endpoint": "/health",
|
||||||
|
}).Debug("Checking registry health")
|
||||||
|
|
||||||
resp, err := c.HTTPClient.Get(url)
|
resp, err := c.HTTPClient.Get(url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to check registry health")
|
||||||
return nil, fmt.Errorf("failed to get registry health: %w", err)
|
return nil, fmt.Errorf("failed to get registry health: %w", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"status_code": resp.StatusCode,
|
||||||
|
}).Debug("Registry health check returned non-OK status")
|
||||||
return nil, fmt.Errorf("registry health check failed with status %d", resp.StatusCode)
|
return nil, fmt.Errorf("registry health check failed with status %d", resp.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
var health map[string]interface{}
|
var health map[string]interface{}
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&health); err != nil {
|
if err := json.NewDecoder(resp.Body).Decode(&health); err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to decode health response")
|
||||||
return nil, fmt.Errorf("failed to decode health response: %w", err)
|
return nil, fmt.Errorf("failed to decode health response: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
}).Debug("Successfully checked registry health")
|
||||||
|
|
||||||
return health, nil
|
return health, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -91,6 +112,13 @@ func (c *RegistryClient) GetHealth() (map[string]interface{}, error) {
|
|||||||
func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile io.Reader) (map[string]interface{}, error) {
|
func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile io.Reader) (map[string]interface{}, error) {
|
||||||
url := fmt.Sprintf("%s/firmware", c.BaseURL)
|
url := fmt.Sprintf("%s/firmware", c.BaseURL)
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"endpoint": "/firmware",
|
||||||
|
"name": metadata.Name,
|
||||||
|
"version": metadata.Version,
|
||||||
|
}).Debug("Uploading firmware to registry")
|
||||||
|
|
||||||
// Create multipart form data
|
// Create multipart form data
|
||||||
body := &bytes.Buffer{}
|
body := &bytes.Buffer{}
|
||||||
writer := multipart.NewWriter(body)
|
writer := multipart.NewWriter(body)
|
||||||
@@ -98,11 +126,19 @@ func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile
|
|||||||
// Add metadata
|
// Add metadata
|
||||||
metadataJSON, err := json.Marshal(metadata)
|
metadataJSON, err := json.Marshal(metadata)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to marshal firmware metadata")
|
||||||
return nil, fmt.Errorf("failed to marshal metadata: %w", err)
|
return nil, fmt.Errorf("failed to marshal metadata: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
metadataPart, err := writer.CreateFormField("metadata")
|
metadataPart, err := writer.CreateFormField("metadata")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to create metadata field")
|
||||||
return nil, fmt.Errorf("failed to create metadata field: %w", err)
|
return nil, fmt.Errorf("failed to create metadata field: %w", err)
|
||||||
}
|
}
|
||||||
metadataPart.Write(metadataJSON)
|
metadataPart.Write(metadataJSON)
|
||||||
@@ -110,10 +146,18 @@ func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile
|
|||||||
// Add firmware file
|
// Add firmware file
|
||||||
firmwarePart, err := writer.CreateFormFile("firmware", fmt.Sprintf("%s-%s.bin", metadata.Name, metadata.Version))
|
firmwarePart, err := writer.CreateFormFile("firmware", fmt.Sprintf("%s-%s.bin", metadata.Name, metadata.Version))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to create firmware field")
|
||||||
return nil, fmt.Errorf("failed to create firmware field: %w", err)
|
return nil, fmt.Errorf("failed to create firmware field: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := io.Copy(firmwarePart, firmwareFile); err != nil {
|
if _, err := io.Copy(firmwarePart, firmwareFile); err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to copy firmware data")
|
||||||
return nil, fmt.Errorf("failed to copy firmware data: %w", err)
|
return nil, fmt.Errorf("failed to copy firmware data: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -121,6 +165,10 @@ func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile
|
|||||||
|
|
||||||
req, err := http.NewRequest("POST", url, body)
|
req, err := http.NewRequest("POST", url, body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to create upload request")
|
||||||
return nil, fmt.Errorf("failed to create request: %w", err)
|
return nil, fmt.Errorf("failed to create request: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -128,20 +176,43 @@ func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile
|
|||||||
|
|
||||||
resp, err := c.HTTPClient.Do(req)
|
resp, err := c.HTTPClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"name": metadata.Name,
|
||||||
|
"version": metadata.Version,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to upload firmware to registry")
|
||||||
return nil, fmt.Errorf("failed to upload firmware: %w", err)
|
return nil, fmt.Errorf("failed to upload firmware: %w", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
|
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
|
||||||
body, _ := io.ReadAll(resp.Body)
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"name": metadata.Name,
|
||||||
|
"version": metadata.Version,
|
||||||
|
"status_code": resp.StatusCode,
|
||||||
|
"error_body": string(body),
|
||||||
|
}).Debug("Firmware upload returned non-OK status")
|
||||||
return nil, fmt.Errorf("firmware upload failed with status %d: %s", resp.StatusCode, string(body))
|
return nil, fmt.Errorf("firmware upload failed with status %d: %s", resp.StatusCode, string(body))
|
||||||
}
|
}
|
||||||
|
|
||||||
var result map[string]interface{}
|
var result map[string]interface{}
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to decode upload response")
|
||||||
return nil, fmt.Errorf("failed to decode upload response: %w", err)
|
return nil, fmt.Errorf("failed to decode upload response: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"name": metadata.Name,
|
||||||
|
"version": metadata.Version,
|
||||||
|
}).Debug("Successfully uploaded firmware to registry")
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -149,13 +220,32 @@ func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile
|
|||||||
func (c *RegistryClient) UpdateFirmwareMetadata(name, version string, metadata FirmwareMetadata) (map[string]interface{}, error) {
|
func (c *RegistryClient) UpdateFirmwareMetadata(name, version string, metadata FirmwareMetadata) (map[string]interface{}, error) {
|
||||||
url := fmt.Sprintf("%s/firmware/%s/%s", c.BaseURL, name, version)
|
url := fmt.Sprintf("%s/firmware/%s/%s", c.BaseURL, name, version)
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"endpoint": fmt.Sprintf("/firmware/%s/%s", name, version),
|
||||||
|
"name": name,
|
||||||
|
"version": version,
|
||||||
|
}).Debug("Updating firmware metadata in registry")
|
||||||
|
|
||||||
metadataJSON, err := json.Marshal(metadata)
|
metadataJSON, err := json.Marshal(metadata)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"name": name,
|
||||||
|
"version": version,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to marshal metadata")
|
||||||
return nil, fmt.Errorf("failed to marshal metadata: %w", err)
|
return nil, fmt.Errorf("failed to marshal metadata: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := http.NewRequest("PUT", url, bytes.NewBuffer(metadataJSON))
|
req, err := http.NewRequest("PUT", url, bytes.NewBuffer(metadataJSON))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"name": name,
|
||||||
|
"version": version,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to create update request")
|
||||||
return nil, fmt.Errorf("failed to create request: %w", err)
|
return nil, fmt.Errorf("failed to create request: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -163,20 +253,45 @@ func (c *RegistryClient) UpdateFirmwareMetadata(name, version string, metadata F
|
|||||||
|
|
||||||
resp, err := c.HTTPClient.Do(req)
|
resp, err := c.HTTPClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"name": name,
|
||||||
|
"version": version,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to update firmware metadata in registry")
|
||||||
return nil, fmt.Errorf("failed to update firmware metadata: %w", err)
|
return nil, fmt.Errorf("failed to update firmware metadata: %w", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
body, _ := io.ReadAll(resp.Body)
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"name": name,
|
||||||
|
"version": version,
|
||||||
|
"status_code": resp.StatusCode,
|
||||||
|
"error_body": string(body),
|
||||||
|
}).Debug("Firmware metadata update returned non-OK status")
|
||||||
return nil, fmt.Errorf("firmware metadata update failed with status %d: %s", resp.StatusCode, string(body))
|
return nil, fmt.Errorf("firmware metadata update failed with status %d: %s", resp.StatusCode, string(body))
|
||||||
}
|
}
|
||||||
|
|
||||||
var result map[string]interface{}
|
var result map[string]interface{}
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"name": name,
|
||||||
|
"version": version,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to decode update response")
|
||||||
return nil, fmt.Errorf("failed to decode update response: %w", err)
|
return nil, fmt.Errorf("failed to decode update response: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"name": name,
|
||||||
|
"version": version,
|
||||||
|
}).Debug("Successfully updated firmware metadata in registry")
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -221,21 +336,43 @@ func (c *RegistryClient) firmwareMatchesLabels(firmwareLabels, rolloutLabels map
|
|||||||
func (c *RegistryClient) ListFirmware() ([]GroupedFirmware, error) {
|
func (c *RegistryClient) ListFirmware() ([]GroupedFirmware, error) {
|
||||||
url := fmt.Sprintf("%s/firmware", c.BaseURL)
|
url := fmt.Sprintf("%s/firmware", c.BaseURL)
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"endpoint": "/firmware",
|
||||||
|
}).Debug("Fetching firmware list from registry")
|
||||||
|
|
||||||
resp, err := c.HTTPClient.Get(url)
|
resp, err := c.HTTPClient.Get(url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to fetch firmware list from registry")
|
||||||
return nil, fmt.Errorf("failed to get firmware list: %w", err)
|
return nil, fmt.Errorf("failed to get firmware list: %w", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"status_code": resp.StatusCode,
|
||||||
|
}).Debug("Firmware list request returned non-OK status")
|
||||||
return nil, fmt.Errorf("firmware list request failed with status %d", resp.StatusCode)
|
return nil, fmt.Errorf("firmware list request failed with status %d", resp.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
var firmwareList []GroupedFirmware
|
var firmwareList []GroupedFirmware
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&firmwareList); err != nil {
|
if err := json.NewDecoder(resp.Body).Decode(&firmwareList); err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to decode firmware list response")
|
||||||
return nil, fmt.Errorf("failed to decode firmware list response: %w", err)
|
return nil, fmt.Errorf("failed to decode firmware list response: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"firmware_count": len(firmwareList),
|
||||||
|
}).Debug("Successfully fetched firmware list from registry")
|
||||||
|
|
||||||
return firmwareList, nil
|
return firmwareList, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -243,26 +380,52 @@ func (c *RegistryClient) ListFirmware() ([]GroupedFirmware, error) {
|
|||||||
func (c *RegistryClient) DownloadFirmware(name, version string) ([]byte, error) {
|
func (c *RegistryClient) DownloadFirmware(name, version string) ([]byte, error) {
|
||||||
url := fmt.Sprintf("%s/firmware/%s/%s", c.BaseURL, name, version)
|
url := fmt.Sprintf("%s/firmware/%s/%s", c.BaseURL, name, version)
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"endpoint": fmt.Sprintf("/firmware/%s/%s", name, version),
|
||||||
|
"name": name,
|
||||||
|
"version": version,
|
||||||
|
}).Debug("Downloading firmware from registry")
|
||||||
|
|
||||||
resp, err := c.HTTPClient.Get(url)
|
resp, err := c.HTTPClient.Get(url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"name": name,
|
||||||
|
"version": version,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to download firmware from registry")
|
||||||
return nil, fmt.Errorf("failed to download firmware: %w", err)
|
return nil, fmt.Errorf("failed to download firmware: %w", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"name": name,
|
||||||
|
"version": version,
|
||||||
|
"status_code": resp.StatusCode,
|
||||||
|
}).Debug("Firmware download request returned non-OK status")
|
||||||
return nil, fmt.Errorf("firmware download request failed with status %d", resp.StatusCode)
|
return nil, fmt.Errorf("firmware download request failed with status %d", resp.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err := io.ReadAll(resp.Body)
|
data, err := io.ReadAll(resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"name": name,
|
||||||
|
"version": version,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to read firmware data from registry")
|
||||||
return nil, fmt.Errorf("failed to read firmware data: %w", err)
|
return nil, fmt.Errorf("failed to read firmware data: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"name": name,
|
"registry_url": c.BaseURL,
|
||||||
"version": version,
|
"name": name,
|
||||||
"size": len(data),
|
"version": version,
|
||||||
}).Info("Downloaded firmware from registry")
|
"size": len(data),
|
||||||
|
}).Debug("Successfully downloaded firmware from registry")
|
||||||
|
|
||||||
return data, nil
|
return data, nil
|
||||||
}
|
}
|
||||||
@@ -271,31 +434,64 @@ func (c *RegistryClient) DownloadFirmware(name, version string) ([]byte, error)
|
|||||||
func (c *RegistryClient) DeleteFirmware(name, version string) (map[string]interface{}, error) {
|
func (c *RegistryClient) DeleteFirmware(name, version string) (map[string]interface{}, error) {
|
||||||
url := fmt.Sprintf("%s/firmware/%s/%s", c.BaseURL, name, version)
|
url := fmt.Sprintf("%s/firmware/%s/%s", c.BaseURL, name, version)
|
||||||
|
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"endpoint": fmt.Sprintf("/firmware/%s/%s", name, version),
|
||||||
|
"name": name,
|
||||||
|
"version": version,
|
||||||
|
}).Debug("Deleting firmware from registry")
|
||||||
|
|
||||||
req, err := http.NewRequest(http.MethodDelete, url, nil)
|
req, err := http.NewRequest(http.MethodDelete, url, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"name": name,
|
||||||
|
"version": version,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to create delete request")
|
||||||
return nil, fmt.Errorf("failed to create delete request: %w", err)
|
return nil, fmt.Errorf("failed to create delete request: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := c.HTTPClient.Do(req)
|
resp, err := c.HTTPClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"name": name,
|
||||||
|
"version": version,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to delete firmware from registry")
|
||||||
return nil, fmt.Errorf("failed to delete firmware: %w", err)
|
return nil, fmt.Errorf("failed to delete firmware: %w", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
body, _ := io.ReadAll(resp.Body)
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"name": name,
|
||||||
|
"version": version,
|
||||||
|
"status_code": resp.StatusCode,
|
||||||
|
"error_body": string(body),
|
||||||
|
}).Debug("Firmware delete returned non-OK status")
|
||||||
return nil, fmt.Errorf("firmware delete request failed with status %d: %s", resp.StatusCode, string(body))
|
return nil, fmt.Errorf("firmware delete request failed with status %d: %s", resp.StatusCode, string(body))
|
||||||
}
|
}
|
||||||
|
|
||||||
var result map[string]interface{}
|
var result map[string]interface{}
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"registry_url": c.BaseURL,
|
||||||
|
"name": name,
|
||||||
|
"version": version,
|
||||||
|
"error": err.Error(),
|
||||||
|
}).Debug("Failed to decode delete response")
|
||||||
return nil, fmt.Errorf("failed to decode delete response: %w", err)
|
return nil, fmt.Errorf("failed to decode delete response: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"name": name,
|
"registry_url": c.BaseURL,
|
||||||
"version": version,
|
"name": name,
|
||||||
}).Info("Deleted firmware from registry")
|
"version": version,
|
||||||
|
}).Debug("Successfully deleted firmware from registry")
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user