// Package consul provides Consul-based service registry implementation. package consul import ( "context" "fmt" "time" "git.dcentral.systems/toolz/goplt/pkg/registry" consulapi "github.com/hashicorp/consul/api" ) // ConsulRegistry implements ServiceRegistry using Consul. type ConsulRegistry struct { client *consulapi.Client config *Config } // Config holds Consul registry configuration. type Config struct { Address string // Consul agent address (e.g., "localhost:8500") Datacenter string // Consul datacenter Scheme string // "http" or "https" HealthCheck HealthCheckConfig } // HealthCheckConfig holds health check configuration. type HealthCheckConfig struct { Interval time.Duration // Health check interval Timeout time.Duration // Health check timeout DeregisterAfter time.Duration // Time to wait before deregistering unhealthy service HTTP string // HTTP health check endpoint (e.g., "/healthz") - for HTTP services GRPC string // gRPC health check service name (e.g., "grpc.health.v1.Health") - for gRPC services UseGRPC bool // Whether to use gRPC health checks instead of HTTP } // NewRegistry creates a new Consul-based service registry. func NewRegistry(cfg Config) (*ConsulRegistry, error) { consulConfig := consulapi.DefaultConfig() if cfg.Address != "" { consulConfig.Address = cfg.Address } if cfg.Datacenter != "" { consulConfig.Datacenter = cfg.Datacenter } if cfg.Scheme != "" { consulConfig.Scheme = cfg.Scheme } client, err := consulapi.NewClient(consulConfig) if err != nil { return nil, fmt.Errorf("failed to create Consul client: %w", err) } return &ConsulRegistry{ client: client, config: &cfg, }, nil } // Register registers a service instance with Consul. func (r *ConsulRegistry) Register(ctx context.Context, service *registry.ServiceInstance) error { registration := &consulapi.AgentServiceRegistration{ ID: service.ID, Name: service.Name, Address: service.Address, Port: service.Port, Tags: service.Tags, Meta: service.Metadata, } // Determine health check type based on service metadata/tags or config // Check if service is HTTP (has "http" tag or protocol metadata) isHTTP := false for _, tag := range service.Tags { if tag == "http" { isHTTP = true break } } if !isHTTP && service.Metadata != nil { if protocol, ok := service.Metadata["protocol"]; ok && protocol == "http" { isHTTP = true } } // Add health check if configured if isHTTP && r.config.HealthCheck.HTTP != "" { // Use HTTP health check for HTTP services (e.g., API Gateway) healthCheckURL := fmt.Sprintf("http://%s:%d%s", service.Address, service.Port, r.config.HealthCheck.HTTP) registration.Check = &consulapi.AgentServiceCheck{ HTTP: healthCheckURL, Interval: r.config.HealthCheck.Interval.String(), Timeout: r.config.HealthCheck.Timeout.String(), DeregisterCriticalServiceAfter: r.config.HealthCheck.DeregisterAfter.String(), } } else if !isHTTP && r.config.HealthCheck.UseGRPC { // Use gRPC health check for gRPC services // Format: host:port (checks default service with empty string name) // Or: host:port/service (checks specific service name) // We use host:port to check the default service (empty string) grpcAddr := fmt.Sprintf("%s:%d", service.Address, service.Port) // If a specific service name is provided, append it // Otherwise, check the default service (empty string) which we set in each service if r.config.HealthCheck.GRPC != "" && r.config.HealthCheck.GRPC != "grpc.health.v1.Health" { // Only append if it's not the default health service name // The GRPC field in Consul expects the application service name, not the proto service name grpcAddr = fmt.Sprintf("%s:%d/%s", service.Address, service.Port, r.config.HealthCheck.GRPC) } registration.Check = &consulapi.AgentServiceCheck{ GRPC: grpcAddr, Interval: r.config.HealthCheck.Interval.String(), Timeout: r.config.HealthCheck.Timeout.String(), DeregisterCriticalServiceAfter: r.config.HealthCheck.DeregisterAfter.String(), } } else if r.config.HealthCheck.HTTP != "" { // Fallback to HTTP if HTTP endpoint is configured and service is not explicitly gRPC healthCheckURL := fmt.Sprintf("http://%s:%d%s", service.Address, service.Port, r.config.HealthCheck.HTTP) registration.Check = &consulapi.AgentServiceCheck{ HTTP: healthCheckURL, Interval: r.config.HealthCheck.Interval.String(), Timeout: r.config.HealthCheck.Timeout.String(), DeregisterCriticalServiceAfter: r.config.HealthCheck.DeregisterAfter.String(), } } return r.client.Agent().ServiceRegister(registration) } // Deregister removes a service instance from Consul. func (r *ConsulRegistry) Deregister(ctx context.Context, serviceID string) error { return r.client.Agent().ServiceDeregister(serviceID) } // Discover returns all healthy instances of a service. func (r *ConsulRegistry) Discover(ctx context.Context, serviceName string) ([]*registry.ServiceInstance, error) { services, _, err := r.client.Health().Service(serviceName, "", true, nil) if err != nil { return nil, fmt.Errorf("failed to discover service %s: %w", serviceName, err) } instances := make([]*registry.ServiceInstance, 0, len(services)) for _, service := range services { instances = append(instances, ®istry.ServiceInstance{ ID: service.Service.ID, Name: service.Service.Service, Address: service.Service.Address, Port: service.Service.Port, Tags: service.Service.Tags, Metadata: service.Service.Meta, }) } return instances, nil } // Watch returns a channel that receives updates when service instances change. func (r *ConsulRegistry) Watch(ctx context.Context, serviceName string) (<-chan []*registry.ServiceInstance, error) { updates := make(chan []*registry.ServiceInstance, 10) go func() { defer close(updates) lastIndex := uint64(0) for { select { case <-ctx.Done(): return default: services, meta, err := r.client.Health().Service(serviceName, "", true, &consulapi.QueryOptions{ WaitIndex: lastIndex, WaitTime: 10 * time.Second, }) if err != nil { // Log error and continue continue } if meta.LastIndex != lastIndex { instances := make([]*registry.ServiceInstance, 0, len(services)) for _, service := range services { instances = append(instances, ®istry.ServiceInstance{ ID: service.Service.ID, Name: service.Service.Service, Address: service.Service.Address, Port: service.Service.Port, Tags: service.Service.Tags, Metadata: service.Service.Meta, }) } select { case updates <- instances: case <-ctx.Done(): return } lastIndex = meta.LastIndex } } } }() return updates, nil } // Health returns the health status of a service instance. func (r *ConsulRegistry) Health(ctx context.Context, serviceID string) (*registry.HealthStatus, error) { entries, _, err := r.client.Health().Service(serviceID, "", false, nil) if err != nil { return nil, fmt.Errorf("failed to get health for service %s: %w", serviceID, err) } if len(entries) == 0 { return ®istry.HealthStatus{ ServiceID: serviceID, Status: "unknown", Message: "service not found", }, nil } // Check health status from service entry checks status := "healthy" message := "all checks passing" // Get the first entry (should be the service instance) entry := entries[0] for _, check := range entry.Checks { if check.Status == consulapi.HealthCritical { status = "critical" message = check.Output break } else if check.Status == consulapi.HealthWarning { status = "unhealthy" message = check.Output } } return ®istry.HealthStatus{ ServiceID: serviceID, Status: status, Message: message, }, nil }