Files
goplt/internal/errorbus/channel_bus.go
0x1d 5fdbb729bd
Some checks failed
CI / Test (pull_request) Failing after 17s
CI / Lint (pull_request) Failing after 18s
CI / Build (pull_request) Successful in 12s
CI / Format Check (pull_request) Successful in 2s
test: add comprehensive tests for all Epic 1 stories
Story 1.2: Database Layer
- Test database client creation, connection, ping, and close
- Test connection pooling configuration
- Tests skip if database is not available (short mode)

Story 1.3: Health Monitoring and Metrics
- Test health registry registration and checking
- Test database health checker
- Test liveness and readiness checks
- Test metrics creation, middleware, and handler
- Test Prometheus metrics endpoint

Story 1.4: Error Handling and Error Bus
- Test channel-based error bus creation
- Test error publishing with context
- Test nil error handling
- Test channel full scenario
- Test graceful shutdown
- Fix Close() method to handle multiple calls safely

Story 1.5: HTTP Server and Middleware
- Test server creation with all middleware
- Test request ID middleware
- Test logging middleware
- Test panic recovery middleware
- Test CORS middleware
- Test timeout middleware
- Test health and metrics endpoints
- Test server shutdown

Story 1.6: OpenTelemetry Tracing
- Test tracer initialization (enabled/disabled)
- Test development and production modes
- Test OTLP exporter configuration
- Test graceful shutdown
- Test no-op tracer provider

All tests follow Go testing best practices:
- Table-driven tests where appropriate
- Parallel execution
- Proper mocking of interfaces
- Skip tests requiring external dependencies in short mode
2025-11-05 21:05:36 +01:00

169 lines
3.6 KiB
Go

// Package errorbus provides a channel-based error bus implementation.
package errorbus
import (
"context"
"runtime"
"sync"
"git.dcentral.systems/toolz/goplt/pkg/errorbus"
"git.dcentral.systems/toolz/goplt/pkg/logger"
)
// ChannelBus implements a channel-based error bus.
//
// Published errors are queued on a buffered channel and logged by a single
// background goroutine that NewChannelBus starts. Construct a ChannelBus with
// NewChannelBus: the zero value has nil channels and no consumer goroutine.
type ChannelBus struct {
// errors is the buffered queue between Publish and the consumer goroutine.
errors chan errorWithContext
// logger is where the bus writes every error it handles.
logger logger.Logger
// done is closed by Close to tell the consumer goroutine to stop.
done chan struct{}
// wg tracks the consumer goroutine so Close can wait for it to exit.
wg sync.WaitGroup
// once guards closing done, so repeated Close calls do not panic.
once sync.Once
// closeOnce guards closing errors, for the same reason.
closeOnce sync.Once
}
// errorWithContext is the queue item handed from Publish to the consumer
// goroutine: the error together with the data captured at publish time.
type errorWithContext struct {
// err is the published error; Publish never enqueues a nil error.
err error
// ctx is the caller's context, read later for request and user IDs.
ctx context.Context
// stack is the publishing goroutine's stack trace, at most 4096 bytes.
stack []byte
}
// NewChannelBus builds a ChannelBus that logs through log and starts its
// background consumer goroutine. bufferSize is the capacity of the error
// queue; zero or negative values fall back to a capacity of 100.
func NewChannelBus(log logger.Logger, bufferSize int) *ChannelBus {
	capacity := bufferSize
	if capacity < 1 {
		capacity = 100
	}

	b := new(ChannelBus)
	b.logger = log
	b.done = make(chan struct{})
	b.errors = make(chan errorWithContext, capacity)

	// Register the consumer with the WaitGroup before launching it, so the
	// counter is already non-zero by the time anyone can call Close.
	b.wg.Add(1)
	go b.consume()

	return b
}
// Publish queues err for asynchronous logging by the error bus.
//
// A nil err is ignored. Publish never blocks: when the queue is full, or the
// bus has already been closed, the error is logged synchronously on the
// caller's goroutine instead of being queued. Calling Publish after, or
// concurrently with, Close does not panic.
//
// ctx is stored alongside the error so that request and user IDs can be read
// from it when the error is eventually logged.
func (b *ChannelBus) Publish(ctx context.Context, err error) {
	if err == nil {
		return
	}

	// logDirect is the synchronous fallback used whenever err cannot be queued.
	logDirect := func(msg string) {
		b.logger.Error(msg,
			logger.String("error", err.Error()),
		)
	}

	// After Close nothing consumes the queue any more and the queue channel
	// itself gets closed, so do not even attempt to send.
	select {
	case <-b.done:
		logDirect("Error bus closed, logging directly")
		return
	default:
	}

	// Capture the stack trace of the publishing goroutine only.
	stack := make([]byte, 4096)
	stack = stack[:runtime.Stack(stack, false)]
	item := errorWithContext{err: err, ctx: ctx, stack: stack}

	// Close may still close the queue between the check above and the send
	// below. A send on a closed channel panics, and that is the only panic
	// this closure can raise, so recover from it and treat the bus as closed.
	var queued, closed bool
	func() {
		defer func() {
			closed = recover() != nil
		}()
		select {
		case b.errors <- item:
			queued = true
		default:
			// Queue is full; fall through to direct logging.
		}
	}()

	switch {
	case queued:
		// Successfully queued; the consumer goroutine will log it.
	case closed:
		logDirect("Error bus closed, logging directly")
	default:
		// Channel is full, log directly to avoid blocking.
		logDirect("Error bus channel full, logging directly")
	}
}
// consume is the background consumer loop started by NewChannelBus. It passes
// each queued error to handleError until done is closed, then drains whatever
// is still buffered before returning, so errors that were queued before
// shutdown are logged rather than dropped. It marks the WaitGroup as done on
// exit.
func (b *ChannelBus) consume() {
	defer b.wg.Done()

	for {
		select {
		case errCtx := <-b.errors:
			b.handleError(errCtx)
		case <-b.done:
			// select picks at random when both cases are ready, so the
			// queue may still hold errors here. Flush them without
			// blocking, then stop.
			for {
				select {
				case errCtx := <-b.errors:
					b.handleError(errCtx)
				default:
					return
				}
			}
		}
	}
}
// handleError writes one queued error to the logger. The entry always carries
// the error message; request_id, user_id and stack are attached, in that
// order, only when they are non-empty.
func (b *ChannelBus) handleError(errCtx errorWithContext) {
	fields := make([]logger.Field, 0, 4)
	fields = append(fields, logger.String("error", errCtx.err.Error()))

	// addIfSet appends a string field unless its value is empty.
	addIfSet := func(key, value string) {
		if value == "" {
			return
		}
		fields = append(fields, logger.String(key, value))
	}

	// Identifiers carried by the publisher's context, if any.
	addIfSet("request_id", extractRequestID(errCtx.ctx))
	addIfSet("user_id", extractUserID(errCtx.ctx))
	// Stack trace captured at publish time, for debugging.
	addIfSet("stack", string(errCtx.stack))

	b.logger.Error("Error captured by error bus", fields...)

	// TODO: In Epic 6, add Sentry integration here
	// if b.sentryClient != nil {
	// b.sentryClient.CaptureException(errCtx.err, ...)
	// }
}
// extractRequestID returns the request ID stored in ctx, or "" when ctx is nil
// or holds none. It looks under the string keys "request_id" and "RequestID",
// in that order, and returns the first value that is a string.
func extractRequestID(ctx context.Context) string {
	if ctx == nil {
		return ""
	}

	// Try common context key patterns; non-string values are skipped.
	for _, key := range []string{"request_id", "RequestID"} {
		if id, ok := ctx.Value(key).(string); ok {
			return id
		}
	}
	return ""
}
// extractUserID returns the user ID stored in ctx, or "" when ctx is nil or
// holds none. It looks under the string keys "user_id" and "UserID", in that
// order, and returns the first value that is a string.
func extractUserID(ctx context.Context) string {
	if ctx == nil {
		return ""
	}

	// Try common context key patterns; non-string values are skipped.
	for _, key := range []string{"user_id", "UserID"} {
		if id, ok := ctx.Value(key).(string); ok {
			return id
		}
	}
	return ""
}
// Close shuts the bus down: it signals the consumer goroutine to stop, waits
// for that goroutine to exit, and then closes the error queue. Each channel
// close is guarded by its own sync.Once, so calling Close more than once is
// safe. The returned error is always nil.
func (b *ChannelBus) Close() error {
	stopConsumer := func() { close(b.done) }
	closeQueue := func() { close(b.errors) }

	b.once.Do(stopConsumer)
	// The queue is closed only after the consumer has stopped reading it.
	b.wg.Wait()
	b.closeOnce.Do(closeQueue)

	return nil
}
// Compile-time check that *ChannelBus satisfies errorbus.ErrorPublisher: the
// build fails on this line if a required method is missing or its signature
// no longer matches the interface.
var _ errorbus.ErrorPublisher = (*ChannelBus)(nil)