- Add gRPC health check support to Consul registry
  - Services are gRPC-only, not HTTP
  - Consul was trying HTTP health checks, which failed
  - Now uses gRPC health checks via the grpc.health.v1.Health service
- Update HealthCheckConfig to support both HTTP and gRPC
  - Add GRPC field for the gRPC service name
  - Add UseGRPC flag to choose the health check type
  - Default to gRPC for services (use_grpc: true in config)
- Fix service address registration in Docker
  - Services now register with their Docker service name (e.g., auth-service)
  - Allows Consul to reach services via Docker network DNS
  - Falls back to localhost for local development
- Update default.yaml to enable gRPC health checks
  - Set use_grpc: true
  - Set grpc: grpc.health.v1.Health

This fixes services being deregistered from Consul due to failed HTTP health checks. Services will now pass gRPC health checks.
457 lines
13 KiB
Go
457 lines
13 KiB
Go
package di
|
|
|
|
import (
	"context"
	"fmt"
	"net/http"
	"os"
	"strings"
	"time"

	"git.dcentral.systems/toolz/goplt/internal/client"
	configimpl "git.dcentral.systems/toolz/goplt/internal/config"
	errorbusimpl "git.dcentral.systems/toolz/goplt/internal/errorbus"
	"git.dcentral.systems/toolz/goplt/internal/health"
	"git.dcentral.systems/toolz/goplt/internal/infra/database"
	loggerimpl "git.dcentral.systems/toolz/goplt/internal/logger"
	"git.dcentral.systems/toolz/goplt/internal/metrics"
	"git.dcentral.systems/toolz/goplt/internal/observability"
	"git.dcentral.systems/toolz/goplt/internal/registry/consul"
	"git.dcentral.systems/toolz/goplt/internal/server"
	"git.dcentral.systems/toolz/goplt/pkg/config"
	"git.dcentral.systems/toolz/goplt/pkg/errorbus"
	"git.dcentral.systems/toolz/goplt/pkg/logger"
	"git.dcentral.systems/toolz/goplt/pkg/registry"
	"go.opentelemetry.io/otel/trace"
	"go.opentelemetry.io/otel/trace/noop"
	"go.uber.org/fx"
)
|
|
|
|
// ProvideConfig creates an FX option that provides ConfigProvider.
|
|
func ProvideConfig() fx.Option {
|
|
return fx.Provide(func() (config.ConfigProvider, error) {
|
|
// Determine environment from environment variable or default to "development"
|
|
env := os.Getenv("ENVIRONMENT")
|
|
if env == "" {
|
|
env = "development"
|
|
}
|
|
|
|
cfg, err := configimpl.LoadConfig(env)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to load config: %w", err)
|
|
}
|
|
|
|
return cfg, nil
|
|
})
|
|
}
|
|
|
|
// ProvideLogger creates an FX option that provides Logger.
|
|
func ProvideLogger() fx.Option {
|
|
return fx.Provide(func(cfg config.ConfigProvider) (logger.Logger, error) {
|
|
level := cfg.GetString("logging.level")
|
|
if level == "" {
|
|
level = "info"
|
|
}
|
|
|
|
format := cfg.GetString("logging.format")
|
|
if format == "" {
|
|
format = "json"
|
|
}
|
|
|
|
log, err := loggerimpl.NewZapLogger(level, format)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create logger: %w", err)
|
|
}
|
|
|
|
// Set as global logger
|
|
logger.SetGlobalLogger(log)
|
|
|
|
return log, nil
|
|
})
|
|
}
|
|
|
|
// ProvideDatabase creates an FX option that provides the database client.
|
|
func ProvideDatabase() fx.Option {
|
|
return fx.Provide(func(cfg config.ConfigProvider, log logger.Logger, lc fx.Lifecycle) (*database.Client, error) {
|
|
dsn := cfg.GetString("database.dsn")
|
|
if dsn == "" {
|
|
log.Error("ProvideDatabase: DSN is empty")
|
|
return nil, fmt.Errorf("database DSN is not configured")
|
|
}
|
|
|
|
maxConns := cfg.GetInt("database.max_connections")
|
|
if maxConns == 0 {
|
|
maxConns = 25
|
|
}
|
|
|
|
maxIdleConns := cfg.GetInt("database.max_idle_connections")
|
|
if maxIdleConns == 0 {
|
|
maxIdleConns = 5
|
|
}
|
|
|
|
connMaxLifetime := cfg.GetDuration("database.conn_max_lifetime")
|
|
if connMaxLifetime == 0 {
|
|
connMaxLifetime = 5 * time.Minute
|
|
}
|
|
|
|
connMaxIdleTime := cfg.GetDuration("database.conn_max_idle_time")
|
|
if connMaxIdleTime == 0 {
|
|
connMaxIdleTime = 10 * time.Minute
|
|
}
|
|
|
|
log.Info("Preparing database connection",
|
|
logger.String("dsn_mask", maskDSN(dsn)),
|
|
logger.Int("max_connections", maxConns),
|
|
)
|
|
|
|
log.Info("Connecting to database...")
|
|
dbClient, err := database.NewClient(database.Config{
|
|
DSN: dsn,
|
|
MaxConnections: maxConns,
|
|
MaxIdleConns: maxIdleConns,
|
|
ConnMaxLifetime: connMaxLifetime,
|
|
ConnMaxIdleTime: connMaxIdleTime,
|
|
})
|
|
if err != nil {
|
|
log.Error("Failed to create database client",
|
|
logger.Error(err),
|
|
)
|
|
return nil, fmt.Errorf("failed to create database client: %w", err)
|
|
}
|
|
|
|
log.Info("Database client created successfully")
|
|
|
|
// Register lifecycle hooks
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(ctx context.Context) error {
|
|
log.Info("Running database migrations...")
|
|
// Run migrations on startup
|
|
if err := dbClient.Migrate(ctx); err != nil {
|
|
log.Error("Database migrations failed",
|
|
logger.Error(err),
|
|
)
|
|
return fmt.Errorf("failed to run database migrations: %w", err)
|
|
}
|
|
log.Info("Database migrations completed successfully")
|
|
return nil
|
|
},
|
|
OnStop: func(_ context.Context) error {
|
|
return dbClient.Close()
|
|
},
|
|
})
|
|
|
|
return dbClient, nil
|
|
})
|
|
}
|
|
|
|
// ProvideErrorBus creates an FX option that provides the error bus.
|
|
func ProvideErrorBus() fx.Option {
|
|
return fx.Provide(func(log logger.Logger, lc fx.Lifecycle) (errorbus.ErrorPublisher, error) {
|
|
bufferSize := 100 // Can be made configurable
|
|
bus := errorbusimpl.NewChannelBus(log, bufferSize)
|
|
|
|
// Register lifecycle hook to close the bus on shutdown
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(_ context.Context) error {
|
|
return bus.Close()
|
|
},
|
|
})
|
|
|
|
return bus, nil
|
|
})
|
|
}
|
|
|
|
// ProvideHealthRegistry creates an FX option that provides the health check registry.
|
|
// Note: Database health checkers are registered by services that create their own database clients.
|
|
func ProvideHealthRegistry() fx.Option {
|
|
return fx.Provide(func() (*health.Registry, error) {
|
|
registry := health.NewRegistry()
|
|
// Services will register their own health checkers (e.g., database, external dependencies)
|
|
return registry, nil
|
|
})
|
|
}
|
|
|
|
// ProvideMetrics creates an FX option that provides the Prometheus metrics registry.
|
|
func ProvideMetrics() fx.Option {
|
|
return fx.Provide(func() *metrics.Metrics {
|
|
return metrics.NewMetrics()
|
|
})
|
|
}
|
|
|
|
// ProvideServiceRegistry creates an FX option that provides the service registry.
|
|
func ProvideServiceRegistry() fx.Option {
|
|
return fx.Provide(func(cfg config.ConfigProvider) (registry.ServiceRegistry, error) {
|
|
registryType := cfg.GetString("registry.type")
|
|
if registryType == "" {
|
|
registryType = "consul"
|
|
}
|
|
|
|
switch registryType {
|
|
case "consul":
|
|
consulCfg := consul.Config{
|
|
Address: cfg.GetString("registry.consul.address"),
|
|
Datacenter: cfg.GetString("registry.consul.datacenter"),
|
|
Scheme: cfg.GetString("registry.consul.scheme"),
|
|
}
|
|
|
|
// Set defaults
|
|
if consulCfg.Address == "" {
|
|
consulCfg.Address = "localhost:8500"
|
|
}
|
|
if consulCfg.Datacenter == "" {
|
|
consulCfg.Datacenter = "dc1"
|
|
}
|
|
if consulCfg.Scheme == "" {
|
|
consulCfg.Scheme = "http"
|
|
}
|
|
|
|
// Parse health check configuration
|
|
healthCheckInterval := cfg.GetDuration("registry.consul.health_check.interval")
|
|
if healthCheckInterval == 0 {
|
|
healthCheckInterval = 10 * time.Second
|
|
}
|
|
healthCheckTimeout := cfg.GetDuration("registry.consul.health_check.timeout")
|
|
if healthCheckTimeout == 0 {
|
|
healthCheckTimeout = 3 * time.Second
|
|
}
|
|
healthCheckDeregisterAfter := cfg.GetDuration("registry.consul.health_check.deregister_after")
|
|
if healthCheckDeregisterAfter == 0 {
|
|
healthCheckDeregisterAfter = 30 * time.Second
|
|
}
|
|
healthCheckHTTP := cfg.GetString("registry.consul.health_check.http")
|
|
healthCheckGRPC := cfg.GetString("registry.consul.health_check.grpc")
|
|
useGRPC := cfg.GetBool("registry.consul.health_check.use_grpc")
|
|
// Default to gRPC if not explicitly set (services are gRPC by default)
|
|
if !cfg.IsSet("registry.consul.health_check.use_grpc") {
|
|
useGRPC = true
|
|
}
|
|
if healthCheckGRPC == "" {
|
|
healthCheckGRPC = "grpc.health.v1.Health"
|
|
}
|
|
|
|
consulCfg.HealthCheck = consul.HealthCheckConfig{
|
|
Interval: healthCheckInterval,
|
|
Timeout: healthCheckTimeout,
|
|
DeregisterAfter: healthCheckDeregisterAfter,
|
|
HTTP: healthCheckHTTP,
|
|
GRPC: healthCheckGRPC,
|
|
UseGRPC: useGRPC,
|
|
}
|
|
|
|
return consul.NewRegistry(consulCfg)
|
|
default:
|
|
return nil, fmt.Errorf("unsupported registry type: %s", registryType)
|
|
}
|
|
})
|
|
}
|
|
|
|
// ProvideServiceClientFactory creates an FX option that provides the service client factory.
|
|
func ProvideServiceClientFactory() fx.Option {
|
|
return fx.Provide(func(reg registry.ServiceRegistry) (*client.ServiceClientFactory, error) {
|
|
return client.NewServiceClientFactory(reg), nil
|
|
})
|
|
}
|
|
|
|
// ProvideTracer creates an FX option that provides the OpenTelemetry tracer.
|
|
func ProvideTracer() fx.Option {
|
|
return fx.Provide(func(cfg config.ConfigProvider, lc fx.Lifecycle) (trace.TracerProvider, error) {
|
|
enabled := cfg.GetBool("tracing.enabled")
|
|
if !enabled {
|
|
// Return no-op tracer
|
|
return noop.NewTracerProvider(), nil
|
|
}
|
|
|
|
serviceName := cfg.GetString("tracing.service_name")
|
|
if serviceName == "" {
|
|
serviceName = "platform"
|
|
}
|
|
|
|
serviceVersion := cfg.GetString("tracing.service_version")
|
|
if serviceVersion == "" {
|
|
serviceVersion = "1.0.0"
|
|
}
|
|
|
|
env := cfg.GetString("environment")
|
|
if env == "" {
|
|
env = "development"
|
|
}
|
|
|
|
otlpEndpoint := cfg.GetString("tracing.otlp_endpoint")
|
|
|
|
tp, err := observability.InitTracer(context.Background(), observability.Config{
|
|
Enabled: enabled,
|
|
ServiceName: serviceName,
|
|
ServiceVersion: serviceVersion,
|
|
Environment: env,
|
|
OTLPEndpoint: otlpEndpoint,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize tracer: %w", err)
|
|
}
|
|
|
|
// Register lifecycle hook to shutdown tracer
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(ctx context.Context) error {
|
|
return observability.ShutdownTracer(ctx, tp)
|
|
},
|
|
})
|
|
|
|
return tp, nil
|
|
})
|
|
}
|
|
|
|
// ProvideHTTPServer creates an FX option that provides the HTTP server.
|
|
func ProvideHTTPServer() fx.Option {
|
|
return fx.Provide(func(
|
|
cfg config.ConfigProvider,
|
|
log logger.Logger,
|
|
healthRegistry *health.Registry,
|
|
metricsRegistry *metrics.Metrics,
|
|
errorBus errorbus.ErrorPublisher,
|
|
tracer trace.TracerProvider,
|
|
lc fx.Lifecycle,
|
|
) (*server.Server, error) {
|
|
log.Info("Creating HTTP server...")
|
|
|
|
srv, err := server.NewServer(cfg, log, healthRegistry, metricsRegistry, errorBus, tracer)
|
|
if err != nil {
|
|
log.Error("Failed to create HTTP server",
|
|
logger.Error(err),
|
|
)
|
|
return nil, fmt.Errorf("failed to create HTTP server: %w", err)
|
|
}
|
|
|
|
log.Info("HTTP server created, registering lifecycle hooks...")
|
|
|
|
// Register lifecycle hooks
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(_ context.Context) error {
|
|
// Get server address from config
|
|
port := cfg.GetInt("server.port")
|
|
if port == 0 {
|
|
port = 8080
|
|
}
|
|
host := cfg.GetString("server.host")
|
|
if host == "" {
|
|
host = "0.0.0.0"
|
|
}
|
|
addr := fmt.Sprintf("%s:%d", host, port)
|
|
|
|
log.Info("HTTP server starting",
|
|
logger.String("addr", addr),
|
|
)
|
|
|
|
// Start server in a goroutine
|
|
// ListenAndServe blocks, so we need to start it async
|
|
// If there's an immediate error (like port in use), it will return quickly
|
|
errChan := make(chan error, 1)
|
|
go func() {
|
|
if err := srv.Start(); err != nil && err != http.ErrServerClosed {
|
|
log.Error("HTTP server failed",
|
|
logger.String("error", err.Error()),
|
|
)
|
|
select {
|
|
case errChan <- err:
|
|
default:
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Wait a short time to detect immediate binding errors
|
|
// If ListenAndServe fails immediately (e.g., port in use), it will return quickly
|
|
select {
|
|
case err := <-errChan:
|
|
return fmt.Errorf("HTTP server failed to start: %w", err)
|
|
case <-time.After(500 * time.Millisecond):
|
|
// If no error after 500ms, verify server is actually listening
|
|
// by attempting a connection
|
|
client := &http.Client{Timeout: 1 * time.Second}
|
|
checkURL := fmt.Sprintf("http://localhost:%d/healthz", port)
|
|
resp, err := client.Get(checkURL)
|
|
if err != nil {
|
|
// Server might still be starting, but log the attempt
|
|
log.Warn("Could not verify HTTP server is listening (may still be starting)",
|
|
logger.String("url", checkURL),
|
|
logger.String("error", err.Error()),
|
|
)
|
|
// Continue anyway - server might still be starting
|
|
} else {
|
|
_ = resp.Body.Close()
|
|
}
|
|
|
|
log.Info("HTTP server started successfully",
|
|
logger.String("addr", addr),
|
|
)
|
|
return nil
|
|
}
|
|
},
|
|
OnStop: func(ctx context.Context) error {
|
|
return srv.Shutdown(ctx)
|
|
},
|
|
})
|
|
|
|
return srv, nil
|
|
})
|
|
}
|
|
|
|
// CoreModule returns an FX option that provides all core kernel infrastructure services.
|
|
// This includes configuration, logging, error bus, health checks, metrics, tracing, service registry, and service client factory.
|
|
// Note: Database and HTTP server are NOT included - services will create their own instances.
|
|
// HTTP server foundation is available via server.NewServer() for services to use.
|
|
func CoreModule() fx.Option {
|
|
return fx.Options(
|
|
ProvideConfig(),
|
|
ProvideLogger(),
|
|
ProvideErrorBus(),
|
|
ProvideHealthRegistry(),
|
|
ProvideMetrics(),
|
|
ProvideTracer(),
|
|
ProvideServiceRegistry(),
|
|
ProvideServiceClientFactory(),
|
|
// Note: ProvideDatabase() and ProvideHTTPServer() are removed - services create their own
|
|
)
|
|
}
|
|
|
|
// maskDSN returns a version of dsn that is safe to log.
//
// For a URL-style DSN such as postgres://user:password@host:port/db, the
// text between the first ':' and the last '@' is replaced with "***", which
// hides the password (and the user name); the example becomes
// "postgres:***@host:port/db". The last '@' is used as the delimiter so that
// a password which itself contains '@' cannot leak its tail into the output.
//
// DSNs shorter than 20 bytes, or without both an '@' and a preceding ':'
// (e.g. key=value style DSNs), are replaced entirely with "***".
func maskDSN(dsn string) string {
	const masked = "***"

	if len(dsn) < 20 {
		return masked
	}

	// Everything before the last '@' may hold credentials.
	at := strings.LastIndexByte(dsn, '@')
	if at <= 0 {
		return masked
	}

	colon := strings.IndexByte(dsn[:at], ':')
	if colon <= 0 {
		return masked
	}

	return dsn[:colon+1] + masked + dsn[at:]
}
|
|
|
|
// indexOf returns the index of the first occurrence of byte c in s, or -1
// when c does not occur. It delegates to strings.IndexByte rather than
// scanning the string by hand.
func indexOf(s string, c byte) int {
	return strings.IndexByte(s, c)
}
|
|
|
|
// RegisterLifecycleHooks registers lifecycle hooks for logging.
|
|
func RegisterLifecycleHooks(lc fx.Lifecycle, l logger.Logger) {
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(_ context.Context) error {
|
|
l.Info("Application starting",
|
|
logger.String("component", "bootstrap"),
|
|
)
|
|
return nil
|
|
},
|
|
OnStop: func(_ context.Context) error {
|
|
l.Info("Application shutting down",
|
|
logger.String("component", "bootstrap"),
|
|
)
|
|
return nil
|
|
},
|
|
})
|
|
}
|