// Package main provides FX providers for the Audit Service.
// This file creates the service inline to avoid importing internal packages.
package main
|
|
|
|
import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"net"
	"time"

	auditv1 "git.dcentral.systems/toolz/goplt/api/proto/generated/audit/v1"
	"git.dcentral.systems/toolz/goplt/internal/ent"
	"git.dcentral.systems/toolz/goplt/internal/ent/auditlog"
	"git.dcentral.systems/toolz/goplt/internal/infra/database"
	"git.dcentral.systems/toolz/goplt/pkg/config"
	"git.dcentral.systems/toolz/goplt/pkg/logger"
	"go.uber.org/fx"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/health"
	"google.golang.org/grpc/health/grpc_health_v1"
	"google.golang.org/grpc/reflection"
	"google.golang.org/grpc/status"
)
|
|
|
|
// auditLogEntry is the service-layer form of a single audit event, used both
// when recording an event and when returning query results.
type auditLogEntry struct {
	// UserID and Action are always written to the stored row.
	UserID, Action string
	// Resource and ResourceID are only written when non-empty.
	Resource, ResourceID string
	// IPAddress and UserAgent are only written when non-empty.
	IPAddress, UserAgent string
	// Metadata holds free-form string key/value pairs attached to the event.
	Metadata map[string]string
	// Timestamp is the event time in Unix seconds; when it is zero, record
	// stores the current time instead.
	Timestamp int64
}
|
|
|
|
// auditLogFilters describes the optional predicates and pagination applied by
// query. A nil pointer field means that predicate is not applied.
type auditLogFilters struct {
	// UserID and Action match the corresponding columns exactly.
	UserID, Action *string
	// Resource and ResourceID match the corresponding columns exactly.
	Resource, ResourceID *string
	// StartTime and EndTime are inclusive bounds on the entry timestamp,
	// expressed in Unix seconds.
	StartTime, EndTime *int64
	// Limit and Offset are only applied when greater than zero.
	Limit, Offset int
}
|
|
|
|
// auditService provides audit logging functionality.
|
|
type auditService struct {
|
|
client *database.Client
|
|
logger logger.Logger
|
|
}
|
|
|
|
// record records an audit log entry.
|
|
func (s *auditService) record(ctx context.Context, entry *auditLogEntry) error {
|
|
// Convert metadata map to JSON
|
|
metadataJSON := make(map[string]interface{})
|
|
for k, v := range entry.Metadata {
|
|
metadataJSON[k] = v
|
|
}
|
|
|
|
// Create audit log entry
|
|
timestamp := time.Unix(entry.Timestamp, 0)
|
|
if entry.Timestamp == 0 {
|
|
timestamp = time.Now()
|
|
}
|
|
|
|
create := s.client.AuditLog.Create().
|
|
SetID(fmt.Sprintf("%d-%d", time.Now().Unix(), time.Now().UnixNano()%1000000)).
|
|
SetUserID(entry.UserID).
|
|
SetAction(entry.Action).
|
|
SetMetadata(metadataJSON).
|
|
SetTimestamp(timestamp)
|
|
|
|
if entry.Resource != "" {
|
|
create = create.SetResource(entry.Resource)
|
|
}
|
|
if entry.ResourceID != "" {
|
|
create = create.SetResourceID(entry.ResourceID)
|
|
}
|
|
if entry.IPAddress != "" {
|
|
create = create.SetIPAddress(entry.IPAddress)
|
|
}
|
|
if entry.UserAgent != "" {
|
|
create = create.SetUserAgent(entry.UserAgent)
|
|
}
|
|
|
|
_, err := create.Save(ctx)
|
|
if err != nil {
|
|
s.logger.Error("Failed to record audit log",
|
|
zap.Error(err),
|
|
zap.String("user_id", entry.UserID),
|
|
zap.String("action", entry.Action),
|
|
)
|
|
return fmt.Errorf("failed to record audit log: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// query queries audit logs based on filters.
|
|
func (s *auditService) query(ctx context.Context, filters *auditLogFilters) ([]*auditLogEntry, error) {
|
|
query := s.client.AuditLog.Query()
|
|
|
|
// Apply filters
|
|
if filters.UserID != nil {
|
|
query = query.Where(auditlog.UserID(*filters.UserID))
|
|
}
|
|
if filters.Action != nil {
|
|
query = query.Where(auditlog.Action(*filters.Action))
|
|
}
|
|
if filters.Resource != nil {
|
|
query = query.Where(auditlog.Resource(*filters.Resource))
|
|
}
|
|
if filters.ResourceID != nil {
|
|
query = query.Where(auditlog.ResourceID(*filters.ResourceID))
|
|
}
|
|
if filters.StartTime != nil {
|
|
query = query.Where(auditlog.TimestampGTE(time.Unix(*filters.StartTime, 0)))
|
|
}
|
|
if filters.EndTime != nil {
|
|
query = query.Where(auditlog.TimestampLTE(time.Unix(*filters.EndTime, 0)))
|
|
}
|
|
|
|
// Apply pagination
|
|
if filters.Limit > 0 {
|
|
query = query.Limit(filters.Limit)
|
|
}
|
|
if filters.Offset > 0 {
|
|
query = query.Offset(filters.Offset)
|
|
}
|
|
|
|
// Order by timestamp descending
|
|
query = query.Order(ent.Desc(auditlog.FieldTimestamp))
|
|
|
|
// Execute query
|
|
auditLogs, err := query.All(ctx)
|
|
if err != nil {
|
|
s.logger.Error("Failed to query audit logs",
|
|
zap.Error(err),
|
|
)
|
|
return nil, fmt.Errorf("failed to query audit logs: %w", err)
|
|
}
|
|
|
|
// Convert to service entries
|
|
entries := make([]*auditLogEntry, 0, len(auditLogs))
|
|
for _, log := range auditLogs {
|
|
// Convert metadata from map[string]interface{} to map[string]string
|
|
metadata := make(map[string]string)
|
|
if log.Metadata != nil {
|
|
for k, v := range log.Metadata {
|
|
if str, ok := v.(string); ok {
|
|
metadata[k] = str
|
|
} else {
|
|
metadata[k] = fmt.Sprintf("%v", v)
|
|
}
|
|
}
|
|
}
|
|
|
|
entry := &auditLogEntry{
|
|
UserID: log.UserID,
|
|
Action: log.Action,
|
|
Resource: log.Resource,
|
|
ResourceID: log.ResourceID,
|
|
IPAddress: log.IPAddress,
|
|
UserAgent: log.UserAgent,
|
|
Metadata: metadata,
|
|
Timestamp: log.Timestamp.Unix(),
|
|
}
|
|
entries = append(entries, entry)
|
|
}
|
|
|
|
return entries, nil
|
|
}
|
|
|
|
// auditServerImpl implements the AuditService gRPC server.
|
|
type auditServerImpl struct {
|
|
auditv1.UnimplementedAuditServiceServer
|
|
service *auditService
|
|
logger *zap.Logger
|
|
}
|
|
|
|
// Record records an audit log entry.
|
|
func (s *auditServerImpl) Record(ctx context.Context, req *auditv1.RecordRequest) (*auditv1.RecordResponse, error) {
|
|
if req.Entry == nil {
|
|
return nil, status.Error(codes.InvalidArgument, "entry is required")
|
|
}
|
|
|
|
entry := req.Entry
|
|
|
|
// Convert proto entry to service entry
|
|
serviceEntry := &auditLogEntry{
|
|
UserID: entry.UserId,
|
|
Action: entry.Action,
|
|
Resource: entry.Resource,
|
|
ResourceID: entry.ResourceId,
|
|
IPAddress: entry.IpAddress,
|
|
UserAgent: entry.UserAgent,
|
|
Metadata: entry.Metadata,
|
|
Timestamp: entry.Timestamp,
|
|
}
|
|
|
|
// Record the audit log
|
|
if err := s.service.record(ctx, serviceEntry); err != nil {
|
|
s.logger.Error("Failed to record audit log",
|
|
zap.Error(err),
|
|
zap.String("user_id", entry.UserId),
|
|
zap.String("action", entry.Action),
|
|
)
|
|
return nil, status.Errorf(codes.Internal, "failed to record audit log: %v", err)
|
|
}
|
|
|
|
return &auditv1.RecordResponse{
|
|
Success: true,
|
|
}, nil
|
|
}
|
|
|
|
// Query queries audit logs based on filters.
|
|
func (s *auditServerImpl) Query(ctx context.Context, req *auditv1.QueryRequest) (*auditv1.QueryResponse, error) {
|
|
// Convert proto filters to service filters
|
|
filters := &auditLogFilters{
|
|
Limit: int(req.Limit),
|
|
Offset: int(req.Offset),
|
|
}
|
|
|
|
if req.UserId != nil {
|
|
userID := *req.UserId
|
|
filters.UserID = &userID
|
|
}
|
|
if req.Action != nil {
|
|
action := *req.Action
|
|
filters.Action = &action
|
|
}
|
|
if req.Resource != nil {
|
|
resource := *req.Resource
|
|
filters.Resource = &resource
|
|
}
|
|
if req.ResourceId != nil {
|
|
resourceID := *req.ResourceId
|
|
filters.ResourceID = &resourceID
|
|
}
|
|
if req.StartTime != nil {
|
|
startTime := *req.StartTime
|
|
filters.StartTime = &startTime
|
|
}
|
|
if req.EndTime != nil {
|
|
endTime := *req.EndTime
|
|
filters.EndTime = &endTime
|
|
}
|
|
|
|
// Query audit logs
|
|
entries, err := s.service.query(ctx, filters)
|
|
if err != nil {
|
|
s.logger.Error("Failed to query audit logs",
|
|
zap.Error(err),
|
|
)
|
|
return nil, status.Errorf(codes.Internal, "failed to query audit logs: %v", err)
|
|
}
|
|
|
|
// Convert service entries to proto entries
|
|
protoEntries := make([]*auditv1.AuditLogEntry, 0, len(entries))
|
|
for _, entry := range entries {
|
|
protoEntries = append(protoEntries, &auditv1.AuditLogEntry{
|
|
UserId: entry.UserID,
|
|
Action: entry.Action,
|
|
Resource: entry.Resource,
|
|
ResourceId: entry.ResourceID,
|
|
IpAddress: entry.IPAddress,
|
|
UserAgent: entry.UserAgent,
|
|
Metadata: entry.Metadata,
|
|
Timestamp: entry.Timestamp,
|
|
})
|
|
}
|
|
|
|
return &auditv1.QueryResponse{
|
|
Entries: protoEntries,
|
|
Total: int32(len(protoEntries)),
|
|
}, nil
|
|
}
|
|
|
|
// provideAuditService creates the audit service and gRPC server.
|
|
func provideAuditService() fx.Option {
|
|
return fx.Options(
|
|
// Audit service
|
|
fx.Provide(func(client *database.Client, log logger.Logger) (*auditService, error) {
|
|
return &auditService{
|
|
client: client,
|
|
logger: log,
|
|
}, nil
|
|
}),
|
|
|
|
// gRPC server implementation
|
|
fx.Provide(func(auditService *auditService, log logger.Logger) (*auditServerImpl, error) {
|
|
zapLogger, _ := zap.NewProduction()
|
|
return &auditServerImpl{
|
|
service: auditService,
|
|
logger: zapLogger,
|
|
}, nil
|
|
}),
|
|
|
|
// gRPC server wrapper
|
|
fx.Provide(func(
|
|
serverImpl *auditServerImpl,
|
|
cfg config.ConfigProvider,
|
|
log logger.Logger,
|
|
) (*grpcServerWrapper, error) {
|
|
port := cfg.GetInt("services.audit.port")
|
|
if port == 0 {
|
|
port = 8084
|
|
}
|
|
|
|
addr := fmt.Sprintf("0.0.0.0:%d", port)
|
|
listener, err := net.Listen("tcp", addr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to listen on %s: %w", addr, err)
|
|
}
|
|
|
|
grpcServer := grpc.NewServer()
|
|
auditv1.RegisterAuditServiceServer(grpcServer, serverImpl)
|
|
|
|
// Register health service
|
|
healthServer := health.NewServer()
|
|
grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
|
|
// Set serving status for the default service (empty string) - this is what Consul checks
|
|
healthServer.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
|
|
// Also set for the specific service name
|
|
healthServer.SetServingStatus("audit.v1.AuditService", grpc_health_v1.HealthCheckResponse_SERVING)
|
|
|
|
// Register reflection for grpcurl
|
|
reflection.Register(grpcServer)
|
|
|
|
return &grpcServerWrapper{
|
|
server: grpcServer,
|
|
listener: listener,
|
|
port: port,
|
|
logger: log,
|
|
}, nil
|
|
}),
|
|
)
|
|
}
|