- Added logging when HTTP server OnStart hook is called - Added error logging for database migration failures - This will help identify if hooks are being called and where failures occur
361 lines
9.7 KiB
Go
361 lines
9.7 KiB
Go
package di
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"time"
|
|
|
|
configimpl "git.dcentral.systems/toolz/goplt/internal/config"
|
|
errorbusimpl "git.dcentral.systems/toolz/goplt/internal/errorbus"
|
|
"git.dcentral.systems/toolz/goplt/internal/health"
|
|
"git.dcentral.systems/toolz/goplt/internal/infra/database"
|
|
loggerimpl "git.dcentral.systems/toolz/goplt/internal/logger"
|
|
"git.dcentral.systems/toolz/goplt/internal/metrics"
|
|
"git.dcentral.systems/toolz/goplt/internal/observability"
|
|
"git.dcentral.systems/toolz/goplt/internal/server"
|
|
"git.dcentral.systems/toolz/goplt/pkg/config"
|
|
"git.dcentral.systems/toolz/goplt/pkg/errorbus"
|
|
"git.dcentral.systems/toolz/goplt/pkg/logger"
|
|
"go.opentelemetry.io/otel/trace"
|
|
"go.uber.org/fx"
|
|
)
|
|
|
|
// ProvideConfig creates an FX option that provides ConfigProvider.
|
|
func ProvideConfig() fx.Option {
|
|
return fx.Provide(func() (config.ConfigProvider, error) {
|
|
// Determine environment from environment variable or default to "development"
|
|
env := os.Getenv("ENVIRONMENT")
|
|
if env == "" {
|
|
env = "development"
|
|
}
|
|
|
|
cfg, err := configimpl.LoadConfig(env)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to load config: %w", err)
|
|
}
|
|
|
|
return cfg, nil
|
|
})
|
|
}
|
|
|
|
// ProvideLogger creates an FX option that provides Logger.
|
|
func ProvideLogger() fx.Option {
|
|
return fx.Provide(func(cfg config.ConfigProvider) (logger.Logger, error) {
|
|
level := cfg.GetString("logging.level")
|
|
if level == "" {
|
|
level = "info"
|
|
}
|
|
|
|
format := cfg.GetString("logging.format")
|
|
if format == "" {
|
|
format = "json"
|
|
}
|
|
|
|
log, err := loggerimpl.NewZapLogger(level, format)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create logger: %w", err)
|
|
}
|
|
|
|
// Set as global logger
|
|
logger.SetGlobalLogger(log)
|
|
|
|
return log, nil
|
|
})
|
|
}
|
|
|
|
// ProvideDatabase creates an FX option that provides the database client.
|
|
func ProvideDatabase() fx.Option {
|
|
return fx.Provide(func(cfg config.ConfigProvider, log logger.Logger, lc fx.Lifecycle) (*database.Client, error) {
|
|
dsn := cfg.GetString("database.dsn")
|
|
if dsn == "" {
|
|
return nil, fmt.Errorf("database DSN is not configured")
|
|
}
|
|
|
|
maxConns := cfg.GetInt("database.max_connections")
|
|
if maxConns == 0 {
|
|
maxConns = 25
|
|
}
|
|
|
|
maxIdleConns := cfg.GetInt("database.max_idle_connections")
|
|
if maxIdleConns == 0 {
|
|
maxIdleConns = 5
|
|
}
|
|
|
|
connMaxLifetime := cfg.GetDuration("database.conn_max_lifetime")
|
|
if connMaxLifetime == 0 {
|
|
connMaxLifetime = 5 * time.Minute
|
|
}
|
|
|
|
connMaxIdleTime := cfg.GetDuration("database.conn_max_idle_time")
|
|
if connMaxIdleTime == 0 {
|
|
connMaxIdleTime = 10 * time.Minute
|
|
}
|
|
|
|
dbClient, err := database.NewClient(database.Config{
|
|
DSN: dsn,
|
|
MaxConnections: maxConns,
|
|
MaxIdleConns: maxIdleConns,
|
|
ConnMaxLifetime: connMaxLifetime,
|
|
ConnMaxIdleTime: connMaxIdleTime,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create database client: %w", err)
|
|
}
|
|
|
|
// Register lifecycle hooks
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(ctx context.Context) error {
|
|
// Run migrations on startup
|
|
log.Info("Running database migrations...")
|
|
if err := dbClient.Migrate(ctx); err != nil {
|
|
log.Error("Database migrations failed",
|
|
logger.Error(err),
|
|
)
|
|
return fmt.Errorf("failed to run database migrations: %w", err)
|
|
}
|
|
log.Info("Database migrations completed successfully")
|
|
return nil
|
|
},
|
|
OnStop: func(ctx context.Context) error {
|
|
return dbClient.Close()
|
|
},
|
|
})
|
|
|
|
return dbClient, nil
|
|
})
|
|
}
|
|
|
|
// ProvideErrorBus creates an FX option that provides the error bus.
|
|
func ProvideErrorBus() fx.Option {
|
|
return fx.Provide(func(log logger.Logger, lc fx.Lifecycle) (errorbus.ErrorPublisher, error) {
|
|
bufferSize := 100 // Can be made configurable
|
|
bus := errorbusimpl.NewChannelBus(log, bufferSize)
|
|
|
|
// Register lifecycle hook to close the bus on shutdown
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(ctx context.Context) error {
|
|
return bus.Close()
|
|
},
|
|
})
|
|
|
|
return bus, nil
|
|
})
|
|
}
|
|
|
|
// ProvideHealthRegistry creates an FX option that provides the health check registry.
|
|
func ProvideHealthRegistry() fx.Option {
|
|
return fx.Provide(func(dbClient *database.Client) (*health.Registry, error) {
|
|
registry := health.NewRegistry()
|
|
|
|
// Register database health checker
|
|
registry.Register("database", health.NewDatabaseChecker(dbClient))
|
|
|
|
return registry, nil
|
|
})
|
|
}
|
|
|
|
// ProvideMetrics creates an FX option that provides the Prometheus metrics registry.
|
|
func ProvideMetrics() fx.Option {
|
|
return fx.Provide(func() *metrics.Metrics {
|
|
return metrics.NewMetrics()
|
|
})
|
|
}
|
|
|
|
// ProvideTracer creates an FX option that provides the OpenTelemetry tracer.
|
|
func ProvideTracer() fx.Option {
|
|
return fx.Provide(func(cfg config.ConfigProvider, lc fx.Lifecycle) (trace.TracerProvider, error) {
|
|
enabled := cfg.GetBool("tracing.enabled")
|
|
if !enabled {
|
|
// Return no-op tracer
|
|
return trace.NewNoopTracerProvider(), nil
|
|
}
|
|
|
|
serviceName := cfg.GetString("tracing.service_name")
|
|
if serviceName == "" {
|
|
serviceName = "platform"
|
|
}
|
|
|
|
serviceVersion := cfg.GetString("tracing.service_version")
|
|
if serviceVersion == "" {
|
|
serviceVersion = "1.0.0"
|
|
}
|
|
|
|
env := cfg.GetString("environment")
|
|
if env == "" {
|
|
env = "development"
|
|
}
|
|
|
|
otlpEndpoint := cfg.GetString("tracing.otlp_endpoint")
|
|
|
|
tp, err := observability.InitTracer(context.Background(), observability.Config{
|
|
Enabled: enabled,
|
|
ServiceName: serviceName,
|
|
ServiceVersion: serviceVersion,
|
|
Environment: env,
|
|
OTLPEndpoint: otlpEndpoint,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize tracer: %w", err)
|
|
}
|
|
|
|
// Register lifecycle hook to shutdown tracer
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(ctx context.Context) error {
|
|
return observability.ShutdownTracer(ctx, tp)
|
|
},
|
|
})
|
|
|
|
return tp, nil
|
|
})
|
|
}
|
|
|
|
// ProvideHTTPServer creates an FX option that provides the HTTP server.
|
|
func ProvideHTTPServer() fx.Option {
|
|
return fx.Provide(func(
|
|
cfg config.ConfigProvider,
|
|
log logger.Logger,
|
|
healthRegistry *health.Registry,
|
|
metricsRegistry *metrics.Metrics,
|
|
errorBus errorbus.ErrorPublisher,
|
|
tracer trace.TracerProvider,
|
|
lc fx.Lifecycle,
|
|
) (*server.Server, error) {
|
|
srv, err := server.NewServer(cfg, log, healthRegistry, metricsRegistry, errorBus, tracer)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create HTTP server: %w", err)
|
|
}
|
|
|
|
// Register lifecycle hooks
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(ctx context.Context) error {
|
|
log.Info("HTTP server OnStart hook called")
|
|
|
|
// Get server address from config
|
|
port := cfg.GetInt("server.port")
|
|
if port == 0 {
|
|
port = 8080
|
|
}
|
|
host := cfg.GetString("server.host")
|
|
if host == "" {
|
|
host = "0.0.0.0"
|
|
}
|
|
addr := fmt.Sprintf("%s:%d", host, port)
|
|
|
|
log.Info("HTTP server starting",
|
|
logger.String("addr", addr),
|
|
)
|
|
|
|
// Start server in a goroutine
|
|
// ListenAndServe blocks, so we need to start it async
|
|
// If there's an immediate error (like port in use), it will return quickly
|
|
errChan := make(chan error, 1)
|
|
go func() {
|
|
if err := srv.Start(); err != nil && err != http.ErrServerClosed {
|
|
log.Error("HTTP server failed",
|
|
logger.String("error", err.Error()),
|
|
)
|
|
select {
|
|
case errChan <- err:
|
|
default:
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Wait a short time to detect immediate binding errors
|
|
// If ListenAndServe fails immediately (e.g., port in use), it will return quickly
|
|
select {
|
|
case err := <-errChan:
|
|
return fmt.Errorf("HTTP server failed to start: %w", err)
|
|
case <-time.After(500 * time.Millisecond):
|
|
// If no error after 500ms, verify server is actually listening
|
|
// by attempting a connection
|
|
client := &http.Client{Timeout: 1 * time.Second}
|
|
checkURL := fmt.Sprintf("http://localhost:%d/healthz", port)
|
|
resp, err := client.Get(checkURL)
|
|
if err != nil {
|
|
// Server might still be starting, but log the attempt
|
|
log.Warn("Could not verify HTTP server is listening (may still be starting)",
|
|
logger.String("url", checkURL),
|
|
logger.String("error", err.Error()),
|
|
)
|
|
// Continue anyway - server might still be starting
|
|
} else {
|
|
resp.Body.Close()
|
|
}
|
|
|
|
log.Info("HTTP server started successfully",
|
|
logger.String("addr", addr),
|
|
)
|
|
return nil
|
|
}
|
|
},
|
|
OnStop: func(ctx context.Context) error {
|
|
return srv.Shutdown(ctx)
|
|
},
|
|
})
|
|
|
|
return srv, nil
|
|
})
|
|
}
|
|
|
|
// CoreModule returns an FX option that provides all core services.
|
|
// This includes configuration, logging, database, error bus, health checks, metrics, tracing, and HTTP server.
|
|
func CoreModule() fx.Option {
|
|
return fx.Options(
|
|
ProvideConfig(),
|
|
ProvideLogger(),
|
|
ProvideDatabase(),
|
|
ProvideErrorBus(),
|
|
ProvideHealthRegistry(),
|
|
ProvideMetrics(),
|
|
ProvideTracer(),
|
|
ProvideHTTPServer(),
|
|
)
|
|
}
|
|
|
|
// maskDSN masks sensitive information in DSN for logging.
|
|
func maskDSN(dsn string) string {
|
|
// Simple masking: replace password with ***
|
|
// Format: postgres://user:password@host:port/db
|
|
if len(dsn) < 20 {
|
|
return "***"
|
|
}
|
|
// Find @ symbol and replace password part
|
|
if idx := indexOf(dsn, '@'); idx > 0 {
|
|
if colonIdx := indexOf(dsn[:idx], ':'); colonIdx > 0 {
|
|
return dsn[:colonIdx+1] + "***" + dsn[idx:]
|
|
}
|
|
}
|
|
return "***"
|
|
}
|
|
|
|
// indexOf finds the index of a character in a string.
|
|
func indexOf(s string, c byte) int {
|
|
for i := 0; i < len(s); i++ {
|
|
if s[i] == c {
|
|
return i
|
|
}
|
|
}
|
|
return -1
|
|
}
|
|
|
|
// RegisterLifecycleHooks registers lifecycle hooks for logging.
|
|
func RegisterLifecycleHooks(lc fx.Lifecycle, l logger.Logger) {
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(_ context.Context) error {
|
|
l.Info("Application starting",
|
|
logger.String("component", "bootstrap"),
|
|
)
|
|
return nil
|
|
},
|
|
OnStop: func(_ context.Context) error {
|
|
l.Info("Application shutting down",
|
|
logger.String("component", "bootstrap"),
|
|
)
|
|
return nil
|
|
},
|
|
})
|
|
}
|