feat(epic1): implement core infrastructure (stories 1.1-1.5)
Implemented Epic 1 core kernel and infrastructure stories:

Story 1.1: Enhanced DI Container
- Added providers for database, health, metrics, and error bus
- Extended CoreModule to include all core services

Story 1.2: Database Layer with Ent ORM
- Created Ent schema for User, Role, Permission, AuditLog entities
- Implemented many-to-many relationships (User-Role, Role-Permission)
- Created database client wrapper with connection pooling
- Added database provider to DI container with migration support

Story 1.3: Health Monitoring and Metrics System
- Implemented health check registry and interface
- Added database health checker
- Created Prometheus metrics system with HTTP instrumentation
- Added health and metrics providers to DI container

Story 1.4: Error Handling and Error Bus
- Implemented channel-based error bus
- Created ErrorPublisher interface
- Added error bus provider with lifecycle management

Story 1.5: HTTP Server Foundation
- Created HTTP server with Gin framework
- Implemented comprehensive middleware stack:
  - Request ID generation
  - Structured logging
  - Panic recovery with error bus integration
  - Prometheus metrics collection
  - CORS support
- Registered core routes: /healthz, /ready, /metrics
- Integrated with FX lifecycle for graceful shutdown

All components are integrated via DI container and ready for use.
This commit is contained in:
165
internal/errorbus/channel_bus.go
Normal file
165
internal/errorbus/channel_bus.go
Normal file
@@ -0,0 +1,165 @@
|
||||
package errorbus
|
||||
|
||||
import (
|
||||
"context"
|
||||
"runtime"
|
||||
"sync"
|
||||
|
||||
"git.dcentral.systems/toolz/goplt/pkg/errorbus"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/logger"
|
||||
)
|
||||
|
||||
// ChannelBus implements a channel-based error bus.
//
// Errors published via Publish are queued on a buffered channel and
// handled asynchronously by a single consumer goroutine started in
// NewChannelBus.
type ChannelBus struct {
	errors chan errorWithContext // buffered queue of published errors
	logger logger.Logger // sink used to log consumed errors
	done chan struct{} // closed during Close to stop the consumer
	wg sync.WaitGroup // tracks the consumer goroutine
	once sync.Once // guards close(done) so repeated Close calls are safe
}
|
||||
|
||||
// errorWithContext bundles a published error with the context it was
// published under and the stack trace captured at publish time.
type errorWithContext struct {
	err error // the published error; never nil (Publish filters nil)
	ctx context.Context // caller's context; may carry request/user IDs
	stack []byte // stack of the publishing goroutine, captured in Publish
}
|
||||
|
||||
// NewChannelBus creates a new channel-based error bus.
|
||||
func NewChannelBus(log logger.Logger, bufferSize int) *ChannelBus {
|
||||
if bufferSize <= 0 {
|
||||
bufferSize = 100
|
||||
}
|
||||
|
||||
bus := &ChannelBus{
|
||||
errors: make(chan errorWithContext, bufferSize),
|
||||
logger: log,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Start background consumer
|
||||
bus.wg.Add(1)
|
||||
go bus.consume()
|
||||
|
||||
return bus
|
||||
}
|
||||
|
||||
// Publish publishes an error to the error bus.
|
||||
func (b *ChannelBus) Publish(ctx context.Context, err error) {
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Capture stack trace
|
||||
stack := make([]byte, 4096)
|
||||
n := runtime.Stack(stack, false)
|
||||
stack = stack[:n]
|
||||
|
||||
select {
|
||||
case b.errors <- errorWithContext{
|
||||
err: err,
|
||||
ctx: ctx,
|
||||
stack: stack,
|
||||
}:
|
||||
// Successfully queued
|
||||
default:
|
||||
// Channel is full, log directly to avoid blocking
|
||||
b.logger.Error("Error bus channel full, logging directly",
|
||||
logger.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// consume consumes errors from the channel and logs them.
|
||||
func (b *ChannelBus) consume() {
|
||||
defer b.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case errCtx := <-b.errors:
|
||||
b.handleError(errCtx)
|
||||
case <-b.done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleError handles a single error by logging it with context.
|
||||
func (b *ChannelBus) handleError(errCtx errorWithContext) {
|
||||
fields := []logger.Field{
|
||||
logger.String("error", errCtx.err.Error()),
|
||||
}
|
||||
|
||||
// Extract request ID from context
|
||||
if requestID := extractRequestID(errCtx.ctx); requestID != "" {
|
||||
fields = append(fields, logger.String("request_id", requestID))
|
||||
}
|
||||
|
||||
// Extract user ID from context
|
||||
if userID := extractUserID(errCtx.ctx); userID != "" {
|
||||
fields = append(fields, logger.String("user_id", userID))
|
||||
}
|
||||
|
||||
// Add stack trace for debugging
|
||||
if len(errCtx.stack) > 0 {
|
||||
fields = append(fields, logger.String("stack", string(errCtx.stack)))
|
||||
}
|
||||
|
||||
b.logger.Error("Error captured by error bus", fields...)
|
||||
|
||||
// TODO: In Epic 6, add Sentry integration here
|
||||
// if b.sentryClient != nil {
|
||||
// b.sentryClient.CaptureException(errCtx.err, ...)
|
||||
// }
|
||||
}
|
||||
|
||||
// extractRequestID extracts request ID from context.
|
||||
func extractRequestID(ctx context.Context) string {
|
||||
if ctx == nil {
|
||||
return ""
|
||||
}
|
||||
// Try common context key patterns
|
||||
if val := ctx.Value("request_id"); val != nil {
|
||||
if str, ok := val.(string); ok {
|
||||
return str
|
||||
}
|
||||
}
|
||||
if val := ctx.Value("RequestID"); val != nil {
|
||||
if str, ok := val.(string); ok {
|
||||
return str
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// extractUserID extracts user ID from context.
|
||||
func extractUserID(ctx context.Context) string {
|
||||
if ctx == nil {
|
||||
return ""
|
||||
}
|
||||
// Try common context key patterns
|
||||
if val := ctx.Value("user_id"); val != nil {
|
||||
if str, ok := val.(string); ok {
|
||||
return str
|
||||
}
|
||||
}
|
||||
if val := ctx.Value("UserID"); val != nil {
|
||||
if str, ok := val.(string); ok {
|
||||
return str
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// Close closes the error bus and waits for all errors to be processed.
|
||||
func (b *ChannelBus) Close() error {
|
||||
b.once.Do(func() {
|
||||
close(b.done)
|
||||
})
|
||||
b.wg.Wait()
|
||||
close(b.errors)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Compile-time assertion that *ChannelBus satisfies the
// errorbus.ErrorPublisher interface.
var _ errorbus.ErrorPublisher = (*ChannelBus)(nil)
|
||||
|
||||
Reference in New Issue
Block a user