Fixes: - Added database connection logging with masked DSN - Added migration progress logging - Added HTTP server startup logging with address - Fixed database provider to accept logger parameter - Improved error visibility throughout initialization Documentation: - Moved Story 1.7 (Service Client Interfaces) to Epic 2 as Story 2.7 - Updated Epic 1 and Epic 2 READMEs - Updated COMPLETE_TASK_LIST.md - Updated story metadata (ID, Epic, Dependencies) These changes will help diagnose startup issues and provide better visibility into what the application is doing.
321 lines
8.3 KiB
Go
321 lines
8.3 KiB
Go
package di
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"time"
|
|
|
|
configimpl "git.dcentral.systems/toolz/goplt/internal/config"
|
|
errorbusimpl "git.dcentral.systems/toolz/goplt/internal/errorbus"
|
|
"git.dcentral.systems/toolz/goplt/internal/health"
|
|
"git.dcentral.systems/toolz/goplt/internal/infra/database"
|
|
loggerimpl "git.dcentral.systems/toolz/goplt/internal/logger"
|
|
"git.dcentral.systems/toolz/goplt/internal/metrics"
|
|
"git.dcentral.systems/toolz/goplt/internal/observability"
|
|
"git.dcentral.systems/toolz/goplt/internal/server"
|
|
"git.dcentral.systems/toolz/goplt/pkg/config"
|
|
"git.dcentral.systems/toolz/goplt/pkg/errorbus"
|
|
"git.dcentral.systems/toolz/goplt/pkg/logger"
|
|
"go.opentelemetry.io/otel/trace"
|
|
"go.uber.org/fx"
|
|
)
|
|
|
|
// ProvideConfig creates an FX option that provides ConfigProvider.
|
|
func ProvideConfig() fx.Option {
|
|
return fx.Provide(func() (config.ConfigProvider, error) {
|
|
// Determine environment from environment variable or default to "development"
|
|
env := os.Getenv("ENVIRONMENT")
|
|
if env == "" {
|
|
env = "development"
|
|
}
|
|
|
|
cfg, err := configimpl.LoadConfig(env)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to load config: %w", err)
|
|
}
|
|
|
|
return cfg, nil
|
|
})
|
|
}
|
|
|
|
// ProvideLogger creates an FX option that provides Logger.
|
|
func ProvideLogger() fx.Option {
|
|
return fx.Provide(func(cfg config.ConfigProvider) (logger.Logger, error) {
|
|
level := cfg.GetString("logging.level")
|
|
if level == "" {
|
|
level = "info"
|
|
}
|
|
|
|
format := cfg.GetString("logging.format")
|
|
if format == "" {
|
|
format = "json"
|
|
}
|
|
|
|
log, err := loggerimpl.NewZapLogger(level, format)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create logger: %w", err)
|
|
}
|
|
|
|
// Set as global logger
|
|
logger.SetGlobalLogger(log)
|
|
|
|
return log, nil
|
|
})
|
|
}
|
|
|
|
// ProvideDatabase creates an FX option that provides the database client.
|
|
func ProvideDatabase() fx.Option {
|
|
return fx.Provide(func(cfg config.ConfigProvider, log logger.Logger, lc fx.Lifecycle) (*database.Client, error) {
|
|
dsn := cfg.GetString("database.dsn")
|
|
if dsn == "" {
|
|
return nil, fmt.Errorf("database DSN is not configured")
|
|
}
|
|
|
|
maxConns := cfg.GetInt("database.max_connections")
|
|
if maxConns == 0 {
|
|
maxConns = 25
|
|
}
|
|
|
|
maxIdleConns := cfg.GetInt("database.max_idle_connections")
|
|
if maxIdleConns == 0 {
|
|
maxIdleConns = 5
|
|
}
|
|
|
|
connMaxLifetime := cfg.GetDuration("database.conn_max_lifetime")
|
|
if connMaxLifetime == 0 {
|
|
connMaxLifetime = 5 * time.Minute
|
|
}
|
|
|
|
connMaxIdleTime := cfg.GetDuration("database.conn_max_idle_time")
|
|
if connMaxIdleTime == 0 {
|
|
connMaxIdleTime = 10 * time.Minute
|
|
}
|
|
|
|
dbClient, err := database.NewClient(database.Config{
|
|
DSN: dsn,
|
|
MaxConnections: maxConns,
|
|
MaxIdleConns: maxIdleConns,
|
|
ConnMaxLifetime: connMaxLifetime,
|
|
ConnMaxIdleTime: connMaxIdleTime,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create database client: %w", err)
|
|
}
|
|
|
|
// Register lifecycle hooks
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(ctx context.Context) error {
|
|
// Run migrations on startup
|
|
log.Info("Running database migrations...")
|
|
if err := dbClient.Migrate(ctx); err != nil {
|
|
return fmt.Errorf("failed to run database migrations: %w", err)
|
|
}
|
|
log.Info("Database migrations completed successfully")
|
|
return nil
|
|
},
|
|
OnStop: func(ctx context.Context) error {
|
|
return dbClient.Close()
|
|
},
|
|
})
|
|
|
|
return dbClient, nil
|
|
})
|
|
}
|
|
|
|
// ProvideErrorBus creates an FX option that provides the error bus.
|
|
func ProvideErrorBus() fx.Option {
|
|
return fx.Provide(func(log logger.Logger, lc fx.Lifecycle) (errorbus.ErrorPublisher, error) {
|
|
bufferSize := 100 // Can be made configurable
|
|
bus := errorbusimpl.NewChannelBus(log, bufferSize)
|
|
|
|
// Register lifecycle hook to close the bus on shutdown
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(ctx context.Context) error {
|
|
return bus.Close()
|
|
},
|
|
})
|
|
|
|
return bus, nil
|
|
})
|
|
}
|
|
|
|
// ProvideHealthRegistry creates an FX option that provides the health check registry.
|
|
func ProvideHealthRegistry() fx.Option {
|
|
return fx.Provide(func(dbClient *database.Client) (*health.Registry, error) {
|
|
registry := health.NewRegistry()
|
|
|
|
// Register database health checker
|
|
registry.Register("database", health.NewDatabaseChecker(dbClient))
|
|
|
|
return registry, nil
|
|
})
|
|
}
|
|
|
|
// ProvideMetrics creates an FX option that provides the Prometheus metrics registry.
|
|
func ProvideMetrics() fx.Option {
|
|
return fx.Provide(func() *metrics.Metrics {
|
|
return metrics.NewMetrics()
|
|
})
|
|
}
|
|
|
|
// ProvideTracer creates an FX option that provides the OpenTelemetry tracer.
|
|
func ProvideTracer() fx.Option {
|
|
return fx.Provide(func(cfg config.ConfigProvider, lc fx.Lifecycle) (trace.TracerProvider, error) {
|
|
enabled := cfg.GetBool("tracing.enabled")
|
|
if !enabled {
|
|
// Return no-op tracer
|
|
return trace.NewNoopTracerProvider(), nil
|
|
}
|
|
|
|
serviceName := cfg.GetString("tracing.service_name")
|
|
if serviceName == "" {
|
|
serviceName = "platform"
|
|
}
|
|
|
|
serviceVersion := cfg.GetString("tracing.service_version")
|
|
if serviceVersion == "" {
|
|
serviceVersion = "1.0.0"
|
|
}
|
|
|
|
env := cfg.GetString("environment")
|
|
if env == "" {
|
|
env = "development"
|
|
}
|
|
|
|
otlpEndpoint := cfg.GetString("tracing.otlp_endpoint")
|
|
|
|
tp, err := observability.InitTracer(context.Background(), observability.Config{
|
|
Enabled: enabled,
|
|
ServiceName: serviceName,
|
|
ServiceVersion: serviceVersion,
|
|
Environment: env,
|
|
OTLPEndpoint: otlpEndpoint,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize tracer: %w", err)
|
|
}
|
|
|
|
// Register lifecycle hook to shutdown tracer
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(ctx context.Context) error {
|
|
return observability.ShutdownTracer(ctx, tp)
|
|
},
|
|
})
|
|
|
|
return tp, nil
|
|
})
|
|
}
|
|
|
|
// ProvideHTTPServer creates an FX option that provides the HTTP server.
|
|
func ProvideHTTPServer() fx.Option {
|
|
return fx.Provide(func(
|
|
cfg config.ConfigProvider,
|
|
log logger.Logger,
|
|
healthRegistry *health.Registry,
|
|
metricsRegistry *metrics.Metrics,
|
|
errorBus errorbus.ErrorPublisher,
|
|
tracer trace.TracerProvider,
|
|
lc fx.Lifecycle,
|
|
) (*server.Server, error) {
|
|
srv, err := server.NewServer(cfg, log, healthRegistry, metricsRegistry, errorBus, tracer)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create HTTP server: %w", err)
|
|
}
|
|
|
|
// Register lifecycle hooks
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(ctx context.Context) error {
|
|
// Get server address from config
|
|
port := cfg.GetInt("server.port")
|
|
if port == 0 {
|
|
port = 8080
|
|
}
|
|
host := cfg.GetString("server.host")
|
|
if host == "" {
|
|
host = "0.0.0.0"
|
|
}
|
|
addr := fmt.Sprintf("%s:%d", host, port)
|
|
|
|
// Start server in a goroutine
|
|
go func() {
|
|
log.Info("HTTP server starting",
|
|
logger.String("addr", addr),
|
|
)
|
|
if err := srv.Start(); err != nil && err != http.ErrServerClosed {
|
|
log.Error("HTTP server error",
|
|
logger.String("error", err.Error()),
|
|
)
|
|
}
|
|
}()
|
|
return nil
|
|
},
|
|
OnStop: func(ctx context.Context) error {
|
|
return srv.Shutdown(ctx)
|
|
},
|
|
})
|
|
|
|
return srv, nil
|
|
})
|
|
}
|
|
|
|
// CoreModule returns an FX option that provides all core services.
|
|
// This includes configuration, logging, database, error bus, health checks, metrics, tracing, and HTTP server.
|
|
func CoreModule() fx.Option {
|
|
return fx.Options(
|
|
ProvideConfig(),
|
|
ProvideLogger(),
|
|
ProvideDatabase(),
|
|
ProvideErrorBus(),
|
|
ProvideHealthRegistry(),
|
|
ProvideMetrics(),
|
|
ProvideTracer(),
|
|
ProvideHTTPServer(),
|
|
)
|
|
}
|
|
|
|
// maskDSN masks sensitive information in DSN for logging.
|
|
func maskDSN(dsn string) string {
|
|
// Simple masking: replace password with ***
|
|
// Format: postgres://user:password@host:port/db
|
|
if len(dsn) < 20 {
|
|
return "***"
|
|
}
|
|
// Find @ symbol and replace password part
|
|
if idx := indexOf(dsn, '@'); idx > 0 {
|
|
if colonIdx := indexOf(dsn[:idx], ':'); colonIdx > 0 {
|
|
return dsn[:colonIdx+1] + "***" + dsn[idx:]
|
|
}
|
|
}
|
|
return "***"
|
|
}
|
|
|
|
// indexOf finds the index of a character in a string.
|
|
func indexOf(s string, c byte) int {
|
|
for i := 0; i < len(s); i++ {
|
|
if s[i] == c {
|
|
return i
|
|
}
|
|
}
|
|
return -1
|
|
}
|
|
|
|
// RegisterLifecycleHooks registers lifecycle hooks for logging.
|
|
func RegisterLifecycleHooks(lc fx.Lifecycle, l logger.Logger) {
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(_ context.Context) error {
|
|
l.Info("Application starting",
|
|
logger.String("component", "bootstrap"),
|
|
)
|
|
return nil
|
|
},
|
|
OnStop: func(_ context.Context) error {
|
|
l.Info("Application shutting down",
|
|
logger.String("component", "bootstrap"),
|
|
)
|
|
return nil
|
|
},
|
|
})
|
|
}
|