feat: rollout
This commit is contained in:
@@ -7,11 +7,13 @@ import (
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"spore-gateway/internal/discovery"
|
||||
"spore-gateway/internal/websocket"
|
||||
"spore-gateway/pkg/client"
|
||||
"spore-gateway/pkg/registry"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
log "github.com/sirupsen/logrus"
|
||||
@@ -24,6 +26,7 @@ type HTTPServer struct {
|
||||
nodeDiscovery *discovery.NodeDiscovery
|
||||
sporeClients map[string]*client.SporeClient
|
||||
webSocketServer *websocket.WebSocketServer
|
||||
registryClient *registry.RegistryClient
|
||||
server *http.Server
|
||||
}
|
||||
|
||||
@@ -32,12 +35,16 @@ func NewHTTPServer(port string, nodeDiscovery *discovery.NodeDiscovery) *HTTPSer
|
||||
// Initialize WebSocket server
|
||||
wsServer := websocket.NewWebSocketServer(nodeDiscovery)
|
||||
|
||||
// Initialize registry client
|
||||
registryClient := registry.NewRegistryClient("http://localhost:3002")
|
||||
|
||||
hs := &HTTPServer{
|
||||
port: port,
|
||||
router: mux.NewRouter(),
|
||||
nodeDiscovery: nodeDiscovery,
|
||||
sporeClients: make(map[string]*client.SporeClient),
|
||||
webSocketServer: wsServer,
|
||||
registryClient: registryClient,
|
||||
}
|
||||
|
||||
hs.setupRoutes()
|
||||
@@ -122,6 +129,8 @@ func (hs *HTTPServer) setupRoutes() {
|
||||
// Cluster endpoints
|
||||
api.HandleFunc("/cluster/members", hs.getClusterMembers).Methods("GET")
|
||||
api.HandleFunc("/cluster/refresh", hs.refreshCluster).Methods("POST", "OPTIONS")
|
||||
api.HandleFunc("/cluster/node/versions", hs.getClusterNodeVersions).Methods("GET")
|
||||
api.HandleFunc("/rollout", hs.startRollout).Methods("POST", "OPTIONS")
|
||||
|
||||
// Task endpoints
|
||||
api.HandleFunc("/tasks/status", hs.getTaskStatus).Methods("GET")
|
||||
@@ -135,6 +144,13 @@ func (hs *HTTPServer) setupRoutes() {
|
||||
// Proxy endpoints
|
||||
api.HandleFunc("/proxy-call", hs.proxyCall).Methods("POST", "OPTIONS")
|
||||
|
||||
// Registry proxy endpoints
|
||||
api.HandleFunc("/registry/health", hs.getRegistryHealth).Methods("GET")
|
||||
api.HandleFunc("/registry/firmware", hs.listRegistryFirmware).Methods("GET")
|
||||
api.HandleFunc("/registry/firmware", hs.uploadRegistryFirmware).Methods("POST", "OPTIONS")
|
||||
api.HandleFunc("/registry/firmware/{name}/{version}", hs.downloadRegistryFirmware).Methods("GET")
|
||||
api.HandleFunc("/registry/firmware/{name}/{version}", hs.updateRegistryFirmware).Methods("PUT", "OPTIONS")
|
||||
|
||||
// Test endpoints
|
||||
api.HandleFunc("/test/websocket", hs.testWebSocket).Methods("POST", "OPTIONS")
|
||||
|
||||
@@ -787,3 +803,433 @@ func (hs *HTTPServer) healthCheck(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(statusCode)
|
||||
json.NewEncoder(w).Encode(health)
|
||||
}
|
||||
|
||||
// RolloutRequest represents a rollout request
|
||||
type RolloutRequest struct {
|
||||
Firmware FirmwareInfo `json:"firmware"`
|
||||
Nodes []NodeInfo `json:"nodes"`
|
||||
}
|
||||
|
||||
// FirmwareInfo identifies a firmware build by name and version, together with
// any labels attached to it.
type FirmwareInfo struct {
	// Name is the firmware name.
	Name string `json:"name"`

	// Version is the firmware version string.
	Version string `json:"version"`

	// Labels holds key/value labels associated with the firmware.
	Labels map[string]string `json:"labels"`
}
|
||||
|
||||
// NodeInfo describes one target node of a rollout request.
type NodeInfo struct {
	// IP is the node's address.
	IP string `json:"ip"`

	// Version is the version reported for the node in the request.
	Version string `json:"version"`

	// Labels holds the node's key/value labels.
	Labels map[string]string `json:"labels"`
}
|
||||
|
||||
// RolloutResponse is the JSON document returned by POST /api/rollout once a
// rollout has been accepted.
type RolloutResponse struct {
	// Success reports whether the rollout was started.
	Success bool `json:"success"`

	// Message is a human-readable summary.
	Message string `json:"message"`

	// RolloutID identifies the rollout.
	RolloutID string `json:"rolloutId"`

	// TotalNodes is the number of nodes included in the rollout.
	TotalNodes int `json:"totalNodes"`

	// FirmwareURL is the URL of the firmware being rolled out.
	FirmwareURL string `json:"firmwareUrl"`
}
|
||||
|
||||
// NodeVersionInfo is one entry of the cluster node versions listing.
type NodeVersionInfo struct {
	// IP is the node's address.
	IP string `json:"ip"`

	// Version is the node's version string.
	Version string `json:"version"`

	// Labels holds the node's key/value labels.
	Labels map[string]string `json:"labels"`
}
|
||||
|
||||
// ClusterNodeVersionsResponse represents the response for cluster node versions
|
||||
type ClusterNodeVersionsResponse struct {
|
||||
Nodes []NodeVersionInfo `json:"nodes"`
|
||||
}
|
||||
|
||||
// GET /api/cluster/node/versions
|
||||
func (hs *HTTPServer) getClusterNodeVersions(w http.ResponseWriter, r *http.Request) {
|
||||
result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) {
|
||||
return client.GetClusterStatus()
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Error fetching cluster members for versions")
|
||||
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch cluster members", "message": "%s"}`, err.Error()), http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
|
||||
clusterStatus, ok := result.(*client.ClusterStatusResponse)
|
||||
if !ok {
|
||||
http.Error(w, `{"error": "Invalid cluster status response"}`, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Extract version information from cluster members
|
||||
var nodeVersions []NodeVersionInfo
|
||||
for _, member := range clusterStatus.Members {
|
||||
version := "unknown"
|
||||
if v, exists := member.Labels["version"]; exists {
|
||||
version = v
|
||||
}
|
||||
|
||||
nodeVersions = append(nodeVersions, NodeVersionInfo{
|
||||
IP: member.IP,
|
||||
Version: version,
|
||||
Labels: member.Labels,
|
||||
})
|
||||
}
|
||||
|
||||
response := ClusterNodeVersionsResponse{
|
||||
Nodes: nodeVersions,
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
// POST /api/rollout
|
||||
func (hs *HTTPServer) startRollout(w http.ResponseWriter, r *http.Request) {
|
||||
var request RolloutRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
|
||||
http.Error(w, `{"error": "Invalid JSON", "message": "Failed to parse request body"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if len(request.Nodes) == 0 {
|
||||
http.Error(w, `{"error": "No nodes", "message": "No nodes provided for rollout"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if request.Firmware.Name == "" || request.Firmware.Version == "" {
|
||||
http.Error(w, `{"error": "Missing firmware info", "message": "Firmware name and version are required"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"firmware_name": request.Firmware.Name,
|
||||
"firmware_version": request.Firmware.Version,
|
||||
"node_count": len(request.Nodes),
|
||||
}).Info("Starting rollout")
|
||||
|
||||
// Look up firmware in registry by name and version
|
||||
firmware, err := hs.registryClient.FindFirmwareByNameAndVersion(request.Firmware.Name, request.Firmware.Version)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to find firmware in registry")
|
||||
http.Error(w, fmt.Sprintf(`{"error": "Firmware not found", "message": "No firmware found with name %s and version %s: %s"}`, request.Firmware.Name, request.Firmware.Version, err.Error()), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
firmwareURL := fmt.Sprintf("http://localhost:3002/firmware/%s/%s", firmware.Name, firmware.Version)
|
||||
rolloutID := fmt.Sprintf("rollout_%d", time.Now().Unix())
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"rollout_id": rolloutID,
|
||||
"matching_nodes": len(request.Nodes),
|
||||
"firmware_name": request.Firmware.Name,
|
||||
"firmware_version": request.Firmware.Version,
|
||||
}).Info("Rollout initiated")
|
||||
|
||||
// Send immediate response
|
||||
response := RolloutResponse{
|
||||
Success: true,
|
||||
Message: fmt.Sprintf("Rollout started for %d nodes", len(request.Nodes)),
|
||||
RolloutID: rolloutID,
|
||||
TotalNodes: len(request.Nodes),
|
||||
FirmwareURL: firmwareURL,
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(response)
|
||||
|
||||
// Start rollout process in background
|
||||
go hs.processRollout(rolloutID, request.Nodes, request.Firmware)
|
||||
}
|
||||
|
||||
// nodeMatchesLabels checks if a node's labels match the rollout labels
|
||||
func (hs *HTTPServer) nodeMatchesLabels(nodeLabels, rolloutLabels map[string]string) bool {
|
||||
for key, value := range rolloutLabels {
|
||||
if nodeValue, exists := nodeLabels[key]; !exists || nodeValue != value {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// processRollout handles the actual rollout process in the background
|
||||
func (hs *HTTPServer) processRollout(rolloutID string, nodes []NodeInfo, firmwareInfo FirmwareInfo) {
|
||||
log.WithField("rollout_id", rolloutID).Info("Starting background rollout process")
|
||||
|
||||
// Download firmware from registry
|
||||
firmwareData, err := hs.registryClient.DownloadFirmware(firmwareInfo.Name, firmwareInfo.Version)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to download firmware for rollout")
|
||||
return
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"rollout_id": rolloutID,
|
||||
"firmware": fmt.Sprintf("%s/%s", firmwareInfo.Name, firmwareInfo.Version),
|
||||
"size": len(firmwareData),
|
||||
"total_nodes": len(nodes),
|
||||
}).Info("Downloaded firmware for rollout")
|
||||
|
||||
// Process nodes in parallel using goroutines
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i, node := range nodes {
|
||||
wg.Add(1)
|
||||
go func(nodeIndex int, node NodeInfo) {
|
||||
defer wg.Done()
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"rollout_id": rolloutID,
|
||||
"node_ip": node.IP,
|
||||
"progress": fmt.Sprintf("%d/%d", nodeIndex+1, len(nodes)),
|
||||
}).Info("Processing node in rollout")
|
||||
|
||||
// Update version label on the node before upload
|
||||
client := hs.getSporeClient(node.IP)
|
||||
|
||||
// Create updated labels with the new version
|
||||
updatedLabels := make(map[string]string)
|
||||
for k, v := range node.Labels {
|
||||
updatedLabels[k] = v
|
||||
}
|
||||
|
||||
// Ensure version label is properly formatted
|
||||
versionToSet := firmwareInfo.Version
|
||||
// Remove 'v' prefix if present to ensure consistency
|
||||
versionToSet = strings.TrimPrefix(versionToSet, "v")
|
||||
updatedLabels["version"] = versionToSet
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"rollout_id": rolloutID,
|
||||
"node_ip": node.IP,
|
||||
"old_version": node.Labels["version"],
|
||||
"new_version": versionToSet,
|
||||
"original_firmware_version": firmwareInfo.Version,
|
||||
"all_labels": updatedLabels,
|
||||
}).Info("Updating version label on node")
|
||||
|
||||
// Broadcast label update progress
|
||||
hs.webSocketServer.BroadcastRolloutProgress(rolloutID, node.IP, "updating_labels", nodeIndex+1, len(nodes))
|
||||
|
||||
if err := client.UpdateNodeLabels(updatedLabels); err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"rollout_id": rolloutID,
|
||||
"node_ip": node.IP,
|
||||
"error": err.Error(),
|
||||
}).Error("Failed to update version label on node")
|
||||
|
||||
// Broadcast failure
|
||||
hs.webSocketServer.BroadcastRolloutProgress(rolloutID, node.IP, "failed", nodeIndex+1, len(nodes))
|
||||
return
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"rollout_id": rolloutID,
|
||||
"node_ip": node.IP,
|
||||
"version": versionToSet,
|
||||
}).Info("Successfully updated version label on node")
|
||||
|
||||
// Broadcast upload progress
|
||||
hs.webSocketServer.BroadcastRolloutProgress(rolloutID, node.IP, "uploading", nodeIndex+1, len(nodes))
|
||||
|
||||
// Upload firmware to node
|
||||
result, err := client.UpdateFirmware(firmwareData, fmt.Sprintf("%s-%s.bin", firmwareInfo.Name, firmwareInfo.Version))
|
||||
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"rollout_id": rolloutID,
|
||||
"node_ip": node.IP,
|
||||
"error": err.Error(),
|
||||
}).Error("Failed to upload firmware to node")
|
||||
|
||||
// Broadcast failure
|
||||
hs.webSocketServer.BroadcastRolloutProgress(rolloutID, node.IP, "failed", nodeIndex+1, len(nodes))
|
||||
return
|
||||
}
|
||||
|
||||
// Check if the device reported a failure
|
||||
if result.Status == "FAIL" {
|
||||
log.WithFields(log.Fields{
|
||||
"rollout_id": rolloutID,
|
||||
"node_ip": node.IP,
|
||||
"message": result.Message,
|
||||
}).Error("Device reported firmware update failure")
|
||||
|
||||
// Broadcast failure
|
||||
hs.webSocketServer.BroadcastRolloutProgress(rolloutID, node.IP, "failed", nodeIndex+1, len(nodes))
|
||||
return
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"rollout_id": rolloutID,
|
||||
"node_ip": node.IP,
|
||||
"result": result.Status,
|
||||
}).Info("Firmware upload completed successfully")
|
||||
|
||||
// Broadcast completion
|
||||
hs.webSocketServer.BroadcastRolloutProgress(rolloutID, node.IP, "completed", nodeIndex+1, len(nodes))
|
||||
}(i, node)
|
||||
}
|
||||
|
||||
// Wait for all goroutines to complete
|
||||
wg.Wait()
|
||||
|
||||
log.WithField("rollout_id", rolloutID).Info("Rollout process completed")
|
||||
}
|
||||
|
||||
// Registry proxy handlers
|
||||
|
||||
// GET /api/registry/health
|
||||
func (hs *HTTPServer) getRegistryHealth(w http.ResponseWriter, r *http.Request) {
|
||||
health, err := hs.registryClient.GetHealth()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to get registry health")
|
||||
http.Error(w, fmt.Sprintf(`{"error": "Registry health check failed", "message": "%s"}`, err.Error()), http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(health)
|
||||
}
|
||||
|
||||
// GET /api/registry/firmware
|
||||
func (hs *HTTPServer) listRegistryFirmware(w http.ResponseWriter, r *http.Request) {
|
||||
// Get query parameters
|
||||
name := r.URL.Query().Get("name")
|
||||
version := r.URL.Query().Get("version")
|
||||
|
||||
firmwareList, err := hs.registryClient.ListFirmware()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to list registry firmware")
|
||||
http.Error(w, fmt.Sprintf(`{"error": "Failed to list firmware", "message": "%s"}`, err.Error()), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Filter by name and version if provided
|
||||
if name != "" || version != "" {
|
||||
filtered := []registry.GroupedFirmware{}
|
||||
for _, group := range firmwareList {
|
||||
if name != "" && group.Name != name {
|
||||
continue
|
||||
}
|
||||
|
||||
filteredFirmware := []registry.FirmwareRecord{}
|
||||
for _, firmware := range group.Firmware {
|
||||
if version != "" && firmware.Version != version {
|
||||
continue
|
||||
}
|
||||
filteredFirmware = append(filteredFirmware, firmware)
|
||||
}
|
||||
|
||||
if len(filteredFirmware) > 0 {
|
||||
group.Firmware = filteredFirmware
|
||||
filtered = append(filtered, group)
|
||||
}
|
||||
}
|
||||
firmwareList = filtered
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(firmwareList)
|
||||
}
|
||||
|
||||
// POST /api/registry/firmware
|
||||
func (hs *HTTPServer) uploadRegistryFirmware(w http.ResponseWriter, r *http.Request) {
|
||||
// Parse multipart form
|
||||
err := r.ParseMultipartForm(32 << 20) // 32MB max
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to parse multipart form")
|
||||
http.Error(w, `{"error": "Invalid form data", "message": "Failed to parse multipart form"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Get metadata from form
|
||||
metadataJSON := r.FormValue("metadata")
|
||||
if metadataJSON == "" {
|
||||
http.Error(w, `{"error": "Missing metadata", "message": "Metadata field is required"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var metadata registry.FirmwareMetadata
|
||||
if err := json.Unmarshal([]byte(metadataJSON), &metadata); err != nil {
|
||||
log.WithError(err).Error("Invalid metadata JSON")
|
||||
http.Error(w, `{"error": "Invalid metadata", "message": "Failed to parse metadata JSON"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Get firmware file
|
||||
file, _, err := r.FormFile("firmware")
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Missing firmware file")
|
||||
http.Error(w, `{"error": "Missing firmware file", "message": "Firmware file is required"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// Upload to registry
|
||||
result, err := hs.registryClient.UploadFirmware(metadata, file)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to upload firmware to registry")
|
||||
http.Error(w, fmt.Sprintf(`{"error": "Upload failed", "message": "%s"}`, err.Error()), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(result)
|
||||
}
|
||||
|
||||
// GET /api/registry/firmware/{name}/{version}
|
||||
func (hs *HTTPServer) downloadRegistryFirmware(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
name := vars["name"]
|
||||
version := vars["version"]
|
||||
|
||||
if name == "" || version == "" {
|
||||
http.Error(w, `{"error": "Missing parameters", "message": "Name and version are required"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
firmwareData, err := hs.registryClient.DownloadFirmware(name, version)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to download firmware from registry")
|
||||
http.Error(w, fmt.Sprintf(`{"error": "Download failed", "message": "%s"}`, err.Error()), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
// Set appropriate headers for file download
|
||||
w.Header().Set("Content-Type", "application/octet-stream")
|
||||
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s-%s.bin\"", name, version))
|
||||
w.Header().Set("Content-Length", fmt.Sprintf("%d", len(firmwareData)))
|
||||
|
||||
w.Write(firmwareData)
|
||||
}
|
||||
|
||||
// PUT /api/registry/firmware/{name}/{version}
|
||||
func (hs *HTTPServer) updateRegistryFirmware(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
name := vars["name"]
|
||||
version := vars["version"]
|
||||
|
||||
if name == "" || version == "" {
|
||||
http.Error(w, `{"error": "Missing parameters", "message": "Name and version are required"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var metadata registry.FirmwareMetadata
|
||||
if err := json.NewDecoder(r.Body).Decode(&metadata); err != nil {
|
||||
log.WithError(err).Error("Invalid metadata JSON")
|
||||
http.Error(w, `{"error": "Invalid metadata", "message": "Failed to parse metadata JSON"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Update firmware metadata in registry
|
||||
result, err := hs.registryClient.UpdateFirmwareMetadata(name, version, metadata)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to update firmware metadata in registry")
|
||||
http.Error(w, fmt.Sprintf(`{"error": "Update failed", "message": "%s"}`, err.Error()), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(result)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user