Files
goplt/internal/registry/consul/consul.go
0x1d 988adf6cc5
Some checks failed
CI / Test (pull_request) Failing after 50s
CI / Lint (pull_request) Failing after 32s
CI / Build (pull_request) Successful in 17s
CI / Format Check (pull_request) Failing after 2s
Fix gRPC health checks and add API Gateway Consul registration
- Fix gRPC health checks: Set serving status for default service (empty string) in all services
  - Consul's gRPC check queries the default (empty-name) service unless a specific service name is given
  - All services now set both default and specific service status to SERVING

- Update Consul registration logic to automatically detect HTTP vs gRPC services
  - HTTP services (API Gateway) use HTTP health checks
  - gRPC services use gRPC health checks
  - Detection based on service tags and metadata

- Add API Gateway Consul registration
  - Register with Docker service name in Docker environment
  - Use HTTP health checks for API Gateway
  - Proper host/port configuration handling

- Add API Gateway HTTP-to-gRPC handlers
  - Implement service-specific handlers for Auth and Identity services
  - Translate HTTP requests to gRPC calls
  - Map gRPC error codes to HTTP status codes
2025-11-06 22:04:55 +01:00

245 lines
8.0 KiB
Go

// Package consul provides Consul-based service registry implementation.
package consul
import (
	"context"
	"fmt"
	"net"
	"strconv"
	"time"

	"git.dcentral.systems/toolz/goplt/pkg/registry"
	consulapi "github.com/hashicorp/consul/api"
)
// ConsulRegistry implements ServiceRegistry on top of a Consul agent.
// Instances are created with NewRegistry.
type ConsulRegistry struct {
	// client is the Consul API client used for all agent, catalog and
	// health requests.
	client *consulapi.Client
	// config is the configuration the registry was created with; its
	// HealthCheck section drives the checks attached in Register.
	config *Config
}
// Config holds the settings used to build a ConsulRegistry.
// String fields left empty keep the value from consulapi.DefaultConfig().
type Config struct {
	// Address is the Consul agent address, e.g. "localhost:8500".
	Address string
	// Datacenter selects the Consul datacenter.
	Datacenter string
	// Scheme is the transport used to reach the agent: "http" or "https".
	Scheme string
	// HealthCheck controls the health check attached to registered services.
	HealthCheck HealthCheckConfig
}
// HealthCheckConfig controls the Consul health check that
// ConsulRegistry.Register attaches to each registered service.
type HealthCheckConfig struct {
	// Interval is how often Consul runs the check.
	Interval time.Duration
	// Timeout bounds a single check execution.
	Timeout time.Duration
	// DeregisterAfter is how long a service may stay critical before Consul
	// deregisters it.
	DeregisterAfter time.Duration
	// HTTP is the request path of the HTTP health endpoint (e.g. "/healthz").
	// It is appended to "http://<address>:<port>" when an HTTP check is used.
	// An HTTP check is used for services marked as HTTP, and as a fallback
	// for other services when UseGRPC is false.
	HTTP string
	// GRPC optionally names the application service whose serving status the
	// gRPC check queries; Consul receives "<address>:<port>/<GRPC>".
	// When GRPC is empty or equals "grpc.health.v1.Health", no name is
	// appended and the server's default (empty-name) status is checked.
	// It is NOT meant to hold the health proto's service name.
	GRPC string
	// UseGRPC enables gRPC health checks for services that are not marked
	// as HTTP (neither an "http" tag nor Metadata["protocol"] == "http").
	UseGRPC bool
}
// NewRegistry builds a Consul-backed service registry from cfg.
//
// It starts from consulapi.DefaultConfig() and overrides Address,
// Datacenter and Scheme only when the corresponding cfg field is non-empty.
// An error is returned if the Consul API client cannot be constructed.
func NewRegistry(cfg Config) (*ConsulRegistry, error) {
	apiCfg := consulapi.DefaultConfig()

	overrides := []struct {
		dst *string
		val string
	}{
		{&apiCfg.Address, cfg.Address},
		{&apiCfg.Datacenter, cfg.Datacenter},
		{&apiCfg.Scheme, cfg.Scheme},
	}
	for _, o := range overrides {
		if o.val != "" {
			*o.dst = o.val
		}
	}

	client, err := consulapi.NewClient(apiCfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create Consul client: %w", err)
	}

	reg := &ConsulRegistry{client: client, config: &cfg}
	return reg, nil
}
// Register registers a service instance with the local Consul agent and,
// depending on the registry's HealthCheckConfig, attaches one health check:
//
//   - a gRPC check when the service is not marked as HTTP and
//     HealthCheck.UseGRPC is set. The target is "<address>:<port>", with
//     "/<HealthCheck.GRPC>" appended unless GRPC is empty or equals
//     "grpc.health.v1.Health" (then the server's default, empty-name status
//     is checked);
//   - otherwise an HTTP check against "http://<address>:<port><HealthCheck.HTTP>",
//     provided HealthCheck.HTTP is set;
//   - no check when neither applies.
//
// A service counts as HTTP when it has the "http" tag or
// Metadata["protocol"] == "http". Host and port are joined with
// net.JoinHostPort so IPv6 literals are bracketed correctly.
//
// ctx is currently not forwarded to Consul. A nil service is rejected with
// an error.
func (r *ConsulRegistry) Register(ctx context.Context, service *registry.ServiceInstance) error {
	if service == nil {
		return fmt.Errorf("register service: nil service instance")
	}

	registration := &consulapi.AgentServiceRegistration{
		ID:      service.ID,
		Name:    service.Name,
		Address: service.Address,
		Port:    service.Port,
		Tags:    service.Tags,
		Meta:    service.Metadata,
	}

	hc := r.config.HealthCheck
	hostPort := net.JoinHostPort(service.Address, strconv.Itoa(service.Port))
	newCheck := func() *consulapi.AgentServiceCheck {
		return &consulapi.AgentServiceCheck{
			Interval:                       hc.Interval.String(),
			Timeout:                        hc.Timeout.String(),
			DeregisterCriticalServiceAfter: hc.DeregisterAfter.String(),
		}
	}

	switch {
	case !isHTTPService(service) && hc.UseGRPC:
		// Consul's GRPC field takes "host:port[/service]"; the optional suffix
		// is the application service name, not the health proto's name.
		target := hostPort
		if hc.GRPC != "" && hc.GRPC != "grpc.health.v1.Health" {
			target += "/" + hc.GRPC
		}
		check := newCheck()
		check.GRPC = target
		registration.Check = check
	case hc.HTTP != "":
		// HTTP services, and non-HTTP services when gRPC checks are disabled.
		check := newCheck()
		check.HTTP = "http://" + hostPort + healthCheckPath(hc.HTTP)
		registration.Check = check
	}

	return r.client.Agent().ServiceRegister(registration)
}

// isHTTPService reports whether service is marked as an HTTP service, either
// by an "http" tag or by Metadata["protocol"] == "http".
func isHTTPService(service *registry.ServiceInstance) bool {
	for _, tag := range service.Tags {
		if tag == "http" {
			return true
		}
	}
	// Indexing a nil map is safe and yields "".
	return service.Metadata["protocol"] == "http"
}

// healthCheckPath returns p with a leading "/" so it can be appended
// directly after "host:port" in a URL.
func healthCheckPath(p string) string {
	if p == "" || p[0] == '/' {
		return p
	}
	return "/" + p
}
// Deregister removes the service instance with the given ID from the local
// Consul agent and returns any error reported by the agent.
// ctx is currently not used.
func (r *ConsulRegistry) Deregister(ctx context.Context, serviceID string) error {
	agent := r.client.Agent()
	return agent.ServiceDeregister(serviceID)
}
// Discover returns the instances of serviceName whose health checks are all
// passing. ctx is attached to the Consul query, so cancelling it aborts the
// request.
//
// If an instance's service address is empty (it was registered without
// one), the address of the node it runs on is returned instead.
func (r *ConsulRegistry) Discover(ctx context.Context, serviceName string) ([]*registry.ServiceInstance, error) {
	opts := (&consulapi.QueryOptions{}).WithContext(ctx)
	entries, _, err := r.client.Health().Service(serviceName, "", true, opts)
	if err != nil {
		return nil, fmt.Errorf("failed to discover service %s: %w", serviceName, err)
	}

	instances := make([]*registry.ServiceInstance, 0, len(entries))
	for _, entry := range entries {
		address := entry.Service.Address
		if address == "" && entry.Node != nil {
			// Consul leaves the service address empty when none was
			// registered; the node address is the reachable one then.
			address = entry.Node.Address
		}
		instances = append(instances, &registry.ServiceInstance{
			ID:       entry.Service.ID,
			Name:     entry.Service.Service,
			Address:  address,
			Port:     entry.Service.Port,
			Tags:     entry.Service.Tags,
			Metadata: entry.Service.Meta,
		})
	}
	return instances, nil
}
// Watch returns a channel that receives the list of passing instances of
// serviceName whenever Consul reports a change.
//
// It issues Consul blocking queries (10s wait) and sends a snapshot whenever
// the returned index differs from the previous one, so the first successful
// query is always delivered. ctx is attached to every query; the goroutine
// exits and closes the channel once ctx is done. Query errors are not
// surfaced; the query is retried after a delay that starts at 500ms, doubles
// on each consecutive failure up to 30s, and resets after a success.
//
// Instances registered without a service address report their node's
// address instead.
func (r *ConsulRegistry) Watch(ctx context.Context, serviceName string) (<-chan []*registry.ServiceInstance, error) {
	updates := make(chan []*registry.ServiceInstance, 10)

	go func() {
		defer close(updates)

		const (
			minRetryDelay = 500 * time.Millisecond
			maxRetryDelay = 30 * time.Second
		)
		var lastIndex uint64
		retryDelay := minRetryDelay

		for {
			if ctx.Err() != nil {
				return
			}

			opts := (&consulapi.QueryOptions{
				WaitIndex: lastIndex,
				WaitTime:  10 * time.Second,
			}).WithContext(ctx)
			entries, meta, err := r.client.Health().Service(serviceName, "", true, opts)
			if err != nil {
				// Back off instead of retrying immediately, so an unreachable
				// agent does not turn this loop into a busy spin.
				timer := time.NewTimer(retryDelay)
				select {
				case <-ctx.Done():
					timer.Stop()
					return
				case <-timer.C:
				}
				retryDelay *= 2
				if retryDelay > maxRetryDelay {
					retryDelay = maxRetryDelay
				}
				continue
			}
			retryDelay = minRetryDelay

			// "!=" (not ">") also handles the index moving backwards, e.g.
			// after a Consul restart: the snapshot is sent and the lower
			// index adopted.
			if meta.LastIndex == lastIndex {
				continue
			}

			instances := make([]*registry.ServiceInstance, 0, len(entries))
			for _, entry := range entries {
				address := entry.Service.Address
				if address == "" && entry.Node != nil {
					address = entry.Node.Address
				}
				instances = append(instances, &registry.ServiceInstance{
					ID:       entry.Service.ID,
					Name:     entry.Service.Service,
					Address:  address,
					Port:     entry.Service.Port,
					Tags:     entry.Service.Tags,
					Metadata: entry.Service.Meta,
				})
			}

			select {
			case updates <- instances:
			case <-ctx.Done():
				return
			}
			lastIndex = meta.LastIndex
		}
	}()

	return updates, nil
}
// Health reports the aggregated check status of a service instance.
//
// The result is:
//   - "critical" if any check is critical (Message is that check's output);
//   - otherwise "unhealthy" if any check is warning (Message is the output
//     of the last warning check);
//   - otherwise "healthy" with Message "all checks passing";
//   - "unknown" with Message "service not found" when Consul returns no
//     entries.
//
// ctx is attached to the Consul query. If several entries come back, the
// one whose service ID equals serviceID is used, else the first.
//
// NOTE(review): Consul's health endpoint is keyed by service *name*, so
// serviceID is looked up as a name here; an instance whose ID differs from
// its name will report "service not found". Confirm callers' expectations.
func (r *ConsulRegistry) Health(ctx context.Context, serviceID string) (*registry.HealthStatus, error) {
	opts := (&consulapi.QueryOptions{}).WithContext(ctx)
	entries, _, err := r.client.Health().Service(serviceID, "", false, opts)
	if err != nil {
		return nil, fmt.Errorf("failed to get health for service %s: %w", serviceID, err)
	}
	if len(entries) == 0 {
		return &registry.HealthStatus{
			ServiceID: serviceID,
			Status:    "unknown",
			Message:   "service not found",
		}, nil
	}

	entry := entries[0]
	for _, e := range entries {
		if e.Service != nil && e.Service.ID == serviceID {
			entry = e
			break
		}
	}

	status, message := "healthy", "all checks passing"
	for _, check := range entry.Checks {
		if check.Status == consulapi.HealthCritical {
			// Critical wins over everything; stop looking.
			status, message = "critical", check.Output
			break
		}
		if check.Status == consulapi.HealthWarning {
			status, message = "unhealthy", check.Output
		}
	}

	return &registry.HealthStatus{
		ServiceID: serviceID,
		Status:    status,
		Message:   message,
	}, nil
}