Files
goplt/cmd/audit-service/audit_service_fx.go

344 lines
8.9 KiB
Go

// Package main provides FX providers for Audit Service.
// This file creates the service inline to avoid importing internal packages.
package main
import (
"context"
"fmt"
"net"
"time"
auditv1 "git.dcentral.systems/toolz/goplt/api/proto/generated/audit/v1"
"git.dcentral.systems/toolz/goplt/internal/ent"
"git.dcentral.systems/toolz/goplt/internal/ent/auditlog"
"git.dcentral.systems/toolz/goplt/internal/infra/database"
"git.dcentral.systems/toolz/goplt/pkg/config"
"git.dcentral.systems/toolz/goplt/pkg/logger"
"go.uber.org/fx"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
)
// auditLogEntry represents an audit log entry.
type auditLogEntry struct {
UserID string
Action string
Resource string
ResourceID string
IPAddress string
UserAgent string
Metadata map[string]string
Timestamp int64
}
// auditLogFilters contains filters for querying audit logs.
type auditLogFilters struct {
UserID *string
Action *string
Resource *string
ResourceID *string
StartTime *int64
EndTime *int64
Limit int
Offset int
}
// auditService provides audit logging functionality.
type auditService struct {
client *database.Client
logger logger.Logger
}
// record records an audit log entry.
func (s *auditService) record(ctx context.Context, entry *auditLogEntry) error {
// Convert metadata map to JSON
metadataJSON := make(map[string]interface{})
for k, v := range entry.Metadata {
metadataJSON[k] = v
}
// Create audit log entry
timestamp := time.Unix(entry.Timestamp, 0)
if entry.Timestamp == 0 {
timestamp = time.Now()
}
create := s.client.AuditLog.Create().
SetID(fmt.Sprintf("%d-%d", time.Now().Unix(), time.Now().UnixNano()%1000000)).
SetUserID(entry.UserID).
SetAction(entry.Action).
SetMetadata(metadataJSON).
SetTimestamp(timestamp)
if entry.Resource != "" {
create = create.SetResource(entry.Resource)
}
if entry.ResourceID != "" {
create = create.SetResourceID(entry.ResourceID)
}
if entry.IPAddress != "" {
create = create.SetIPAddress(entry.IPAddress)
}
if entry.UserAgent != "" {
create = create.SetUserAgent(entry.UserAgent)
}
_, err := create.Save(ctx)
if err != nil {
s.logger.Error("Failed to record audit log",
zap.Error(err),
zap.String("user_id", entry.UserID),
zap.String("action", entry.Action),
)
return fmt.Errorf("failed to record audit log: %w", err)
}
return nil
}
// query queries audit logs based on filters.
func (s *auditService) query(ctx context.Context, filters *auditLogFilters) ([]*auditLogEntry, error) {
query := s.client.AuditLog.Query()
// Apply filters
if filters.UserID != nil {
query = query.Where(auditlog.UserID(*filters.UserID))
}
if filters.Action != nil {
query = query.Where(auditlog.Action(*filters.Action))
}
if filters.Resource != nil {
query = query.Where(auditlog.Resource(*filters.Resource))
}
if filters.ResourceID != nil {
query = query.Where(auditlog.ResourceID(*filters.ResourceID))
}
if filters.StartTime != nil {
query = query.Where(auditlog.TimestampGTE(time.Unix(*filters.StartTime, 0)))
}
if filters.EndTime != nil {
query = query.Where(auditlog.TimestampLTE(time.Unix(*filters.EndTime, 0)))
}
// Apply pagination
if filters.Limit > 0 {
query = query.Limit(filters.Limit)
}
if filters.Offset > 0 {
query = query.Offset(filters.Offset)
}
// Order by timestamp descending
query = query.Order(ent.Desc(auditlog.FieldTimestamp))
// Execute query
auditLogs, err := query.All(ctx)
if err != nil {
s.logger.Error("Failed to query audit logs",
zap.Error(err),
)
return nil, fmt.Errorf("failed to query audit logs: %w", err)
}
// Convert to service entries
entries := make([]*auditLogEntry, 0, len(auditLogs))
for _, log := range auditLogs {
// Convert metadata from map[string]interface{} to map[string]string
metadata := make(map[string]string)
if log.Metadata != nil {
for k, v := range log.Metadata {
if str, ok := v.(string); ok {
metadata[k] = str
} else {
metadata[k] = fmt.Sprintf("%v", v)
}
}
}
entry := &auditLogEntry{
UserID: log.UserID,
Action: log.Action,
Resource: log.Resource,
ResourceID: log.ResourceID,
IPAddress: log.IPAddress,
UserAgent: log.UserAgent,
Metadata: metadata,
Timestamp: log.Timestamp.Unix(),
}
entries = append(entries, entry)
}
return entries, nil
}
// auditServerImpl implements the AuditService gRPC server.
type auditServerImpl struct {
auditv1.UnimplementedAuditServiceServer
service *auditService
logger *zap.Logger
}
// Record records an audit log entry.
func (s *auditServerImpl) Record(ctx context.Context, req *auditv1.RecordRequest) (*auditv1.RecordResponse, error) {
if req.Entry == nil {
return nil, status.Error(codes.InvalidArgument, "entry is required")
}
entry := req.Entry
// Convert proto entry to service entry
serviceEntry := &auditLogEntry{
UserID: entry.UserId,
Action: entry.Action,
Resource: entry.Resource,
ResourceID: entry.ResourceId,
IPAddress: entry.IpAddress,
UserAgent: entry.UserAgent,
Metadata: entry.Metadata,
Timestamp: entry.Timestamp,
}
// Record the audit log
if err := s.service.record(ctx, serviceEntry); err != nil {
s.logger.Error("Failed to record audit log",
zap.Error(err),
zap.String("user_id", entry.UserId),
zap.String("action", entry.Action),
)
return nil, status.Errorf(codes.Internal, "failed to record audit log: %v", err)
}
return &auditv1.RecordResponse{
Success: true,
}, nil
}
// Query queries audit logs based on filters.
func (s *auditServerImpl) Query(ctx context.Context, req *auditv1.QueryRequest) (*auditv1.QueryResponse, error) {
// Convert proto filters to service filters
filters := &auditLogFilters{
Limit: int(req.Limit),
Offset: int(req.Offset),
}
if req.UserId != nil {
userID := *req.UserId
filters.UserID = &userID
}
if req.Action != nil {
action := *req.Action
filters.Action = &action
}
if req.Resource != nil {
resource := *req.Resource
filters.Resource = &resource
}
if req.ResourceId != nil {
resourceID := *req.ResourceId
filters.ResourceID = &resourceID
}
if req.StartTime != nil {
startTime := *req.StartTime
filters.StartTime = &startTime
}
if req.EndTime != nil {
endTime := *req.EndTime
filters.EndTime = &endTime
}
// Query audit logs
entries, err := s.service.query(ctx, filters)
if err != nil {
s.logger.Error("Failed to query audit logs",
zap.Error(err),
)
return nil, status.Errorf(codes.Internal, "failed to query audit logs: %v", err)
}
// Convert service entries to proto entries
protoEntries := make([]*auditv1.AuditLogEntry, 0, len(entries))
for _, entry := range entries {
protoEntries = append(protoEntries, &auditv1.AuditLogEntry{
UserId: entry.UserID,
Action: entry.Action,
Resource: entry.Resource,
ResourceId: entry.ResourceID,
IpAddress: entry.IPAddress,
UserAgent: entry.UserAgent,
Metadata: entry.Metadata,
Timestamp: entry.Timestamp,
})
}
return &auditv1.QueryResponse{
Entries: protoEntries,
Total: int32(len(protoEntries)),
}, nil
}
// provideAuditService creates the audit service and gRPC server.
func provideAuditService() fx.Option {
return fx.Options(
// Audit service
fx.Provide(func(client *database.Client, log logger.Logger) (*auditService, error) {
return &auditService{
client: client,
logger: log,
}, nil
}),
// gRPC server implementation
fx.Provide(func(auditService *auditService, log logger.Logger) (*auditServerImpl, error) {
zapLogger, _ := zap.NewProduction()
return &auditServerImpl{
service: auditService,
logger: zapLogger,
}, nil
}),
// gRPC server wrapper
fx.Provide(func(
serverImpl *auditServerImpl,
cfg config.ConfigProvider,
log logger.Logger,
) (*grpcServerWrapper, error) {
port := cfg.GetInt("services.audit.port")
if port == 0 {
port = 8084
}
addr := fmt.Sprintf("0.0.0.0:%d", port)
listener, err := net.Listen("tcp", addr)
if err != nil {
return nil, fmt.Errorf("failed to listen on %s: %w", addr, err)
}
grpcServer := grpc.NewServer()
auditv1.RegisterAuditServiceServer(grpcServer, serverImpl)
// Register health service
healthServer := health.NewServer()
grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
// Set serving status for the default service (empty string) - this is what Consul checks
healthServer.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
// Also set for the specific service name
healthServer.SetServingStatus("audit.v1.AuditService", grpc_health_v1.HealthCheckResponse_SERVING)
// Register reflection for grpcurl
reflection.Register(grpcServer)
return &grpcServerWrapper{
server: grpcServer,
listener: listener,
port: port,
logger: log,
}, nil
}),
)
}