mirror of
https://github.com/0x1d/rcond.git
synced 2025-12-14 18:25:21 +01:00
feat: introduce cluster events
This commit is contained in:
@@ -1,9 +1,12 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"os"
|
||||
|
||||
"github.com/0x1d/rcond/pkg/config"
|
||||
"github.com/hashicorp/logutils"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
||||
@@ -11,8 +14,22 @@ type Agent struct {
|
||||
Serf *serf.Serf
|
||||
}
|
||||
|
||||
func NewAgent(clusterConfig *config.ClusterConfig) (*Agent, error) {
|
||||
// ClusterEvent represents a custom event that will be sent to the Serf cluster
|
||||
type ClusterEvent struct {
|
||||
Name string
|
||||
Data []byte
|
||||
}
|
||||
|
||||
func NewAgent(clusterConfig *config.ClusterConfig, clusterEvents map[string]func([]byte)) (*Agent, error) {
|
||||
config := serf.DefaultConfig()
|
||||
config.Init()
|
||||
logFilter := &logutils.LevelFilter{
|
||||
Levels: []logutils.LogLevel{"DEBUG", "INFO", "WARN", "ERROR"},
|
||||
MinLevel: logutils.LogLevel(clusterConfig.LogLevel),
|
||||
Writer: os.Stderr,
|
||||
}
|
||||
config.LogOutput = logFilter
|
||||
config.MemberlistConfig.LogOutput = logFilter
|
||||
config.NodeName = clusterConfig.NodeName
|
||||
config.ProtocolVersion = serf.ProtocolVersionMax
|
||||
config.MemberlistConfig.SecretKey = []byte(clusterConfig.SecretKey)
|
||||
@@ -21,6 +38,12 @@ func NewAgent(clusterConfig *config.ClusterConfig) (*Agent, error) {
|
||||
config.MemberlistConfig.BindAddr = clusterConfig.BindAddr
|
||||
config.MemberlistConfig.BindPort = clusterConfig.BindPort
|
||||
|
||||
// Setup event channel
|
||||
eventCh := make(chan serf.Event, 10)
|
||||
config.EventCh = eventCh
|
||||
go handleEvents(eventCh, clusterEvents)
|
||||
|
||||
// Start Serf
|
||||
serf, err := serf.Create(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -29,6 +52,19 @@ func NewAgent(clusterConfig *config.ClusterConfig) (*Agent, error) {
|
||||
return &Agent{Serf: serf}, nil
|
||||
}
|
||||
|
||||
// Event sends a custom event to the Serf cluster.
|
||||
// It marshals the provided ClusterEvent into JSON and then uses Serf's UserEvent method to send the event.
|
||||
func (a *Agent) Event(event ClusterEvent) error {
|
||||
eventData, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := a.Serf.UserEvent(event.Name, eventData, false); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Agent) Members() ([]serf.Member, error) {
|
||||
log.Printf("Getting members of the cluster")
|
||||
return a.Serf.Members(), nil
|
||||
@@ -53,3 +89,21 @@ func (a *Agent) Shutdown() error {
|
||||
log.Printf("Shutting down cluster agent")
|
||||
return a.Serf.Shutdown()
|
||||
}
|
||||
|
||||
// handleEvents handles Serf events received on the event channel
|
||||
func handleEvents(eventCh chan serf.Event, clusterEvents map[string]func([]byte)) {
|
||||
for event := range eventCh {
|
||||
switch event.EventType() {
|
||||
case serf.EventUser:
|
||||
userEvent := event.(serf.UserEvent)
|
||||
eventHandlers := clusterEvents
|
||||
if handler, ok := eventHandlers[userEvent.Name]; ok {
|
||||
handler(userEvent.Payload)
|
||||
} else {
|
||||
log.Printf("No event handler found for event: %s", userEvent.Name)
|
||||
}
|
||||
default:
|
||||
log.Printf("Received event: %s\n", event.EventType())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
34
pkg/cluster/events.go
Normal file
34
pkg/cluster/events.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
||||
"github.com/0x1d/rcond/pkg/network"
|
||||
"github.com/0x1d/rcond/pkg/system"
|
||||
)
|
||||
|
||||
func ClusterEventsMap() map[string]func([]byte) {
|
||||
return map[string]func([]byte){
|
||||
"printHostname": printHostname,
|
||||
"restart": restart,
|
||||
"shutdown": shutdown,
|
||||
}
|
||||
}
|
||||
|
||||
func restart(payload []byte) {
|
||||
if err := system.Restart(); err != nil {
|
||||
log.Printf("(ClusterEvent:restart) failed: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func shutdown(payload []byte) {
|
||||
if err := system.Shutdown(); err != nil {
|
||||
log.Printf("(ClusterEvent:shutdown) failed: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// just a sample function to test event functionality
|
||||
func printHostname(payload []byte) {
|
||||
hostname, _ := network.GetHostname()
|
||||
log.Printf("(ClusterEvent:printHostname): %s", hostname)
|
||||
}
|
||||
@@ -25,6 +25,7 @@ type ClusterConfig struct {
|
||||
AdvertisePort int `yaml:"advertise_port"`
|
||||
BindAddr string `yaml:"bind_addr"`
|
||||
BindPort int `yaml:"bind_port"`
|
||||
LogLevel string `yaml:"log_level"`
|
||||
}
|
||||
|
||||
func LoadConfig(path string) (*Config, error) {
|
||||
|
||||
@@ -41,6 +41,11 @@ type authorizedKeyRequest struct {
|
||||
PubKey string `json:"pubkey"`
|
||||
}
|
||||
|
||||
type clusterEventRequest struct {
|
||||
Name string `json:"name"`
|
||||
Payload string `json:"payload,omitempty"`
|
||||
}
|
||||
|
||||
type errorResponse struct {
|
||||
Error string `json:"error"`
|
||||
}
|
||||
@@ -52,11 +57,6 @@ func writeError(w http.ResponseWriter, message string, code int) {
|
||||
}
|
||||
|
||||
func HandleConfigureSTA(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
writeError(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
var req configureSTARequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, err.Error(), http.StatusBadRequest)
|
||||
@@ -76,11 +76,6 @@ func HandleConfigureSTA(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func HandleConfigureAP(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
writeError(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
var req configureAPRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, err.Error(), http.StatusBadRequest)
|
||||
@@ -111,11 +106,6 @@ func HandleConfigureAP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func HandleNetworkUp(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPut {
|
||||
writeError(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
var req networkUpRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, err.Error(), http.StatusBadRequest)
|
||||
@@ -137,11 +127,6 @@ func HandleNetworkUp(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func HandleNetworkDown(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodDelete {
|
||||
writeError(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
iface := vars["interface"]
|
||||
if err := network.Down(iface); err != nil {
|
||||
@@ -154,11 +139,6 @@ func HandleNetworkDown(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func HandleNetworkRemove(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodDelete {
|
||||
writeError(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
uuid := vars["uuid"]
|
||||
if err := network.Remove(uuid); err != nil {
|
||||
@@ -182,11 +162,6 @@ func HandleGetHostname(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func HandleSetHostname(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
writeError(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
var req setHostnameRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, err.Error(), http.StatusBadRequest)
|
||||
@@ -203,11 +178,6 @@ func HandleSetHostname(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func HandleAddAuthorizedKey(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
writeError(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
var req authorizedKeyRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, err.Error(), http.StatusBadRequest)
|
||||
@@ -227,11 +197,6 @@ func HandleAddAuthorizedKey(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func HandleRemoveAuthorizedKey(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodDelete {
|
||||
writeError(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
fingerprint := vars["fingerprint"]
|
||||
if fingerprint == "" {
|
||||
@@ -261,11 +226,6 @@ func HandleRemoveAuthorizedKey(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func HandleReboot(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
writeError(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
if err := system.Restart(); err != nil {
|
||||
writeError(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
@@ -276,11 +236,6 @@ func HandleReboot(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func HandleShutdown(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
writeError(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
if err := system.Shutdown(); err != nil {
|
||||
writeError(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
@@ -347,3 +302,23 @@ func HandleClusterMembers(w http.ResponseWriter, r *http.Request, agent *cluster
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(members)
|
||||
}
|
||||
|
||||
func HandleClusterEvent(w http.ResponseWriter, r *http.Request, agent *cluster.Agent) {
|
||||
var req clusterEventRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
event := cluster.ClusterEvent{
|
||||
Name: req.Name,
|
||||
Data: []byte(req.Payload),
|
||||
}
|
||||
err := agent.Event(event)
|
||||
if err != nil {
|
||||
writeError(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]string{"status": "success"})
|
||||
}
|
||||
|
||||
@@ -79,6 +79,7 @@ func (s *Server) RegisterRoutes() {
|
||||
s.router.HandleFunc("/cluster/members", s.verifyToken(ClusterAgentHandler(s.clusterAgent, HandleClusterMembers))).Methods(http.MethodGet)
|
||||
s.router.HandleFunc("/cluster/join", s.verifyToken(ClusterAgentHandler(s.clusterAgent, HandleClusterJoin))).Methods(http.MethodPost)
|
||||
s.router.HandleFunc("/cluster/leave", s.verifyToken(ClusterAgentHandler(s.clusterAgent, HandleClusterLeave))).Methods(http.MethodPost)
|
||||
s.router.HandleFunc("/cluster/event", s.verifyToken(ClusterAgentHandler(s.clusterAgent, HandleClusterEvent))).Methods(http.MethodPost)
|
||||
}
|
||||
|
||||
func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
@@ -13,7 +13,7 @@ func Restart() error {
|
||||
log.Println("Rebooting system...")
|
||||
call := obj.Call("org.freedesktop.systemd1.Manager.Reboot", 0)
|
||||
if call.Err != nil {
|
||||
log.Fatal(call.Err)
|
||||
return call.Err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
@@ -26,7 +26,7 @@ func Shutdown() error {
|
||||
log.Println("Shutting down system...")
|
||||
call := obj.Call("org.freedesktop.systemd1.Manager.PowerOff", 0)
|
||||
if call.Err != nil {
|
||||
log.Fatal(call.Err)
|
||||
return call.Err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user