- Add fx.Invoke in main.go to force database and HTTP server creation - This ensures all providers execute and their lifecycle hooks are registered - Clean up debug logging statements - Database migrations and HTTP server now start correctly on application startup Fixes issue where database migrations and HTTP server were not starting because FX providers were not being executed (lazy evaluation).
379 lines
10 KiB
Go
379 lines
10 KiB
Go
package di
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"time"
|
|
|
|
configimpl "git.dcentral.systems/toolz/goplt/internal/config"
|
|
errorbusimpl "git.dcentral.systems/toolz/goplt/internal/errorbus"
|
|
"git.dcentral.systems/toolz/goplt/internal/health"
|
|
"git.dcentral.systems/toolz/goplt/internal/infra/database"
|
|
loggerimpl "git.dcentral.systems/toolz/goplt/internal/logger"
|
|
"git.dcentral.systems/toolz/goplt/internal/metrics"
|
|
"git.dcentral.systems/toolz/goplt/internal/observability"
|
|
"git.dcentral.systems/toolz/goplt/internal/server"
|
|
"git.dcentral.systems/toolz/goplt/pkg/config"
|
|
"git.dcentral.systems/toolz/goplt/pkg/errorbus"
|
|
"git.dcentral.systems/toolz/goplt/pkg/logger"
|
|
"go.opentelemetry.io/otel/trace"
|
|
"go.uber.org/fx"
|
|
)
|
|
|
|
// ProvideConfig creates an FX option that provides ConfigProvider.
|
|
func ProvideConfig() fx.Option {
|
|
return fx.Provide(func() (config.ConfigProvider, error) {
|
|
// Determine environment from environment variable or default to "development"
|
|
env := os.Getenv("ENVIRONMENT")
|
|
if env == "" {
|
|
env = "development"
|
|
}
|
|
|
|
cfg, err := configimpl.LoadConfig(env)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to load config: %w", err)
|
|
}
|
|
|
|
return cfg, nil
|
|
})
|
|
}
|
|
|
|
// ProvideLogger creates an FX option that provides Logger.
|
|
func ProvideLogger() fx.Option {
|
|
return fx.Provide(func(cfg config.ConfigProvider) (logger.Logger, error) {
|
|
level := cfg.GetString("logging.level")
|
|
if level == "" {
|
|
level = "info"
|
|
}
|
|
|
|
format := cfg.GetString("logging.format")
|
|
if format == "" {
|
|
format = "json"
|
|
}
|
|
|
|
log, err := loggerimpl.NewZapLogger(level, format)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create logger: %w", err)
|
|
}
|
|
|
|
// Set as global logger
|
|
logger.SetGlobalLogger(log)
|
|
|
|
return log, nil
|
|
})
|
|
}
|
|
|
|
// ProvideDatabase creates an FX option that provides the database client.
|
|
func ProvideDatabase() fx.Option {
|
|
return fx.Provide(func(cfg config.ConfigProvider, log logger.Logger, lc fx.Lifecycle) (*database.Client, error) {
|
|
dsn := cfg.GetString("database.dsn")
|
|
if dsn == "" {
|
|
log.Error("ProvideDatabase: DSN is empty")
|
|
return nil, fmt.Errorf("database DSN is not configured")
|
|
}
|
|
|
|
maxConns := cfg.GetInt("database.max_connections")
|
|
if maxConns == 0 {
|
|
maxConns = 25
|
|
}
|
|
|
|
maxIdleConns := cfg.GetInt("database.max_idle_connections")
|
|
if maxIdleConns == 0 {
|
|
maxIdleConns = 5
|
|
}
|
|
|
|
connMaxLifetime := cfg.GetDuration("database.conn_max_lifetime")
|
|
if connMaxLifetime == 0 {
|
|
connMaxLifetime = 5 * time.Minute
|
|
}
|
|
|
|
connMaxIdleTime := cfg.GetDuration("database.conn_max_idle_time")
|
|
if connMaxIdleTime == 0 {
|
|
connMaxIdleTime = 10 * time.Minute
|
|
}
|
|
|
|
log.Info("Preparing database connection",
|
|
logger.String("dsn_mask", maskDSN(dsn)),
|
|
logger.Int("max_connections", maxConns),
|
|
)
|
|
|
|
log.Info("Connecting to database...")
|
|
dbClient, err := database.NewClient(database.Config{
|
|
DSN: dsn,
|
|
MaxConnections: maxConns,
|
|
MaxIdleConns: maxIdleConns,
|
|
ConnMaxLifetime: connMaxLifetime,
|
|
ConnMaxIdleTime: connMaxIdleTime,
|
|
})
|
|
if err != nil {
|
|
log.Error("Failed to create database client",
|
|
logger.Error(err),
|
|
)
|
|
return nil, fmt.Errorf("failed to create database client: %w", err)
|
|
}
|
|
|
|
log.Info("Database client created successfully")
|
|
|
|
// Register lifecycle hooks
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(ctx context.Context) error {
|
|
log.Info("Running database migrations...")
|
|
// Run migrations on startup
|
|
if err := dbClient.Migrate(ctx); err != nil {
|
|
log.Error("Database migrations failed",
|
|
logger.Error(err),
|
|
)
|
|
return fmt.Errorf("failed to run database migrations: %w", err)
|
|
}
|
|
log.Info("Database migrations completed successfully")
|
|
return nil
|
|
},
|
|
OnStop: func(ctx context.Context) error {
|
|
return dbClient.Close()
|
|
},
|
|
})
|
|
|
|
return dbClient, nil
|
|
})
|
|
}
|
|
|
|
// ProvideErrorBus creates an FX option that provides the error bus.
|
|
func ProvideErrorBus() fx.Option {
|
|
return fx.Provide(func(log logger.Logger, lc fx.Lifecycle) (errorbus.ErrorPublisher, error) {
|
|
bufferSize := 100 // Can be made configurable
|
|
bus := errorbusimpl.NewChannelBus(log, bufferSize)
|
|
|
|
// Register lifecycle hook to close the bus on shutdown
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(ctx context.Context) error {
|
|
return bus.Close()
|
|
},
|
|
})
|
|
|
|
return bus, nil
|
|
})
|
|
}
|
|
|
|
// ProvideHealthRegistry creates an FX option that provides the health check registry.
|
|
func ProvideHealthRegistry() fx.Option {
|
|
return fx.Provide(func(dbClient *database.Client) (*health.Registry, error) {
|
|
registry := health.NewRegistry()
|
|
|
|
// Register database health checker
|
|
registry.Register("database", health.NewDatabaseChecker(dbClient))
|
|
|
|
return registry, nil
|
|
})
|
|
}
|
|
|
|
// ProvideMetrics creates an FX option that provides the Prometheus metrics registry.
|
|
func ProvideMetrics() fx.Option {
|
|
return fx.Provide(func() *metrics.Metrics {
|
|
return metrics.NewMetrics()
|
|
})
|
|
}
|
|
|
|
// ProvideTracer creates an FX option that provides the OpenTelemetry tracer.
|
|
func ProvideTracer() fx.Option {
|
|
return fx.Provide(func(cfg config.ConfigProvider, lc fx.Lifecycle) (trace.TracerProvider, error) {
|
|
enabled := cfg.GetBool("tracing.enabled")
|
|
if !enabled {
|
|
// Return no-op tracer
|
|
return trace.NewNoopTracerProvider(), nil
|
|
}
|
|
|
|
serviceName := cfg.GetString("tracing.service_name")
|
|
if serviceName == "" {
|
|
serviceName = "platform"
|
|
}
|
|
|
|
serviceVersion := cfg.GetString("tracing.service_version")
|
|
if serviceVersion == "" {
|
|
serviceVersion = "1.0.0"
|
|
}
|
|
|
|
env := cfg.GetString("environment")
|
|
if env == "" {
|
|
env = "development"
|
|
}
|
|
|
|
otlpEndpoint := cfg.GetString("tracing.otlp_endpoint")
|
|
|
|
tp, err := observability.InitTracer(context.Background(), observability.Config{
|
|
Enabled: enabled,
|
|
ServiceName: serviceName,
|
|
ServiceVersion: serviceVersion,
|
|
Environment: env,
|
|
OTLPEndpoint: otlpEndpoint,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize tracer: %w", err)
|
|
}
|
|
|
|
// Register lifecycle hook to shutdown tracer
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(ctx context.Context) error {
|
|
return observability.ShutdownTracer(ctx, tp)
|
|
},
|
|
})
|
|
|
|
return tp, nil
|
|
})
|
|
}
|
|
|
|
// ProvideHTTPServer creates an FX option that provides the HTTP server.
|
|
func ProvideHTTPServer() fx.Option {
|
|
return fx.Provide(func(
|
|
cfg config.ConfigProvider,
|
|
log logger.Logger,
|
|
healthRegistry *health.Registry,
|
|
metricsRegistry *metrics.Metrics,
|
|
errorBus errorbus.ErrorPublisher,
|
|
tracer trace.TracerProvider,
|
|
lc fx.Lifecycle,
|
|
) (*server.Server, error) {
|
|
log.Info("Creating HTTP server...")
|
|
|
|
srv, err := server.NewServer(cfg, log, healthRegistry, metricsRegistry, errorBus, tracer)
|
|
if err != nil {
|
|
log.Error("Failed to create HTTP server",
|
|
logger.Error(err),
|
|
)
|
|
return nil, fmt.Errorf("failed to create HTTP server: %w", err)
|
|
}
|
|
|
|
log.Info("HTTP server created, registering lifecycle hooks...")
|
|
|
|
// Register lifecycle hooks
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(ctx context.Context) error {
|
|
|
|
// Get server address from config
|
|
port := cfg.GetInt("server.port")
|
|
if port == 0 {
|
|
port = 8080
|
|
}
|
|
host := cfg.GetString("server.host")
|
|
if host == "" {
|
|
host = "0.0.0.0"
|
|
}
|
|
addr := fmt.Sprintf("%s:%d", host, port)
|
|
|
|
log.Info("HTTP server starting",
|
|
logger.String("addr", addr),
|
|
)
|
|
|
|
// Start server in a goroutine
|
|
// ListenAndServe blocks, so we need to start it async
|
|
// If there's an immediate error (like port in use), it will return quickly
|
|
errChan := make(chan error, 1)
|
|
go func() {
|
|
if err := srv.Start(); err != nil && err != http.ErrServerClosed {
|
|
log.Error("HTTP server failed",
|
|
logger.String("error", err.Error()),
|
|
)
|
|
select {
|
|
case errChan <- err:
|
|
default:
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Wait a short time to detect immediate binding errors
|
|
// If ListenAndServe fails immediately (e.g., port in use), it will return quickly
|
|
select {
|
|
case err := <-errChan:
|
|
return fmt.Errorf("HTTP server failed to start: %w", err)
|
|
case <-time.After(500 * time.Millisecond):
|
|
// If no error after 500ms, verify server is actually listening
|
|
// by attempting a connection
|
|
client := &http.Client{Timeout: 1 * time.Second}
|
|
checkURL := fmt.Sprintf("http://localhost:%d/healthz", port)
|
|
resp, err := client.Get(checkURL)
|
|
if err != nil {
|
|
// Server might still be starting, but log the attempt
|
|
log.Warn("Could not verify HTTP server is listening (may still be starting)",
|
|
logger.String("url", checkURL),
|
|
logger.String("error", err.Error()),
|
|
)
|
|
// Continue anyway - server might still be starting
|
|
} else {
|
|
resp.Body.Close()
|
|
}
|
|
|
|
log.Info("HTTP server started successfully",
|
|
logger.String("addr", addr),
|
|
)
|
|
return nil
|
|
}
|
|
},
|
|
OnStop: func(ctx context.Context) error {
|
|
return srv.Shutdown(ctx)
|
|
},
|
|
})
|
|
|
|
return srv, nil
|
|
})
|
|
}
|
|
|
|
// CoreModule returns an FX option that provides all core services.
|
|
// This includes configuration, logging, database, error bus, health checks, metrics, tracing, and HTTP server.
|
|
func CoreModule() fx.Option {
|
|
return fx.Options(
|
|
ProvideConfig(),
|
|
ProvideLogger(),
|
|
ProvideDatabase(),
|
|
ProvideErrorBus(),
|
|
ProvideHealthRegistry(),
|
|
ProvideMetrics(),
|
|
ProvideTracer(),
|
|
ProvideHTTPServer(),
|
|
)
|
|
}
|
|
|
|
// maskDSN returns a form of dsn that is safe to log, with the password
// replaced by "***".
//
// Supported shapes:
//
//	scheme://user:password@host:port/db  ->  scheme://user:***@host:port/db
//	user:password@addr/db                ->  user:***@addr/db
//	scheme://user@host/db                ->  scheme://***@host/db
//
// The credentials are taken to be everything between the optional "scheme://"
// prefix and the LAST '@' that occurs before the first '/', '?' or '#', so a
// password that itself contains '@' is masked completely. Anything that does
// not match these shapes (strings shorter than 20 bytes, DSNs without '@',
// key/value DSNs such as "host=... password=...") is replaced entirely by
// "***" rather than risking a partial leak.
func maskDSN(dsn string) string {
	const masked = "***"

	// Too short to be a complete DSN; hide it entirely.
	if len(dsn) < 20 {
		return masked
	}

	// isSchemeByte reports whether c may appear in a URL scheme name.
	isSchemeByte := func(c byte) bool {
		switch {
		case 'a' <= c && c <= 'z', 'A' <= c && c <= 'Z', '0' <= c && c <= '9':
			return true
		}
		return c == '+' || c == '-' || c == '.'
	}

	// Skip a leading "scheme://" so that the scheme's own ':' is never
	// mistaken for the user/password separator.
	start := 0
	schemeEnd := 0
	for schemeEnd < len(dsn) && isSchemeByte(dsn[schemeEnd]) {
		schemeEnd++
	}
	if schemeEnd > 0 && len(dsn)-schemeEnd >= 3 && dsn[schemeEnd:schemeEnd+3] == "://" {
		start = schemeEnd + 3
	}

	// The authority part (credentials@address) stops at the first '/', '?' or '#'.
	end := len(dsn)
	for i := start; i < len(dsn); i++ {
		if c := dsn[i]; c == '/' || c == '?' || c == '#' {
			end = i
			break
		}
	}

	// Credentials end at the last '@' of the authority, not the first one.
	at := -1
	for i := end - 1; i >= start; i-- {
		if dsn[i] == '@' {
			at = i
			break
		}
	}
	if at < 0 {
		return masked
	}

	userinfo := dsn[start:at]
	// Whitespace means this is not URL-style credentials (e.g. a key/value DSN).
	if indexOf(userinfo, ' ') >= 0 {
		return masked
	}

	colon := indexOf(userinfo, ':')
	if colon < 0 {
		// No user/password separator: mask the credentials as a whole.
		return dsn[:start] + masked + dsn[at:]
	}
	return dsn[:start+colon+1] + masked + dsn[at:]
}

// indexOf returns the index of the first occurrence of byte c in s, or -1
// when c does not occur.
func indexOf(s string, c byte) int {
	for i := 0; i < len(s); i++ {
		if s[i] == c {
			return i
		}
	}
	return -1
}
|
|
|
|
// RegisterLifecycleHooks registers lifecycle hooks for logging.
|
|
func RegisterLifecycleHooks(lc fx.Lifecycle, l logger.Logger) {
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(_ context.Context) error {
|
|
l.Info("Application starting",
|
|
logger.String("component", "bootstrap"),
|
|
)
|
|
return nil
|
|
},
|
|
OnStop: func(_ context.Context) error {
|
|
l.Info("Application shutting down",
|
|
logger.String("component", "bootstrap"),
|
|
)
|
|
return nil
|
|
},
|
|
})
|
|
}
|