- Added better error detection for HTTP server startup - Added connectivity check to verify server is actually listening - Increased wait time to 500ms for better error detection - Added warning log if server connectivity check fails (may still be starting) - Improved logging messages for server startup This should help diagnose why the HTTP server isn't starting and provide better visibility into the startup process.
356 lines
9.6 KiB
Go
356 lines
9.6 KiB
Go
package di
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"time"
|
|
|
|
configimpl "git.dcentral.systems/toolz/goplt/internal/config"
|
|
errorbusimpl "git.dcentral.systems/toolz/goplt/internal/errorbus"
|
|
"git.dcentral.systems/toolz/goplt/internal/health"
|
|
"git.dcentral.systems/toolz/goplt/internal/infra/database"
|
|
loggerimpl "git.dcentral.systems/toolz/goplt/internal/logger"
|
|
"git.dcentral.systems/toolz/goplt/internal/metrics"
|
|
"git.dcentral.systems/toolz/goplt/internal/observability"
|
|
"git.dcentral.systems/toolz/goplt/internal/server"
|
|
"git.dcentral.systems/toolz/goplt/pkg/config"
|
|
"git.dcentral.systems/toolz/goplt/pkg/errorbus"
|
|
"git.dcentral.systems/toolz/goplt/pkg/logger"
|
|
"go.opentelemetry.io/otel/trace"
|
|
"go.uber.org/fx"
|
|
)
|
|
|
|
// ProvideConfig creates an FX option that provides ConfigProvider.
|
|
func ProvideConfig() fx.Option {
|
|
return fx.Provide(func() (config.ConfigProvider, error) {
|
|
// Determine environment from environment variable or default to "development"
|
|
env := os.Getenv("ENVIRONMENT")
|
|
if env == "" {
|
|
env = "development"
|
|
}
|
|
|
|
cfg, err := configimpl.LoadConfig(env)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to load config: %w", err)
|
|
}
|
|
|
|
return cfg, nil
|
|
})
|
|
}
|
|
|
|
// ProvideLogger creates an FX option that provides Logger.
|
|
func ProvideLogger() fx.Option {
|
|
return fx.Provide(func(cfg config.ConfigProvider) (logger.Logger, error) {
|
|
level := cfg.GetString("logging.level")
|
|
if level == "" {
|
|
level = "info"
|
|
}
|
|
|
|
format := cfg.GetString("logging.format")
|
|
if format == "" {
|
|
format = "json"
|
|
}
|
|
|
|
log, err := loggerimpl.NewZapLogger(level, format)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create logger: %w", err)
|
|
}
|
|
|
|
// Set as global logger
|
|
logger.SetGlobalLogger(log)
|
|
|
|
return log, nil
|
|
})
|
|
}
|
|
|
|
// ProvideDatabase creates an FX option that provides the database client.
|
|
func ProvideDatabase() fx.Option {
|
|
return fx.Provide(func(cfg config.ConfigProvider, log logger.Logger, lc fx.Lifecycle) (*database.Client, error) {
|
|
dsn := cfg.GetString("database.dsn")
|
|
if dsn == "" {
|
|
return nil, fmt.Errorf("database DSN is not configured")
|
|
}
|
|
|
|
maxConns := cfg.GetInt("database.max_connections")
|
|
if maxConns == 0 {
|
|
maxConns = 25
|
|
}
|
|
|
|
maxIdleConns := cfg.GetInt("database.max_idle_connections")
|
|
if maxIdleConns == 0 {
|
|
maxIdleConns = 5
|
|
}
|
|
|
|
connMaxLifetime := cfg.GetDuration("database.conn_max_lifetime")
|
|
if connMaxLifetime == 0 {
|
|
connMaxLifetime = 5 * time.Minute
|
|
}
|
|
|
|
connMaxIdleTime := cfg.GetDuration("database.conn_max_idle_time")
|
|
if connMaxIdleTime == 0 {
|
|
connMaxIdleTime = 10 * time.Minute
|
|
}
|
|
|
|
dbClient, err := database.NewClient(database.Config{
|
|
DSN: dsn,
|
|
MaxConnections: maxConns,
|
|
MaxIdleConns: maxIdleConns,
|
|
ConnMaxLifetime: connMaxLifetime,
|
|
ConnMaxIdleTime: connMaxIdleTime,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create database client: %w", err)
|
|
}
|
|
|
|
// Register lifecycle hooks
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(ctx context.Context) error {
|
|
// Run migrations on startup
|
|
log.Info("Running database migrations...")
|
|
if err := dbClient.Migrate(ctx); err != nil {
|
|
return fmt.Errorf("failed to run database migrations: %w", err)
|
|
}
|
|
log.Info("Database migrations completed successfully")
|
|
return nil
|
|
},
|
|
OnStop: func(ctx context.Context) error {
|
|
return dbClient.Close()
|
|
},
|
|
})
|
|
|
|
return dbClient, nil
|
|
})
|
|
}
|
|
|
|
// ProvideErrorBus creates an FX option that provides the error bus.
|
|
func ProvideErrorBus() fx.Option {
|
|
return fx.Provide(func(log logger.Logger, lc fx.Lifecycle) (errorbus.ErrorPublisher, error) {
|
|
bufferSize := 100 // Can be made configurable
|
|
bus := errorbusimpl.NewChannelBus(log, bufferSize)
|
|
|
|
// Register lifecycle hook to close the bus on shutdown
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(ctx context.Context) error {
|
|
return bus.Close()
|
|
},
|
|
})
|
|
|
|
return bus, nil
|
|
})
|
|
}
|
|
|
|
// ProvideHealthRegistry creates an FX option that provides the health check registry.
|
|
func ProvideHealthRegistry() fx.Option {
|
|
return fx.Provide(func(dbClient *database.Client) (*health.Registry, error) {
|
|
registry := health.NewRegistry()
|
|
|
|
// Register database health checker
|
|
registry.Register("database", health.NewDatabaseChecker(dbClient))
|
|
|
|
return registry, nil
|
|
})
|
|
}
|
|
|
|
// ProvideMetrics creates an FX option that provides the Prometheus metrics registry.
|
|
func ProvideMetrics() fx.Option {
|
|
return fx.Provide(func() *metrics.Metrics {
|
|
return metrics.NewMetrics()
|
|
})
|
|
}
|
|
|
|
// ProvideTracer creates an FX option that provides the OpenTelemetry tracer.
|
|
func ProvideTracer() fx.Option {
|
|
return fx.Provide(func(cfg config.ConfigProvider, lc fx.Lifecycle) (trace.TracerProvider, error) {
|
|
enabled := cfg.GetBool("tracing.enabled")
|
|
if !enabled {
|
|
// Return no-op tracer
|
|
return trace.NewNoopTracerProvider(), nil
|
|
}
|
|
|
|
serviceName := cfg.GetString("tracing.service_name")
|
|
if serviceName == "" {
|
|
serviceName = "platform"
|
|
}
|
|
|
|
serviceVersion := cfg.GetString("tracing.service_version")
|
|
if serviceVersion == "" {
|
|
serviceVersion = "1.0.0"
|
|
}
|
|
|
|
env := cfg.GetString("environment")
|
|
if env == "" {
|
|
env = "development"
|
|
}
|
|
|
|
otlpEndpoint := cfg.GetString("tracing.otlp_endpoint")
|
|
|
|
tp, err := observability.InitTracer(context.Background(), observability.Config{
|
|
Enabled: enabled,
|
|
ServiceName: serviceName,
|
|
ServiceVersion: serviceVersion,
|
|
Environment: env,
|
|
OTLPEndpoint: otlpEndpoint,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize tracer: %w", err)
|
|
}
|
|
|
|
// Register lifecycle hook to shutdown tracer
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(ctx context.Context) error {
|
|
return observability.ShutdownTracer(ctx, tp)
|
|
},
|
|
})
|
|
|
|
return tp, nil
|
|
})
|
|
}
|
|
|
|
// ProvideHTTPServer creates an FX option that provides the HTTP server.
|
|
func ProvideHTTPServer() fx.Option {
|
|
return fx.Provide(func(
|
|
cfg config.ConfigProvider,
|
|
log logger.Logger,
|
|
healthRegistry *health.Registry,
|
|
metricsRegistry *metrics.Metrics,
|
|
errorBus errorbus.ErrorPublisher,
|
|
tracer trace.TracerProvider,
|
|
lc fx.Lifecycle,
|
|
) (*server.Server, error) {
|
|
srv, err := server.NewServer(cfg, log, healthRegistry, metricsRegistry, errorBus, tracer)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create HTTP server: %w", err)
|
|
}
|
|
|
|
// Register lifecycle hooks
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(ctx context.Context) error {
|
|
// Get server address from config
|
|
port := cfg.GetInt("server.port")
|
|
if port == 0 {
|
|
port = 8080
|
|
}
|
|
host := cfg.GetString("server.host")
|
|
if host == "" {
|
|
host = "0.0.0.0"
|
|
}
|
|
addr := fmt.Sprintf("%s:%d", host, port)
|
|
|
|
log.Info("HTTP server starting",
|
|
logger.String("addr", addr),
|
|
)
|
|
|
|
// Start server in a goroutine
|
|
// ListenAndServe blocks, so we need to start it async
|
|
// If there's an immediate error (like port in use), it will return quickly
|
|
errChan := make(chan error, 1)
|
|
go func() {
|
|
if err := srv.Start(); err != nil && err != http.ErrServerClosed {
|
|
log.Error("HTTP server failed",
|
|
logger.String("error", err.Error()),
|
|
)
|
|
select {
|
|
case errChan <- err:
|
|
default:
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Wait a short time to detect immediate binding errors
|
|
// If ListenAndServe fails immediately (e.g., port in use), it will return quickly
|
|
select {
|
|
case err := <-errChan:
|
|
return fmt.Errorf("HTTP server failed to start: %w", err)
|
|
case <-time.After(500 * time.Millisecond):
|
|
// If no error after 500ms, verify server is actually listening
|
|
// by attempting a connection
|
|
client := &http.Client{Timeout: 1 * time.Second}
|
|
checkURL := fmt.Sprintf("http://localhost:%d/healthz", port)
|
|
resp, err := client.Get(checkURL)
|
|
if err != nil {
|
|
// Server might still be starting, but log the attempt
|
|
log.Warn("Could not verify HTTP server is listening (may still be starting)",
|
|
logger.String("url", checkURL),
|
|
logger.String("error", err.Error()),
|
|
)
|
|
// Continue anyway - server might still be starting
|
|
} else {
|
|
resp.Body.Close()
|
|
}
|
|
|
|
log.Info("HTTP server started successfully",
|
|
logger.String("addr", addr),
|
|
)
|
|
return nil
|
|
}
|
|
},
|
|
OnStop: func(ctx context.Context) error {
|
|
return srv.Shutdown(ctx)
|
|
},
|
|
})
|
|
|
|
return srv, nil
|
|
})
|
|
}
|
|
|
|
// CoreModule returns an FX option that provides all core services.
|
|
// This includes configuration, logging, database, error bus, health checks, metrics, tracing, and HTTP server.
|
|
func CoreModule() fx.Option {
|
|
return fx.Options(
|
|
ProvideConfig(),
|
|
ProvideLogger(),
|
|
ProvideDatabase(),
|
|
ProvideErrorBus(),
|
|
ProvideHealthRegistry(),
|
|
ProvideMetrics(),
|
|
ProvideTracer(),
|
|
ProvideHTTPServer(),
|
|
)
|
|
}
|
|
|
|
// maskDSN masks sensitive information in DSN for logging.
|
|
func maskDSN(dsn string) string {
|
|
// Simple masking: replace password with ***
|
|
// Format: postgres://user:password@host:port/db
|
|
if len(dsn) < 20 {
|
|
return "***"
|
|
}
|
|
// Find @ symbol and replace password part
|
|
if idx := indexOf(dsn, '@'); idx > 0 {
|
|
if colonIdx := indexOf(dsn[:idx], ':'); colonIdx > 0 {
|
|
return dsn[:colonIdx+1] + "***" + dsn[idx:]
|
|
}
|
|
}
|
|
return "***"
|
|
}
|
|
|
|
// indexOf finds the index of a character in a string.
|
|
func indexOf(s string, c byte) int {
|
|
for i := 0; i < len(s); i++ {
|
|
if s[i] == c {
|
|
return i
|
|
}
|
|
}
|
|
return -1
|
|
}
|
|
|
|
// RegisterLifecycleHooks registers lifecycle hooks for logging.
|
|
func RegisterLifecycleHooks(lc fx.Lifecycle, l logger.Logger) {
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(_ context.Context) error {
|
|
l.Info("Application starting",
|
|
logger.String("component", "bootstrap"),
|
|
)
|
|
return nil
|
|
},
|
|
OnStop: func(_ context.Context) error {
|
|
l.Info("Application shutting down",
|
|
logger.String("component", "bootstrap"),
|
|
)
|
|
return nil
|
|
},
|
|
})
|
|
}
|