mirror of
https://github.com/0x1d/rcond.git
synced 2025-12-14 18:25:21 +01:00
133 lines
3.9 KiB
Go
133 lines
3.9 KiB
Go
package cluster
|
|
|
|
import (
|
|
"encoding/json"
|
|
"log"
|
|
"os"
|
|
|
|
"github.com/0x1d/rcond/pkg/config"
|
|
"github.com/hashicorp/logutils"
|
|
"github.com/hashicorp/serf/serf"
|
|
)
|
|
|
|
// Agent represents a Serf cluster agent.
|
|
type Agent struct {
|
|
Serf *serf.Serf
|
|
}
|
|
|
|
// ClusterEvent represents a custom event that will be sent to the Serf cluster.
|
|
type ClusterEvent struct {
|
|
Name string
|
|
Data []byte
|
|
}
|
|
|
|
// NewAgent creates a new Serf cluster agent with the given configuration and event handlers.
|
|
func NewAgent(clusterConfig *config.ClusterConfig, clusterEvents map[string]func([]byte)) (*Agent, error) {
|
|
config := serf.DefaultConfig()
|
|
config.Init()
|
|
logFilter := &logutils.LevelFilter{
|
|
Levels: []logutils.LogLevel{"DEBUG", "INFO", "WARN", "ERROR"},
|
|
MinLevel: logutils.LogLevel(clusterConfig.LogLevel),
|
|
Writer: os.Stderr,
|
|
}
|
|
config.LogOutput = logFilter
|
|
config.MemberlistConfig.LogOutput = logFilter
|
|
config.NodeName = clusterConfig.NodeName
|
|
config.ProtocolVersion = serf.ProtocolVersionMax
|
|
config.MemberlistConfig.SecretKey = []byte(clusterConfig.SecretKey)
|
|
config.MemberlistConfig.AdvertiseAddr = clusterConfig.AdvertiseAddr
|
|
config.MemberlistConfig.AdvertisePort = clusterConfig.AdvertisePort
|
|
config.MemberlistConfig.BindAddr = clusterConfig.BindAddr
|
|
config.MemberlistConfig.BindPort = clusterConfig.BindPort
|
|
|
|
// Setup event channel
|
|
eventCh := make(chan serf.Event, 10)
|
|
config.EventCh = eventCh
|
|
go handleEvents(eventCh, clusterEvents)
|
|
|
|
// Start Serf
|
|
serf, err := serf.Create(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &Agent{Serf: serf}, nil
|
|
}
|
|
|
|
func Up(clusterConfig *config.ClusterConfig) (*Agent, error) {
|
|
if clusterConfig.Enabled {
|
|
log.Printf("[INFO] Starting cluster agent on %s:%d", clusterConfig.BindAddr, clusterConfig.BindPort)
|
|
clusterAgent, err := NewAgent(clusterConfig, ClusterEventsMap())
|
|
if err != nil {
|
|
log.Print(err)
|
|
return nil, err
|
|
}
|
|
// join nodes in the cluster if the join addresses are provided
|
|
if len(clusterConfig.Join) > 0 {
|
|
clusterAgent.Join(clusterConfig.Join, true)
|
|
}
|
|
return clusterAgent, nil
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
// Event sends a custom event to the Serf cluster.
|
|
// It marshals the provided ClusterEvent into JSON and then uses Serf's UserEvent method to send the event.
|
|
func (a *Agent) Event(event ClusterEvent) error {
|
|
eventData, err := json.Marshal(event)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := a.Serf.UserEvent(event.Name, eventData, false); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Members returns a list of members in the Serf cluster.
|
|
func (a *Agent) Members() ([]serf.Member, error) {
|
|
log.Printf("[INFO] Getting members of the cluster")
|
|
return a.Serf.Members(), nil
|
|
}
|
|
|
|
// Join attempts to join the Serf cluster with the given addresses, optionally ignoring old nodes.
|
|
func (a *Agent) Join(addrs []string, ignoreOld bool) (int, error) {
|
|
log.Printf("[INFO] Joining nodes in the cluster: %v", addrs)
|
|
n, err := a.Serf.Join(addrs, ignoreOld)
|
|
if err != nil {
|
|
log.Printf("[ERROR] Failed to join nodes in the cluster: %v", err)
|
|
return 0, err
|
|
}
|
|
log.Printf("[INFO] Joined %d nodes in the cluster", n)
|
|
return n, nil
|
|
}
|
|
|
|
// Leave causes the agent to leave the Serf cluster.
|
|
func (a *Agent) Leave() error {
|
|
return a.Serf.Leave()
|
|
}
|
|
|
|
// Shutdown shuts down the Serf cluster agent.
|
|
func (a *Agent) Shutdown() error {
|
|
log.Printf("[INFO] Shutting down cluster agent")
|
|
return a.Serf.Shutdown()
|
|
}
|
|
|
|
// handleEvents handles Serf events received on the event channel.
|
|
func handleEvents(eventCh chan serf.Event, clusterEvents map[string]func([]byte)) {
|
|
eventHandlers := clusterEvents
|
|
for event := range eventCh {
|
|
switch event.EventType() {
|
|
case serf.EventUser:
|
|
userEvent := event.(serf.UserEvent)
|
|
if handler, ok := eventHandlers[userEvent.Name]; ok {
|
|
handler(userEvent.Payload)
|
|
} else {
|
|
log.Printf("[INFO] No event handler found for event: %s", userEvent.Name)
|
|
}
|
|
default:
|
|
log.Printf("[INFO] Received event: %s\n", event.EventType())
|
|
}
|
|
}
|
|
}
|