diff --git a/README.md b/README.md index 919f375..f93ed57 100644 --- a/README.md +++ b/README.md @@ -132,6 +132,7 @@ All endpoints except `/health` require authentication via an API token passed in | GET | `/cluster/members` | Get the cluster members | | POST | `/cluster/join` | Join cluster nodes | | POST | `/cluster/leave` | Leave the cluster | +| POST | `/cluster/event` | Send a cluster event | ### Response Codes @@ -144,6 +145,25 @@ All endpoints except `/health` require authentication via an API token passed in ### Request/Response Format All endpoints use JSON for request and response payloads. +## Cluster Events + +Cluster events are used for broadcast messages to all nodes in the cluster. They are sent as HTTP POST requests to the `/cluster/event` endpoint. + +The request body should be a JSON object with the following fields: +- `name`: The name of the event +- `payload`: The payload of the event, optional + +The response will be a JSON object with the following fields: +- `status`: The status of the event. This is a string, either "success" or "error". +- `error`: If the status is "error", this field will contain a string describing the error. This field is optional. + +Following events are implemented: + +| Event Name | Description | Payload | +|------------|----------------------|---------| +| restart | Restart the cluster | N/A | +| shutdown | Shutdown the cluster | N/A | + ## Examples ### Connect to a WiFi Access Point diff --git a/api/rcond.yaml b/api/rcond.yaml index 587d80d..4f62889 100644 --- a/api/rcond.yaml +++ b/api/rcond.yaml @@ -668,3 +668,46 @@ paths: application/json: schema: $ref: '#/components/schemas/Error' + /cluster/event: + post: + summary: Send a cluster event + description: Send a cluster event to all nodes in the cluster + requestBody: + description: Cluster event details + content: + application/json: + schema: + type: object + properties: + name: + description: Event name + type: string + example: "printHostname" + payload: + description: Event payload + type: string + example: "blabla" + responses: + '200': + description: Event sent successfully + content: + application/json: + schema: + type: object + properties: + status: + description: Indicates if the event was sent successfully + type: string + example: "success" + '400': + description: Bad request + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' diff --git a/cmd/rcond/main.go b/cmd/rcond/main.go index e375801..d6c562b 100644 --- a/cmd/rcond/main.go +++ b/cmd/rcond/main.go @@ -94,7 +94,7 @@ func startClusterAgent(appConfig *config.Config) *cluster.Agent { clusterConfig := &appConfig.Cluster if clusterConfig.Enabled { log.Printf("Starting cluster agent on %s:%d", clusterConfig.BindAddr, clusterConfig.BindPort) - clusterAgent, err := cluster.NewAgent(clusterConfig) + clusterAgent, err := cluster.NewAgent(clusterConfig, cluster.ClusterEventsMap()) if err != nil { log.Fatal(err) } @@ -102,12 +102,6 @@ func startClusterAgent(appConfig *config.Config) *cluster.Agent { if len(clusterConfig.Join) > 0 { clusterAgent.Join(clusterConfig.Join, true) } - // get members in the cluster - members, err := clusterAgent.Members() - if err != nil { - log.Fatal(err) - } - log.Printf("Members: %v", members) return clusterAgent } return nil diff --git a/config/rcond-agent.yaml b/config/rcond-agent.yaml index 4ec9f93..f9b4202 100644 --- a/config/rcond-agent.yaml +++ b/config/rcond-agent.yaml @@ -5,11 +5,12 @@ rcond: cluster: enabled: true + log_level: INFO node_name: rcond-agent secret_key: DMXnaJUUbIBMj1Df0dPsQY+Sks1VxWTa advertise_addr: 0.0.0.0 advertise_port: 7947 bind_addr: 0.0.0.0 bind_port: 7947 -# join: -# - 127.0.0.1:7946 \ No newline at end of file + join: + - 127.0.0.1:7946 \ No newline at end of file diff --git a/config/rcond.yaml b/config/rcond.yaml index 5dfd0f0..9d048d4 100644 --- a/config/rcond.yaml +++ b/config/rcond.yaml @@ -7,6 +7,8 @@ rcond: cluster: # Enable the cluster agent enabled: true + # Log level + log_level: INFO # Name of the node in the cluster node_name: rcond # Secret key for the cluster agent used for message encryption. diff --git a/go.mod b/go.mod index df238ef..98528f2 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/godbus/dbus/v5 v5.1.0 github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.1 + github.com/hashicorp/logutils v1.0.0 github.com/hashicorp/serf v0.10.2 github.com/stretchr/testify v1.10.0 golang.org/x/crypto v0.37.0 diff --git a/go.sum b/go.sum index 1c88615..da7c50e 100644 --- a/go.sum +++ b/go.sum @@ -70,6 +70,8 @@ github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/logutils v1.0.0 h1:dLEQVugN8vlakKOUE3ihGLTZJRB4j+M2cdTm/ORI65Y= +github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hashicorp/memberlist v0.5.2 h1:rJoNPWZ0juJBgqn48gjy59K5H4rNgvUoM1kUD7bXiuI= github.com/hashicorp/memberlist v0.5.2/go.mod h1:Ri9p/tRShbjYnpNf4FFPXG7wxEGY4Nrcn6E7jrVa//4= github.com/hashicorp/serf v0.10.2 h1:m5IORhuNSjaxeljg5DeQVDlQyVkhRIjJDimbkCa8aAc= diff --git a/pkg/cluster/agent.go b/pkg/cluster/agent.go index 8a7b4fd..09fafb5 100644 --- a/pkg/cluster/agent.go +++ b/pkg/cluster/agent.go @@ -1,9 +1,12 @@ package cluster import ( + "encoding/json" "log" + "os" "github.com/0x1d/rcond/pkg/config" + "github.com/hashicorp/logutils" "github.com/hashicorp/serf/serf" ) @@ -11,8 +14,22 @@ type Agent struct { Serf *serf.Serf } -func NewAgent(clusterConfig *config.ClusterConfig) (*Agent, error) { +// ClusterEvent represents a custom event that will be sent to the Serf cluster +type ClusterEvent struct { + Name string + Data []byte +} + +func NewAgent(clusterConfig *config.ClusterConfig, clusterEvents map[string]func([]byte)) (*Agent, error) { config := serf.DefaultConfig() + config.Init() + logFilter := &logutils.LevelFilter{ + Levels: []logutils.LogLevel{"DEBUG", "INFO", "WARN", "ERROR"}, + MinLevel: logutils.LogLevel(clusterConfig.LogLevel), + Writer: os.Stderr, + } + config.LogOutput = logFilter + config.MemberlistConfig.LogOutput = logFilter config.NodeName = clusterConfig.NodeName config.ProtocolVersion = serf.ProtocolVersionMax config.MemberlistConfig.SecretKey = []byte(clusterConfig.SecretKey) @@ -21,6 +38,12 @@ func NewAgent(clusterConfig *config.ClusterConfig) (*Agent, error) { config.MemberlistConfig.BindAddr = clusterConfig.BindAddr config.MemberlistConfig.BindPort = clusterConfig.BindPort + // Setup event channel + eventCh := make(chan serf.Event, 10) + config.EventCh = eventCh + go handleEvents(eventCh, clusterEvents) + + // Start Serf serf, err := serf.Create(config) if err != nil { return nil, err @@ -29,6 +52,19 @@ func NewAgent(clusterConfig *config.ClusterConfig) (*Agent, error) { return &Agent{Serf: serf}, nil } +// Event sends a custom event to the Serf cluster. +// It marshals the provided ClusterEvent into JSON and then uses Serf's UserEvent method to send the event. +func (a *Agent) Event(event ClusterEvent) error { + eventData, err := json.Marshal(event) + if err != nil { + return err + } + if err := a.Serf.UserEvent(event.Name, eventData, false); err != nil { + return err + } + return nil +} + func (a *Agent) Members() ([]serf.Member, error) { log.Printf("Getting members of the cluster") return a.Serf.Members(), nil @@ -53,3 +89,21 @@ func (a *Agent) Shutdown() error { log.Printf("Shutting down cluster agent") return a.Serf.Shutdown() } + +// handleEvents handles Serf events received on the event channel +func handleEvents(eventCh chan serf.Event, clusterEvents map[string]func([]byte)) { + for event := range eventCh { + switch event.EventType() { + case serf.EventUser: + userEvent := event.(serf.UserEvent) + eventHandlers := clusterEvents + if handler, ok := eventHandlers[userEvent.Name]; ok { + handler(userEvent.Payload) + } else { + log.Printf("No event handler found for event: %s", userEvent.Name) + } + default: + log.Printf("Received event: %s\n", event.EventType()) + } + } +} diff --git a/pkg/cluster/events.go b/pkg/cluster/events.go new file mode 100644 index 0000000..c9e9077 --- /dev/null +++ b/pkg/cluster/events.go @@ -0,0 +1,34 @@ +package cluster + +import ( + "log" + + "github.com/0x1d/rcond/pkg/network" + "github.com/0x1d/rcond/pkg/system" +) + +func ClusterEventsMap() map[string]func([]byte) { + return map[string]func([]byte){ + "printHostname": printHostname, + "restart": restart, + "shutdown": shutdown, + } +} + +func restart(payload []byte) { + if err := system.Restart(); err != nil { + log.Printf("(ClusterEvent:restart) failed: %s", err) + } +} + +func shutdown(payload []byte) { + if err := system.Shutdown(); err != nil { + log.Printf("(ClusterEvent:shutdown) failed: %s", err) + } +} + +// just a sample function to test event functionality +func printHostname(payload []byte) { + hostname, _ := network.GetHostname() + log.Printf("(ClusterEvent:printHostname): %s", hostname) +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 437dd4c..9036bd7 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -25,6 +25,7 @@ type ClusterConfig struct { AdvertisePort int `yaml:"advertise_port"` BindAddr string `yaml:"bind_addr"` BindPort int `yaml:"bind_port"` + LogLevel string `yaml:"log_level"` } func LoadConfig(path string) (*Config, error) { diff --git a/pkg/http/handlers.go b/pkg/http/handlers.go index 8651207..59116cc 100644 --- a/pkg/http/handlers.go +++ b/pkg/http/handlers.go @@ -41,6 +41,11 @@ type authorizedKeyRequest struct { PubKey string `json:"pubkey"` } +type clusterEventRequest struct { + Name string `json:"name"` + Payload string `json:"payload,omitempty"` +} + type errorResponse struct { Error string `json:"error"` } @@ -52,11 +57,6 @@ func writeError(w http.ResponseWriter, message string, code int) { } func HandleConfigureSTA(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - writeError(w, "Method not allowed", http.StatusMethodNotAllowed) - return - } - var req configureSTARequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, err.Error(), http.StatusBadRequest) @@ -76,11 +76,6 @@ func HandleConfigureSTA(w http.ResponseWriter, r *http.Request) { } func HandleConfigureAP(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - writeError(w, "Method not allowed", http.StatusMethodNotAllowed) - return - } - var req configureAPRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, err.Error(), http.StatusBadRequest) @@ -111,11 +106,6 @@ func HandleConfigureAP(w http.ResponseWriter, r *http.Request) { } func HandleNetworkUp(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPut { - writeError(w, "Method not allowed", http.StatusMethodNotAllowed) - return - } - var req networkUpRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, err.Error(), http.StatusBadRequest) @@ -137,11 +127,6 @@ func HandleNetworkUp(w http.ResponseWriter, r *http.Request) { } func HandleNetworkDown(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodDelete { - writeError(w, "Method not allowed", http.StatusMethodNotAllowed) - return - } - vars := mux.Vars(r) iface := vars["interface"] if err := network.Down(iface); err != nil { @@ -154,11 +139,6 @@ func HandleNetworkDown(w http.ResponseWriter, r *http.Request) { } func HandleNetworkRemove(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodDelete { - writeError(w, "Method not allowed", http.StatusMethodNotAllowed) - return - } - vars := mux.Vars(r) uuid := vars["uuid"] if err := network.Remove(uuid); err != nil { @@ -182,11 +162,6 @@ func HandleGetHostname(w http.ResponseWriter, r *http.Request) { } func HandleSetHostname(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - writeError(w, "Method not allowed", http.StatusMethodNotAllowed) - return - } - var req setHostnameRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, err.Error(), http.StatusBadRequest) @@ -203,11 +178,6 @@ func HandleSetHostname(w http.ResponseWriter, r *http.Request) { } func HandleAddAuthorizedKey(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - writeError(w, "Method not allowed", http.StatusMethodNotAllowed) - return - } - var req authorizedKeyRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, err.Error(), http.StatusBadRequest) @@ -227,11 +197,6 @@ func HandleAddAuthorizedKey(w http.ResponseWriter, r *http.Request) { } func HandleRemoveAuthorizedKey(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodDelete { - writeError(w, "Method not allowed", http.StatusMethodNotAllowed) - return - } - vars := mux.Vars(r) fingerprint := vars["fingerprint"] if fingerprint == "" { @@ -261,11 +226,6 @@ func HandleRemoveAuthorizedKey(w http.ResponseWriter, r *http.Request) { } func HandleReboot(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - writeError(w, "Method not allowed", http.StatusMethodNotAllowed) - return - } - if err := system.Restart(); err != nil { writeError(w, err.Error(), http.StatusInternalServerError) return @@ -276,11 +236,6 @@ func HandleReboot(w http.ResponseWriter, r *http.Request) { } func HandleShutdown(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - writeError(w, "Method not allowed", http.StatusMethodNotAllowed) - return - } - if err := system.Shutdown(); err != nil { writeError(w, err.Error(), http.StatusInternalServerError) return @@ -347,3 +302,23 @@ func HandleClusterMembers(w http.ResponseWriter, r *http.Request, agent *cluster w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(members) } + +func HandleClusterEvent(w http.ResponseWriter, r *http.Request, agent *cluster.Agent) { + var req clusterEventRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, err.Error(), http.StatusBadRequest) + return + } + event := cluster.ClusterEvent{ + Name: req.Name, + Data: []byte(req.Payload), + } + err := agent.Event(event) + if err != nil { + writeError(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]string{"status": "success"}) +} diff --git a/pkg/http/server.go b/pkg/http/server.go index e99fe0d..00a739a 100644 --- a/pkg/http/server.go +++ b/pkg/http/server.go @@ -79,6 +79,7 @@ func (s *Server) RegisterRoutes() { s.router.HandleFunc("/cluster/members", s.verifyToken(ClusterAgentHandler(s.clusterAgent, HandleClusterMembers))).Methods(http.MethodGet) s.router.HandleFunc("/cluster/join", s.verifyToken(ClusterAgentHandler(s.clusterAgent, HandleClusterJoin))).Methods(http.MethodPost) s.router.HandleFunc("/cluster/leave", s.verifyToken(ClusterAgentHandler(s.clusterAgent, HandleClusterLeave))).Methods(http.MethodPost) + s.router.HandleFunc("/cluster/event", s.verifyToken(ClusterAgentHandler(s.clusterAgent, HandleClusterEvent))).Methods(http.MethodPost) } func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/system/state.go b/pkg/system/state.go index 1d22e60..8de4ff1 100644 --- a/pkg/system/state.go +++ b/pkg/system/state.go @@ -13,7 +13,7 @@ func Restart() error { log.Println("Rebooting system...") call := obj.Call("org.freedesktop.systemd1.Manager.Reboot", 0) if call.Err != nil { - log.Fatal(call.Err) + return call.Err } return nil }) @@ -26,7 +26,7 @@ func Shutdown() error { log.Println("Shutting down system...") call := obj.Call("org.freedesktop.systemd1.Manager.PowerOff", 0) if call.Err != nil { - log.Fatal(call.Err) + return call.Err } return nil })