Files
goplt/internal/di/providers.go
0x1d b02c1d44c8 fix(consul): Fix health checks for gRPC services in Docker
- Add gRPC health check support to Consul registry
  - Services are gRPC-only, not HTTP
  - Consul was trying HTTP health checks which failed
  - Now uses gRPC health checks via grpc.health.v1.Health service

- Update HealthCheckConfig to support both HTTP and gRPC
  - Add GRPC field for gRPC service name
  - Add UseGRPC flag to choose health check type
  - Default to gRPC for services (use_grpc: true in config)

- Fix service address registration in Docker
  - Services now register with Docker service name (e.g., auth-service)
  - Allows Consul to reach services via Docker network DNS
  - Falls back to localhost for local development

- Update default.yaml to enable gRPC health checks
  - Set use_grpc: true
  - Set grpc: grpc.health.v1.Health

This fixes services being deregistered from Consul due to failed
HTTP health checks. Services will now pass gRPC health checks.
2025-11-06 21:17:33 +01:00

457 lines
13 KiB
Go

package di
import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"os"
	"strings"
	"time"

	"git.dcentral.systems/toolz/goplt/internal/client"
	configimpl "git.dcentral.systems/toolz/goplt/internal/config"
	errorbusimpl "git.dcentral.systems/toolz/goplt/internal/errorbus"
	"git.dcentral.systems/toolz/goplt/internal/health"
	"git.dcentral.systems/toolz/goplt/internal/infra/database"
	loggerimpl "git.dcentral.systems/toolz/goplt/internal/logger"
	"git.dcentral.systems/toolz/goplt/internal/metrics"
	"git.dcentral.systems/toolz/goplt/internal/observability"
	"git.dcentral.systems/toolz/goplt/internal/registry/consul"
	"git.dcentral.systems/toolz/goplt/internal/server"
	"git.dcentral.systems/toolz/goplt/pkg/config"
	"git.dcentral.systems/toolz/goplt/pkg/errorbus"
	"git.dcentral.systems/toolz/goplt/pkg/logger"
	"git.dcentral.systems/toolz/goplt/pkg/registry"
	"go.opentelemetry.io/otel/trace"
	"go.opentelemetry.io/otel/trace/noop"
	"go.uber.org/fx"
)
// ProvideConfig creates an FX option that provides ConfigProvider.
//
// The environment name is taken from the ENVIRONMENT variable. It falls back to
// "development" when that variable is unset or empty. The name is passed to
// configimpl.LoadConfig, and any load error is wrapped and returned.
func ProvideConfig() fx.Option {
	return fx.Provide(func() (config.ConfigProvider, error) {
		environment := "development"
		if fromEnv := os.Getenv("ENVIRONMENT"); fromEnv != "" {
			environment = fromEnv
		}

		loaded, loadErr := configimpl.LoadConfig(environment)
		if loadErr != nil {
			return nil, fmt.Errorf("failed to load config: %w", loadErr)
		}
		return loaded, nil
	})
}
// ProvideLogger creates an FX option that provides Logger.
//
// The level and format come from "logging.level" and "logging.format". Empty
// values fall back to "info" and "json" respectively. The logger is built with
// loggerimpl.NewZapLogger and is also installed as the global logger via
// logger.SetGlobalLogger before being returned.
func ProvideLogger() fx.Option {
	return fx.Provide(func(cfg config.ConfigProvider) (logger.Logger, error) {
		// settingOr returns the configured string for key, or fallback when empty.
		settingOr := func(key, fallback string) string {
			if v := cfg.GetString(key); v != "" {
				return v
			}
			return fallback
		}

		zapLog, err := loggerimpl.NewZapLogger(
			settingOr("logging.level", "info"),
			settingOr("logging.format", "json"),
		)
		if err != nil {
			return nil, fmt.Errorf("failed to create logger: %w", err)
		}

		logger.SetGlobalLogger(zapLog)
		return zapLog, nil
	})
}
// ProvideDatabase creates an FX option that provides the database client.
//
// Settings are read from the "database.*" config keys:
//   - dsn (required; an empty value is an error)
//   - max_connections (default 25 when zero)
//   - max_idle_connections (default 5 when zero)
//   - conn_max_lifetime (default 5m when zero)
//   - conn_max_idle_time (default 10m when zero)
//
// Lifecycle hooks run dbClient.Migrate on start and close the client on stop.
// If the migration fails, the client is closed before the error is returned.
func ProvideDatabase() fx.Option {
	return fx.Provide(func(cfg config.ConfigProvider, log logger.Logger, lc fx.Lifecycle) (*database.Client, error) {
		dsn := cfg.GetString("database.dsn")
		if dsn == "" {
			log.Error("ProvideDatabase: DSN is empty")
			return nil, fmt.Errorf("database DSN is not configured")
		}
		maxConns := cfg.GetInt("database.max_connections")
		if maxConns == 0 {
			maxConns = 25
		}
		maxIdleConns := cfg.GetInt("database.max_idle_connections")
		if maxIdleConns == 0 {
			maxIdleConns = 5
		}
		connMaxLifetime := cfg.GetDuration("database.conn_max_lifetime")
		if connMaxLifetime == 0 {
			connMaxLifetime = 5 * time.Minute
		}
		connMaxIdleTime := cfg.GetDuration("database.conn_max_idle_time")
		if connMaxIdleTime == 0 {
			connMaxIdleTime = 10 * time.Minute
		}
		log.Info("Preparing database connection",
			logger.String("dsn_mask", maskDSN(dsn)),
			logger.Int("max_connections", maxConns),
		)
		log.Info("Connecting to database...")
		dbClient, err := database.NewClient(database.Config{
			DSN:             dsn,
			MaxConnections:  maxConns,
			MaxIdleConns:    maxIdleConns,
			ConnMaxLifetime: connMaxLifetime,
			ConnMaxIdleTime: connMaxIdleTime,
		})
		if err != nil {
			log.Error("Failed to create database client",
				logger.Error(err),
			)
			return nil, fmt.Errorf("failed to create database client: %w", err)
		}
		log.Info("Database client created successfully")
		lc.Append(fx.Hook{
			OnStart: func(ctx context.Context) error {
				log.Info("Running database migrations...")
				if err := dbClient.Migrate(ctx); err != nil {
					log.Error("Database migrations failed",
						logger.Error(err),
					)
					// fx only runs OnStop for hooks whose OnStart succeeded, so the
					// OnStop below will not release the client after this failure.
					// Close it here so the connection pool is not leaked.
					if closeErr := dbClient.Close(); closeErr != nil {
						log.Error("Failed to close database client after migration failure",
							logger.Error(closeErr),
						)
					}
					return fmt.Errorf("failed to run database migrations: %w", err)
				}
				log.Info("Database migrations completed successfully")
				return nil
			},
			OnStop: func(_ context.Context) error {
				return dbClient.Close()
			},
		})
		return dbClient, nil
	})
}
// ProvideErrorBus creates an FX option that provides the error bus.
//
// The bus is an errorbusimpl channel bus constructed with a fixed buffer size.
// An OnStop hook closes it when the application shuts down.
func ProvideErrorBus() fx.Option {
	// Buffer size handed to NewChannelBus; a candidate for configuration.
	const bufferSize = 100

	return fx.Provide(func(log logger.Logger, lc fx.Lifecycle) (errorbus.ErrorPublisher, error) {
		bus := errorbusimpl.NewChannelBus(log, bufferSize)
		lc.Append(fx.Hook{
			OnStop: func(context.Context) error { return bus.Close() },
		})
		return bus, nil
	})
}
// ProvideHealthRegistry creates an FX option that provides the health check registry.
//
// The registry starts empty. Services that own dependencies (for example,
// their own database clients) are expected to register their checkers on it.
func ProvideHealthRegistry() fx.Option {
	return fx.Provide(func() (*health.Registry, error) {
		return health.NewRegistry(), nil
	})
}
// ProvideMetrics creates an FX option that provides the Prometheus metrics registry.
//
// The constructor takes no dependencies and delegates to metrics.NewMetrics.
func ProvideMetrics() fx.Option {
	newMetrics := func() *metrics.Metrics {
		return metrics.NewMetrics()
	}
	return fx.Provide(newMetrics)
}
// ProvideServiceRegistry creates an FX option that provides the service registry.
//
// The backend is selected by "registry.type", which defaults to "consul". Any
// other value is rejected with an error.
//
// For Consul, connection settings come from "registry.consul.{address,
// datacenter,scheme}" (defaults "localhost:8500", "dc1", "http"). Health-check
// settings come from "registry.consul.health_check.*":
//   - interval, timeout, deregister_after default to 10s, 3s, 30s when zero
//   - grpc defaults to "grpc.health.v1.Health"; http has no default
//   - use_grpc defaults to true unless explicitly set in config
func ProvideServiceRegistry() fx.Option {
	return fx.Provide(func(cfg config.ConfigProvider) (registry.ServiceRegistry, error) {
		// stringOr / durationOr read key and substitute fallback for the zero value.
		stringOr := func(key, fallback string) string {
			if v := cfg.GetString(key); v != "" {
				return v
			}
			return fallback
		}
		durationOr := func(key string, fallback time.Duration) time.Duration {
			if v := cfg.GetDuration(key); v != 0 {
				return v
			}
			return fallback
		}

		registryType := stringOr("registry.type", "consul")
		if registryType != "consul" {
			return nil, fmt.Errorf("unsupported registry type: %s", registryType)
		}

		const hc = "registry.consul.health_check."

		// Services are gRPC by default, so prefer gRPC health checks unless the
		// config explicitly says otherwise (an explicit false is honored).
		useGRPC := true
		if cfg.IsSet(hc + "use_grpc") {
			useGRPC = cfg.GetBool(hc + "use_grpc")
		}

		consulCfg := consul.Config{
			Address:    stringOr("registry.consul.address", "localhost:8500"),
			Datacenter: stringOr("registry.consul.datacenter", "dc1"),
			Scheme:     stringOr("registry.consul.scheme", "http"),
			HealthCheck: consul.HealthCheckConfig{
				Interval:        durationOr(hc+"interval", 10*time.Second),
				Timeout:         durationOr(hc+"timeout", 3*time.Second),
				DeregisterAfter: durationOr(hc+"deregister_after", 30*time.Second),
				HTTP:            cfg.GetString(hc + "http"),
				GRPC:            stringOr(hc+"grpc", "grpc.health.v1.Health"),
				UseGRPC:         useGRPC,
			},
		}
		return consul.NewRegistry(consulCfg)
	})
}
// ProvideServiceClientFactory creates an FX option that provides the service client factory.
//
// The factory is built on top of the injected service registry. Construction
// itself never fails, so the error result is always nil.
func ProvideServiceClientFactory() fx.Option {
	constructor := func(reg registry.ServiceRegistry) (*client.ServiceClientFactory, error) {
		factory := client.NewServiceClientFactory(reg)
		return factory, nil
	}
	return fx.Provide(constructor)
}
// ProvideTracer creates an FX option that provides the OpenTelemetry tracer.
//
// When "tracing.enabled" is false, a no-op TracerProvider is returned and no
// hooks are registered. Otherwise the tracer is initialized via
// observability.InitTracer with these settings:
//   - "tracing.service_name" (default "platform")
//   - "tracing.service_version" (default "1.0.0")
//   - "environment" (default "development")
//   - "tracing.otlp_endpoint" (no default)
//
// An OnStop hook shuts the tracer down.
func ProvideTracer() fx.Option {
	return fx.Provide(func(cfg config.ConfigProvider, lc fx.Lifecycle) (trace.TracerProvider, error) {
		if !cfg.GetBool("tracing.enabled") {
			return noop.NewTracerProvider(), nil
		}

		// valueOr returns the configured string for key, or fallback when empty.
		valueOr := func(key, fallback string) string {
			if v := cfg.GetString(key); v != "" {
				return v
			}
			return fallback
		}

		tracingCfg := observability.Config{
			Enabled:        true,
			ServiceName:    valueOr("tracing.service_name", "platform"),
			ServiceVersion: valueOr("tracing.service_version", "1.0.0"),
			Environment:    valueOr("environment", "development"),
			OTLPEndpoint:   cfg.GetString("tracing.otlp_endpoint"),
		}

		provider, initErr := observability.InitTracer(context.Background(), tracingCfg)
		if initErr != nil {
			return nil, fmt.Errorf("failed to initialize tracer: %w", initErr)
		}

		lc.Append(fx.Hook{
			OnStop: func(stopCtx context.Context) error {
				return observability.ShutdownTracer(stopCtx, provider)
			},
		})
		return provider, nil
	})
}
// ProvideHTTPServer creates an FX option that provides the HTTP server.
//
// The server is built with server.NewServer from the injected config, logger,
// health registry, metrics, error bus and tracer provider. The lifecycle
// hooks behave as follows:
//   - OnStart runs srv.Start in a background goroutine. It waits 500ms so
//     that immediate failures (e.g. port already in use) are returned as
//     start errors. It then makes a best-effort GET to /healthz; a failed or
//     error-status probe is only logged.
//   - OnStop calls srv.Shutdown with the stop context.
//
// The address logged and probed is built from "server.host" (default
// "0.0.0.0") and "server.port" (default 8080).
func ProvideHTTPServer() fx.Option {
	return fx.Provide(func(
		cfg config.ConfigProvider,
		log logger.Logger,
		healthRegistry *health.Registry,
		metricsRegistry *metrics.Metrics,
		errorBus errorbus.ErrorPublisher,
		tracer trace.TracerProvider,
		lc fx.Lifecycle,
	) (*server.Server, error) {
		log.Info("Creating HTTP server...")
		srv, err := server.NewServer(cfg, log, healthRegistry, metricsRegistry, errorBus, tracer)
		if err != nil {
			log.Error("Failed to create HTTP server",
				logger.Error(err),
			)
			return nil, fmt.Errorf("failed to create HTTP server: %w", err)
		}
		log.Info("HTTP server created, registering lifecycle hooks...")
		lc.Append(fx.Hook{
			OnStart: func(_ context.Context) error {
				port := cfg.GetInt("server.port")
				if port == 0 {
					port = 8080
				}
				host := cfg.GetString("server.host")
				if host == "" {
					host = "0.0.0.0"
				}
				addr := fmt.Sprintf("%s:%d", host, port)
				log.Info("HTTP server starting",
					logger.String("addr", addr),
				)

				// srv.Start blocks while serving, so run it in the background.
				// errChan is buffered and written non-blockingly, so the goroutine
				// never blocks even if it fails after OnStart has returned.
				errChan := make(chan error, 1)
				go func() {
					// errors.Is (not ==) so a wrapped http.ErrServerClosed from a
					// normal Shutdown is not reported as a failure.
					if err := srv.Start(); err != nil && !errors.Is(err, http.ErrServerClosed) {
						log.Error("HTTP server failed",
							logger.Error(err),
						)
						select {
						case errChan <- err:
						default:
						}
					}
				}()

				// Give immediate bind errors a short window to surface.
				select {
				case err := <-errChan:
					return fmt.Errorf("HTTP server failed to start: %w", err)
				case <-time.After(500 * time.Millisecond):
				}

				// Best-effort readiness probe. NOTE: it always targets localhost,
				// so it can only succeed when the server listens on loopback or on
				// all interfaces. Named httpClient to avoid shadowing the
				// imported client package.
				httpClient := &http.Client{Timeout: 1 * time.Second}
				checkURL := fmt.Sprintf("http://localhost:%d/healthz", port)
				resp, err := httpClient.Get(checkURL)
				if err != nil {
					log.Warn("Could not verify HTTP server is listening (may still be starting)",
						logger.String("url", checkURL),
						logger.Error(err),
					)
				} else {
					if resp.StatusCode >= http.StatusBadRequest {
						log.Warn("HTTP server health probe returned an error status",
							logger.String("url", checkURL),
							logger.Int("status", resp.StatusCode),
						)
					}
					_ = resp.Body.Close()
				}
				log.Info("HTTP server started successfully",
					logger.String("addr", addr),
				)
				return nil
			},
			OnStop: func(ctx context.Context) error {
				return srv.Shutdown(ctx)
			},
		})
		return srv, nil
	})
}
// CoreModule returns an FX option that provides all core kernel infrastructure services.
//
// It bundles these providers, in this order:
//   - configuration
//   - logging
//   - error bus
//   - health registry
//   - metrics
//   - tracing
//   - service registry
//   - service client factory
//
// ProvideDatabase and ProvideHTTPServer are deliberately left out: services
// create their own database and HTTP server instances. The HTTP server
// foundation remains available via server.NewServer().
func CoreModule() fx.Option {
	core := []fx.Option{
		ProvideConfig(),
		ProvideLogger(),
		ProvideErrorBus(),
		ProvideHealthRegistry(),
		ProvideMetrics(),
		ProvideTracer(),
		ProvideServiceRegistry(),
		ProvideServiceClientFactory(),
	}
	return fx.Options(core...)
}
// maskDSN masks sensitive information in DSN for logging.
//
// For DSNs of the form [scheme://]user:password@rest, the password is replaced
// by "***" and the scheme, user and rest are kept. For example,
// postgres://user:password@host:5432/db becomes
// postgres://user:***@host:5432/db.
//
// How the parts are located:
//   - A leading "scheme://" is skipped, so its ':' is never mistaken for the
//     user/password separator.
//   - The password ends at the LAST '@', so an unescaped '@' inside the
//     password cannot leak its tail. The trade-off is that an '@' appearing
//     after the host (e.g. in query parameters) may cause the host to be
//     masked too.
//
// Inputs shorter than 20 bytes, or without a user:password@ section (e.g.
// key=value DSNs), are replaced entirely by "***".
func maskDSN(dsn string) string {
	const mask = "***"
	if len(dsn) < 20 {
		return mask
	}
	at := strings.LastIndexByte(dsn, '@')
	if at <= 0 {
		return mask
	}
	start := 0
	if i := strings.Index(dsn[:at], "://"); i >= 0 {
		start = i + len("://")
	}
	colon := strings.IndexByte(dsn[start:at], ':')
	if colon < 0 {
		return mask
	}
	colon += start
	return dsn[:colon+1] + mask + dsn[at:]
}
// indexOf returns the byte index of the first occurrence of c in s, or -1 if
// c is not present. It delegates to strings.IndexByte rather than hand-rolling
// the scan.
func indexOf(s string, c byte) int {
	return strings.IndexByte(s, c)
}
// RegisterLifecycleHooks registers lifecycle hooks for logging.
//
// On start it logs "Application starting" and on stop it logs "Application
// shutting down". Both messages carry a component=bootstrap field, and neither
// hook returns an error.
func RegisterLifecycleHooks(lc fx.Lifecycle, l logger.Logger) {
	announce := func(msg string) {
		l.Info(msg,
			logger.String("component", "bootstrap"),
		)
	}

	var hook fx.Hook
	hook.OnStart = func(context.Context) error {
		announce("Application starting")
		return nil
	}
	hook.OnStop = func(context.Context) error {
		announce("Application shutting down")
		return nil
	}
	lc.Append(hook)
}