// Package errorbus provides a channel-based error bus implementation. package errorbus import ( "context" "runtime" "sync" "git.dcentral.systems/toolz/goplt/pkg/errorbus" "git.dcentral.systems/toolz/goplt/pkg/logger" ) // ChannelBus implements a channel-based error bus. type ChannelBus struct { errors chan errorWithContext logger logger.Logger done chan struct{} wg sync.WaitGroup once sync.Once closeOnce sync.Once } type errorWithContext struct { err error ctx context.Context stack []byte } // NewChannelBus creates a new channel-based error bus. func NewChannelBus(log logger.Logger, bufferSize int) *ChannelBus { if bufferSize <= 0 { bufferSize = 100 } bus := &ChannelBus{ errors: make(chan errorWithContext, bufferSize), logger: log, done: make(chan struct{}), } // Start background consumer bus.wg.Add(1) go bus.consume() return bus } // Publish publishes an error to the error bus. func (b *ChannelBus) Publish(ctx context.Context, err error) { if err == nil { return } // Capture stack trace stack := make([]byte, 4096) n := runtime.Stack(stack, false) stack = stack[:n] select { case b.errors <- errorWithContext{ err: err, ctx: ctx, stack: stack, }: // Successfully queued default: // Channel is full, log directly to avoid blocking b.logger.Error("Error bus channel full, logging directly", logger.String("error", err.Error()), ) } } // consume consumes errors from the channel and logs them. func (b *ChannelBus) consume() { defer b.wg.Done() for { select { case errCtx := <-b.errors: b.handleError(errCtx) case <-b.done: return } } } // handleError handles a single error by logging it with context. func (b *ChannelBus) handleError(errCtx errorWithContext) { fields := []logger.Field{ logger.String("error", errCtx.err.Error()), } // Extract request ID from context if requestID := extractRequestID(errCtx.ctx); requestID != "" { fields = append(fields, logger.String("request_id", requestID)) } // Extract user ID from context if userID := extractUserID(errCtx.ctx); userID != "" { fields = append(fields, logger.String("user_id", userID)) } // Add stack trace for debugging if len(errCtx.stack) > 0 { fields = append(fields, logger.String("stack", string(errCtx.stack))) } b.logger.Error("Error captured by error bus", fields...) // TODO: In Epic 6, add Sentry integration here // if b.sentryClient != nil { // b.sentryClient.CaptureException(errCtx.err, ...) // } } // extractRequestID extracts request ID from context. func extractRequestID(ctx context.Context) string { if ctx == nil { return "" } // Try common context key patterns if val := ctx.Value("request_id"); val != nil { if str, ok := val.(string); ok { return str } } if val := ctx.Value("RequestID"); val != nil { if str, ok := val.(string); ok { return str } } return "" } // extractUserID extracts user ID from context. func extractUserID(ctx context.Context) string { if ctx == nil { return "" } // Try common context key patterns if val := ctx.Value("user_id"); val != nil { if str, ok := val.(string); ok { return str } } if val := ctx.Value("UserID"); val != nil { if str, ok := val.(string); ok { return str } } return "" } // Close closes the error bus and waits for all errors to be processed. func (b *ChannelBus) Close() error { b.once.Do(func() { close(b.done) }) b.wg.Wait() b.closeOnce.Do(func() { close(b.errors) }) return nil } // Ensure ChannelBus implements ErrorPublisher var _ errorbus.ErrorPublisher = (*ChannelBus)(nil)