fix(services): Fix service startup failures
- Remove duplicate CoreModule() calls from all service main.go files - NewContainer() already includes CoreModule() automatically - This was causing duplicate ConfigProvider provider errors - Update all _fx.go files to use *database.Client instead of *ent.Client - database.Client embeds *ent.Client, so it can be used directly - This fixes type mismatches between providers and consumers - Keep ent import for constants like ent.Desc - All services now build and should start successfully
This commit is contained in:
BIN
auth-service
BIN
auth-service
Binary file not shown.
BIN
authz-service
BIN
authz-service
Binary file not shown.
340
cmd/audit-service/audit_service_fx.go
Normal file
340
cmd/audit-service/audit_service_fx.go
Normal file
@@ -0,0 +1,340 @@
|
||||
// Package main provides FX providers for Audit Service.
|
||||
// This file creates the service inline to avoid importing internal packages.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
auditv1 "git.dcentral.systems/toolz/goplt/api/proto/generated/audit/v1"
|
||||
"git.dcentral.systems/toolz/goplt/internal/ent"
|
||||
"git.dcentral.systems/toolz/goplt/internal/ent/auditlog"
|
||||
"git.dcentral.systems/toolz/goplt/internal/infra/database"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/config"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/logger"
|
||||
"go.uber.org/fx"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/health"
|
||||
"google.golang.org/grpc/health/grpc_health_v1"
|
||||
"google.golang.org/grpc/reflection"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// auditLogEntry represents an audit log entry.
|
||||
type auditLogEntry struct {
|
||||
UserID string
|
||||
Action string
|
||||
Resource string
|
||||
ResourceID string
|
||||
IPAddress string
|
||||
UserAgent string
|
||||
Metadata map[string]string
|
||||
Timestamp int64
|
||||
}
|
||||
|
||||
// auditLogFilters contains filters for querying audit logs.
|
||||
type auditLogFilters struct {
|
||||
UserID *string
|
||||
Action *string
|
||||
Resource *string
|
||||
ResourceID *string
|
||||
StartTime *int64
|
||||
EndTime *int64
|
||||
Limit int
|
||||
Offset int
|
||||
}
|
||||
|
||||
// auditService provides audit logging functionality.
|
||||
type auditService struct {
|
||||
client *database.Client
|
||||
logger logger.Logger
|
||||
}
|
||||
|
||||
// record records an audit log entry.
|
||||
func (s *auditService) record(ctx context.Context, entry *auditLogEntry) error {
|
||||
// Convert metadata map to JSON
|
||||
metadataJSON := make(map[string]interface{})
|
||||
for k, v := range entry.Metadata {
|
||||
metadataJSON[k] = v
|
||||
}
|
||||
|
||||
// Create audit log entry
|
||||
timestamp := time.Unix(entry.Timestamp, 0)
|
||||
if entry.Timestamp == 0 {
|
||||
timestamp = time.Now()
|
||||
}
|
||||
|
||||
create := s.client.AuditLog.Create().
|
||||
SetID(fmt.Sprintf("%d-%d", time.Now().Unix(), time.Now().UnixNano()%1000000)).
|
||||
SetUserID(entry.UserID).
|
||||
SetAction(entry.Action).
|
||||
SetMetadata(metadataJSON).
|
||||
SetTimestamp(timestamp)
|
||||
|
||||
if entry.Resource != "" {
|
||||
create = create.SetResource(entry.Resource)
|
||||
}
|
||||
if entry.ResourceID != "" {
|
||||
create = create.SetResourceID(entry.ResourceID)
|
||||
}
|
||||
if entry.IPAddress != "" {
|
||||
create = create.SetIPAddress(entry.IPAddress)
|
||||
}
|
||||
if entry.UserAgent != "" {
|
||||
create = create.SetUserAgent(entry.UserAgent)
|
||||
}
|
||||
|
||||
_, err := create.Save(ctx)
|
||||
if err != nil {
|
||||
s.logger.Error("Failed to record audit log",
|
||||
zap.Error(err),
|
||||
zap.String("user_id", entry.UserID),
|
||||
zap.String("action", entry.Action),
|
||||
)
|
||||
return fmt.Errorf("failed to record audit log: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// query queries audit logs based on filters.
|
||||
func (s *auditService) query(ctx context.Context, filters *auditLogFilters) ([]*auditLogEntry, error) {
|
||||
query := s.client.AuditLog.Query()
|
||||
|
||||
// Apply filters
|
||||
if filters.UserID != nil {
|
||||
query = query.Where(auditlog.UserID(*filters.UserID))
|
||||
}
|
||||
if filters.Action != nil {
|
||||
query = query.Where(auditlog.Action(*filters.Action))
|
||||
}
|
||||
if filters.Resource != nil {
|
||||
query = query.Where(auditlog.Resource(*filters.Resource))
|
||||
}
|
||||
if filters.ResourceID != nil {
|
||||
query = query.Where(auditlog.ResourceID(*filters.ResourceID))
|
||||
}
|
||||
if filters.StartTime != nil {
|
||||
query = query.Where(auditlog.TimestampGTE(time.Unix(*filters.StartTime, 0)))
|
||||
}
|
||||
if filters.EndTime != nil {
|
||||
query = query.Where(auditlog.TimestampLTE(time.Unix(*filters.EndTime, 0)))
|
||||
}
|
||||
|
||||
// Apply pagination
|
||||
if filters.Limit > 0 {
|
||||
query = query.Limit(filters.Limit)
|
||||
}
|
||||
if filters.Offset > 0 {
|
||||
query = query.Offset(filters.Offset)
|
||||
}
|
||||
|
||||
// Order by timestamp descending
|
||||
query = query.Order(ent.Desc(auditlog.FieldTimestamp))
|
||||
|
||||
// Execute query
|
||||
auditLogs, err := query.All(ctx)
|
||||
if err != nil {
|
||||
s.logger.Error("Failed to query audit logs",
|
||||
zap.Error(err),
|
||||
)
|
||||
return nil, fmt.Errorf("failed to query audit logs: %w", err)
|
||||
}
|
||||
|
||||
// Convert to service entries
|
||||
entries := make([]*auditLogEntry, 0, len(auditLogs))
|
||||
for _, log := range auditLogs {
|
||||
// Convert metadata from map[string]interface{} to map[string]string
|
||||
metadata := make(map[string]string)
|
||||
if log.Metadata != nil {
|
||||
for k, v := range log.Metadata {
|
||||
if str, ok := v.(string); ok {
|
||||
metadata[k] = str
|
||||
} else {
|
||||
metadata[k] = fmt.Sprintf("%v", v)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
entry := &auditLogEntry{
|
||||
UserID: log.UserID,
|
||||
Action: log.Action,
|
||||
Resource: log.Resource,
|
||||
ResourceID: log.ResourceID,
|
||||
IPAddress: log.IPAddress,
|
||||
UserAgent: log.UserAgent,
|
||||
Metadata: metadata,
|
||||
Timestamp: log.Timestamp.Unix(),
|
||||
}
|
||||
entries = append(entries, entry)
|
||||
}
|
||||
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
// auditServerImpl implements the AuditService gRPC server.
|
||||
type auditServerImpl struct {
|
||||
auditv1.UnimplementedAuditServiceServer
|
||||
service *auditService
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// Record records an audit log entry.
|
||||
func (s *auditServerImpl) Record(ctx context.Context, req *auditv1.RecordRequest) (*auditv1.RecordResponse, error) {
|
||||
if req.Entry == nil {
|
||||
return nil, status.Error(codes.InvalidArgument, "entry is required")
|
||||
}
|
||||
|
||||
entry := req.Entry
|
||||
|
||||
// Convert proto entry to service entry
|
||||
serviceEntry := &auditLogEntry{
|
||||
UserID: entry.UserId,
|
||||
Action: entry.Action,
|
||||
Resource: entry.Resource,
|
||||
ResourceID: entry.ResourceId,
|
||||
IPAddress: entry.IpAddress,
|
||||
UserAgent: entry.UserAgent,
|
||||
Metadata: entry.Metadata,
|
||||
Timestamp: entry.Timestamp,
|
||||
}
|
||||
|
||||
// Record the audit log
|
||||
if err := s.service.record(ctx, serviceEntry); err != nil {
|
||||
s.logger.Error("Failed to record audit log",
|
||||
zap.Error(err),
|
||||
zap.String("user_id", entry.UserId),
|
||||
zap.String("action", entry.Action),
|
||||
)
|
||||
return nil, status.Errorf(codes.Internal, "failed to record audit log: %v", err)
|
||||
}
|
||||
|
||||
return &auditv1.RecordResponse{
|
||||
Success: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Query queries audit logs based on filters.
|
||||
func (s *auditServerImpl) Query(ctx context.Context, req *auditv1.QueryRequest) (*auditv1.QueryResponse, error) {
|
||||
// Convert proto filters to service filters
|
||||
filters := &auditLogFilters{
|
||||
Limit: int(req.Limit),
|
||||
Offset: int(req.Offset),
|
||||
}
|
||||
|
||||
if req.UserId != nil {
|
||||
userID := *req.UserId
|
||||
filters.UserID = &userID
|
||||
}
|
||||
if req.Action != nil {
|
||||
action := *req.Action
|
||||
filters.Action = &action
|
||||
}
|
||||
if req.Resource != nil {
|
||||
resource := *req.Resource
|
||||
filters.Resource = &resource
|
||||
}
|
||||
if req.ResourceId != nil {
|
||||
resourceID := *req.ResourceId
|
||||
filters.ResourceID = &resourceID
|
||||
}
|
||||
if req.StartTime != nil {
|
||||
startTime := *req.StartTime
|
||||
filters.StartTime = &startTime
|
||||
}
|
||||
if req.EndTime != nil {
|
||||
endTime := *req.EndTime
|
||||
filters.EndTime = &endTime
|
||||
}
|
||||
|
||||
// Query audit logs
|
||||
entries, err := s.service.query(ctx, filters)
|
||||
if err != nil {
|
||||
s.logger.Error("Failed to query audit logs",
|
||||
zap.Error(err),
|
||||
)
|
||||
return nil, status.Errorf(codes.Internal, "failed to query audit logs: %v", err)
|
||||
}
|
||||
|
||||
// Convert service entries to proto entries
|
||||
protoEntries := make([]*auditv1.AuditLogEntry, 0, len(entries))
|
||||
for _, entry := range entries {
|
||||
protoEntries = append(protoEntries, &auditv1.AuditLogEntry{
|
||||
UserId: entry.UserID,
|
||||
Action: entry.Action,
|
||||
Resource: entry.Resource,
|
||||
ResourceId: entry.ResourceID,
|
||||
IpAddress: entry.IPAddress,
|
||||
UserAgent: entry.UserAgent,
|
||||
Metadata: entry.Metadata,
|
||||
Timestamp: entry.Timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
return &auditv1.QueryResponse{
|
||||
Entries: protoEntries,
|
||||
Total: int32(len(protoEntries)),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// provideAuditService creates the audit service and gRPC server.
|
||||
func provideAuditService() fx.Option {
|
||||
return fx.Options(
|
||||
// Audit service
|
||||
fx.Provide(func(client *database.Client, log logger.Logger) (*auditService, error) {
|
||||
return &auditService{
|
||||
client: client,
|
||||
logger: log,
|
||||
}, nil
|
||||
}),
|
||||
|
||||
// gRPC server implementation
|
||||
fx.Provide(func(auditService *auditService, log logger.Logger) (*auditServerImpl, error) {
|
||||
zapLogger, _ := zap.NewProduction()
|
||||
return &auditServerImpl{
|
||||
service: auditService,
|
||||
logger: zapLogger,
|
||||
}, nil
|
||||
}),
|
||||
|
||||
// gRPC server wrapper
|
||||
fx.Provide(func(
|
||||
serverImpl *auditServerImpl,
|
||||
cfg config.ConfigProvider,
|
||||
log logger.Logger,
|
||||
) (*grpcServerWrapper, error) {
|
||||
port := cfg.GetInt("services.audit.port")
|
||||
if port == 0 {
|
||||
port = 8084
|
||||
}
|
||||
|
||||
addr := fmt.Sprintf("0.0.0.0:%d", port)
|
||||
listener, err := net.Listen("tcp", addr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to listen on %s: %w", addr, err)
|
||||
}
|
||||
|
||||
grpcServer := grpc.NewServer()
|
||||
auditv1.RegisterAuditServiceServer(grpcServer, serverImpl)
|
||||
|
||||
// Register health service
|
||||
healthServer := health.NewServer()
|
||||
grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
|
||||
healthServer.SetServingStatus("audit.v1.AuditService", grpc_health_v1.HealthCheckResponse_SERVING)
|
||||
|
||||
// Register reflection for grpcurl
|
||||
reflection.Register(grpcServer)
|
||||
|
||||
return &grpcServerWrapper{
|
||||
server: grpcServer,
|
||||
listener: listener,
|
||||
port: port,
|
||||
logger: log,
|
||||
}, nil
|
||||
}),
|
||||
)
|
||||
}
|
||||
226
cmd/audit-service/main.go
Normal file
226
cmd/audit-service/main.go
Normal file
@@ -0,0 +1,226 @@
|
||||
// Package main provides the entry point for the Audit Service.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"git.dcentral.systems/toolz/goplt/internal/di"
|
||||
healthpkg "git.dcentral.systems/toolz/goplt/internal/health"
|
||||
"git.dcentral.systems/toolz/goplt/internal/infra/database"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/config"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/logger"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/registry"
|
||||
"go.uber.org/fx"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// grpcServerWrapper wraps the gRPC server for lifecycle management.
|
||||
type grpcServerWrapper struct {
|
||||
server *grpc.Server
|
||||
listener net.Listener
|
||||
port int
|
||||
logger logger.Logger
|
||||
}
|
||||
|
||||
func (s *grpcServerWrapper) Start() error {
|
||||
s.logger.Info("Starting Audit Service gRPC server",
|
||||
zap.Int("port", s.port),
|
||||
zap.String("addr", s.listener.Addr().String()),
|
||||
)
|
||||
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
if err := s.server.Serve(s.listener); err != nil {
|
||||
errChan <- err
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-errChan:
|
||||
return fmt.Errorf("gRPC server failed to start: %w", err)
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
s.logger.Info("Audit Service gRPC server started successfully",
|
||||
zap.Int("port", s.port),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *grpcServerWrapper) Stop(ctx context.Context) error {
|
||||
s.logger.Info("Stopping Audit Service gRPC server")
|
||||
|
||||
stopped := make(chan struct{})
|
||||
go func() {
|
||||
s.server.GracefulStop()
|
||||
close(stopped)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-stopped:
|
||||
s.logger.Info("Audit Service gRPC server stopped gracefully")
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
s.logger.Warn("Audit Service gRPC server stop timeout, forcing stop")
|
||||
s.server.Stop()
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *grpcServerWrapper) Port() int {
|
||||
return s.port
|
||||
}
|
||||
|
||||
func main() {
|
||||
// Create DI container - services will be created via fx.Provide
|
||||
// Note: CoreModule() is automatically included by NewContainer()
|
||||
container := di.NewContainer(
|
||||
// Database for audit service (audit schema)
|
||||
fx.Provide(func(cfg config.ConfigProvider, log logger.Logger) (*database.Client, error) {
|
||||
dsn := cfg.GetString("database.dsn")
|
||||
if dsn == "" {
|
||||
return nil, fmt.Errorf("database.dsn is required")
|
||||
}
|
||||
client, err := database.NewClientWithSchema(dsn, "audit")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Run migrations
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if err := client.Migrate(ctx); err != nil {
|
||||
log.Warn("Failed to run migrations",
|
||||
zap.Error(err),
|
||||
)
|
||||
} else {
|
||||
log.Info("Database migrations completed for audit service")
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}),
|
||||
|
||||
// Health registry with database checker
|
||||
fx.Provide(func(db *database.Client, log logger.Logger) (*healthpkg.Registry, error) {
|
||||
registry := healthpkg.NewRegistry()
|
||||
registry.Register("database", healthpkg.NewDatabaseChecker(db))
|
||||
return registry, nil
|
||||
}),
|
||||
|
||||
// Provide audit service and gRPC server (defined in audit_service_fx.go)
|
||||
provideAuditService(),
|
||||
|
||||
// Lifecycle hooks
|
||||
fx.Invoke(registerLifecycle),
|
||||
)
|
||||
|
||||
// Create root context
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Handle signals
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
|
||||
|
||||
// Start the application
|
||||
if err := container.Start(ctx); err != nil {
|
||||
log := logger.GetGlobalLogger()
|
||||
if log != nil {
|
||||
log.Error("Failed to start Audit Service",
|
||||
logger.Error(err),
|
||||
)
|
||||
} else {
|
||||
fmt.Fprintf(os.Stderr, "Failed to start Audit Service: %v\n", err)
|
||||
}
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Wait for interrupt signal
|
||||
<-sigChan
|
||||
fmt.Println("\nShutting down Audit Service...")
|
||||
|
||||
// Create shutdown context with timeout
|
||||
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer shutdownCancel()
|
||||
|
||||
// Stop the application
|
||||
if err := container.Stop(shutdownCtx); err != nil {
|
||||
log := logger.GetGlobalLogger()
|
||||
if log != nil {
|
||||
log.Error("Error during Audit Service shutdown",
|
||||
logger.Error(err),
|
||||
)
|
||||
} else {
|
||||
fmt.Fprintf(os.Stderr, "Error during shutdown: %v\n", err)
|
||||
}
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
fmt.Println("Audit Service stopped successfully")
|
||||
}
|
||||
|
||||
// registerLifecycle registers lifecycle hooks for the service.
|
||||
func registerLifecycle(
|
||||
lc fx.Lifecycle,
|
||||
grpcServer *grpcServerWrapper,
|
||||
serviceRegistry registry.ServiceRegistry,
|
||||
cfg config.ConfigProvider,
|
||||
log logger.Logger,
|
||||
) {
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
// Start gRPC server
|
||||
if err := grpcServer.Start(); err != nil {
|
||||
return fmt.Errorf("failed to start gRPC server: %w", err)
|
||||
}
|
||||
|
||||
// Register with service registry
|
||||
serviceID := fmt.Sprintf("audit-service-%d", time.Now().Unix())
|
||||
host := cfg.GetString("services.audit.host")
|
||||
if host == "" {
|
||||
host = "localhost"
|
||||
}
|
||||
port := grpcServer.Port()
|
||||
|
||||
instance := ®istry.ServiceInstance{
|
||||
ID: serviceID,
|
||||
Name: "audit-service",
|
||||
Address: host,
|
||||
Port: port,
|
||||
Tags: []string{"grpc", "audit"},
|
||||
Metadata: map[string]string{
|
||||
"version": "1.0.0",
|
||||
"protocol": "grpc",
|
||||
},
|
||||
}
|
||||
|
||||
if err := serviceRegistry.Register(ctx, instance); err != nil {
|
||||
log.Warn("Failed to register with service registry",
|
||||
zap.Error(err),
|
||||
)
|
||||
} else {
|
||||
log.Info("Registered Audit Service with service registry",
|
||||
zap.String("service_id", serviceID),
|
||||
zap.String("name", instance.Name),
|
||||
zap.Int("port", port),
|
||||
)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
OnStop: func(ctx context.Context) error {
|
||||
// Stop gRPC server
|
||||
if err := grpcServer.Stop(ctx); err != nil {
|
||||
return fmt.Errorf("failed to stop gRPC server: %w", err)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
"time"
|
||||
|
||||
authv1 "git.dcentral.systems/toolz/goplt/api/proto/generated/auth/v1"
|
||||
"git.dcentral.systems/toolz/goplt/internal/ent"
|
||||
"git.dcentral.systems/toolz/goplt/internal/infra/database"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/config"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/logger"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/services"
|
||||
@@ -34,7 +34,7 @@ const (
|
||||
|
||||
// authService provides authentication functionality.
|
||||
type authService struct {
|
||||
client *ent.Client
|
||||
client *database.Client
|
||||
logger logger.Logger
|
||||
identityClient services.IdentityServiceClient
|
||||
jwtSecret []byte
|
||||
@@ -303,7 +303,7 @@ func provideAuthService() fx.Option {
|
||||
return fx.Options(
|
||||
// Auth service
|
||||
fx.Provide(func(
|
||||
client *ent.Client,
|
||||
client *database.Client,
|
||||
log logger.Logger,
|
||||
identityClient services.IdentityServiceClient,
|
||||
cfg config.ConfigProvider,
|
||||
|
||||
@@ -81,10 +81,8 @@ func (s *grpcServerWrapper) Port() int {
|
||||
|
||||
func main() {
|
||||
// Create DI container
|
||||
// Note: CoreModule() is automatically included by NewContainer()
|
||||
container := di.NewContainer(
|
||||
// Core kernel services
|
||||
di.CoreModule(),
|
||||
|
||||
// Database for auth service (auth schema)
|
||||
fx.Provide(func(cfg config.ConfigProvider, log logger.Logger) (*database.Client, error) {
|
||||
dsn := cfg.GetString("database.dsn")
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
authzv1 "git.dcentral.systems/toolz/goplt/api/proto/generated/authz/v1"
|
||||
"git.dcentral.systems/toolz/goplt/internal/ent"
|
||||
"git.dcentral.systems/toolz/goplt/internal/ent/userrole"
|
||||
"git.dcentral.systems/toolz/goplt/internal/infra/database"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/config"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/logger"
|
||||
"go.uber.org/fx"
|
||||
@@ -24,7 +25,7 @@ import (
|
||||
|
||||
// authzService provides authorization functionality.
|
||||
type authzService struct {
|
||||
client *ent.Client
|
||||
client *database.Client
|
||||
logger logger.Logger
|
||||
}
|
||||
|
||||
@@ -231,7 +232,7 @@ func (s *authzServerImpl) GetUserRoles(ctx context.Context, req *authzv1.GetUser
|
||||
func provideAuthzService() fx.Option {
|
||||
return fx.Options(
|
||||
// Authz service
|
||||
fx.Provide(func(client *ent.Client, log logger.Logger) (*authzService, error) {
|
||||
fx.Provide(func(client *database.Client, log logger.Logger) (*authzService, error) {
|
||||
return &authzService{
|
||||
client: client,
|
||||
logger: log,
|
||||
|
||||
@@ -79,10 +79,8 @@ func (s *grpcServerWrapper) Port() int {
|
||||
|
||||
func main() {
|
||||
// Create DI container
|
||||
// Note: CoreModule() is automatically included by NewContainer()
|
||||
container := di.NewContainer(
|
||||
// Core kernel services
|
||||
di.CoreModule(),
|
||||
|
||||
// Database for authz service (authz schema)
|
||||
fx.Provide(func(cfg config.ConfigProvider, log logger.Logger) (*database.Client, error) {
|
||||
dsn := cfg.GetString("database.dsn")
|
||||
|
||||
414
cmd/identity-service/identity_service_fx.go
Normal file
414
cmd/identity-service/identity_service_fx.go
Normal file
@@ -0,0 +1,414 @@
|
||||
// Package main provides FX providers for Identity Service.
|
||||
// This file creates the service inline to avoid importing internal packages.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"crypto/subtle"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
identityv1 "git.dcentral.systems/toolz/goplt/api/proto/generated/identity/v1"
|
||||
"git.dcentral.systems/toolz/goplt/internal/ent"
|
||||
"git.dcentral.systems/toolz/goplt/internal/ent/user"
|
||||
"git.dcentral.systems/toolz/goplt/internal/infra/database"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/config"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/logger"
|
||||
"go.uber.org/fx"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/crypto/argon2"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/health"
|
||||
"google.golang.org/grpc/health/grpc_health_v1"
|
||||
"google.golang.org/grpc/reflection"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// userService provides user management functionality.
|
||||
type userService struct {
|
||||
client *database.Client
|
||||
logger logger.Logger
|
||||
}
|
||||
|
||||
// generateToken generates a random token.
|
||||
func generateToken() (string, error) {
|
||||
b := make([]byte, 32)
|
||||
if _, err := rand.Read(b); err != nil {
|
||||
return "", fmt.Errorf("failed to generate token: %w", err)
|
||||
}
|
||||
return fmt.Sprintf("%x", b), nil
|
||||
}
|
||||
|
||||
// hashPassword hashes a password using argon2id.
|
||||
func hashPassword(password string) (string, error) {
|
||||
salt := make([]byte, 16)
|
||||
if _, err := rand.Read(salt); err != nil {
|
||||
return "", fmt.Errorf("failed to generate salt: %w", err)
|
||||
}
|
||||
hash := argon2.IDKey([]byte(password), salt, 3, 64*1024, 4, 32)
|
||||
b64Salt := base64.RawStdEncoding.EncodeToString(salt)
|
||||
b64Hash := base64.RawStdEncoding.EncodeToString(hash)
|
||||
return fmt.Sprintf("$argon2id$v=%d$m=%d,t=%d,p=%d$%s$%s",
|
||||
argon2.Version, 64*1024, 3, 4, b64Salt, b64Hash), nil
|
||||
}
|
||||
|
||||
// verifyPassword verifies a password against a hash.
|
||||
func verifyPassword(password, hash string) (bool, error) {
|
||||
// Simplified verification - in production use proper parsing
|
||||
parts := strings.Split(hash, "$")
|
||||
if len(parts) != 6 {
|
||||
return false, fmt.Errorf("invalid hash format")
|
||||
}
|
||||
salt, err := base64.RawStdEncoding.DecodeString(parts[4])
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
expectedHash, err := base64.RawStdEncoding.DecodeString(parts[5])
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
actualHash := argon2.IDKey([]byte(password), salt, 3, 64*1024, 4, uint32(len(expectedHash)))
|
||||
return subtle.ConstantTimeCompare(expectedHash, actualHash) == 1, nil
|
||||
}
|
||||
|
||||
// createUser creates a new user.
|
||||
func (s *userService) createUser(ctx context.Context, email, username, pwd, firstName, lastName string) (*ent.User, error) {
|
||||
exists, err := s.client.User.Query().Where(user.Email(email)).Exist(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to check email: %w", err)
|
||||
}
|
||||
if exists {
|
||||
return nil, fmt.Errorf("email already exists")
|
||||
}
|
||||
|
||||
passwordHash, err := hashPassword(pwd)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to hash password: %w", err)
|
||||
}
|
||||
|
||||
verificationToken, err := generateToken()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to generate token: %w", err)
|
||||
}
|
||||
|
||||
create := s.client.User.Create().
|
||||
SetID(fmt.Sprintf("%d", time.Now().UnixNano())).
|
||||
SetEmail(email).
|
||||
SetPasswordHash(passwordHash).
|
||||
SetVerified(false).
|
||||
SetEmailVerificationToken(verificationToken)
|
||||
|
||||
if username != "" {
|
||||
create = create.SetUsername(username)
|
||||
}
|
||||
if firstName != "" {
|
||||
create = create.SetFirstName(firstName)
|
||||
}
|
||||
if lastName != "" {
|
||||
create = create.SetLastName(lastName)
|
||||
}
|
||||
|
||||
u, err := create.Save(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create user: %w", err)
|
||||
}
|
||||
return u, nil
|
||||
}
|
||||
|
||||
// getUser retrieves a user by ID.
|
||||
func (s *userService) getUser(ctx context.Context, id string) (*ent.User, error) {
|
||||
u, err := s.client.User.Get(ctx, id)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get user: %w", err)
|
||||
}
|
||||
return u, nil
|
||||
}
|
||||
|
||||
// getUserByEmail retrieves a user by email.
|
||||
func (s *userService) getUserByEmail(ctx context.Context, email string) (*ent.User, error) {
|
||||
u, err := s.client.User.Query().Where(user.Email(email)).Only(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get user: %w", err)
|
||||
}
|
||||
return u, nil
|
||||
}
|
||||
|
||||
// updateUser updates a user.
|
||||
func (s *userService) updateUser(ctx context.Context, id string, email, username, firstName, lastName *string) (*ent.User, error) {
|
||||
update := s.client.User.UpdateOneID(id)
|
||||
if email != nil {
|
||||
exists, err := s.client.User.Query().
|
||||
Where(user.Email(*email), user.IDNEQ(id)).
|
||||
Exist(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to check email: %w", err)
|
||||
}
|
||||
if exists {
|
||||
return nil, fmt.Errorf("email already taken")
|
||||
}
|
||||
update = update.SetEmail(*email)
|
||||
}
|
||||
if username != nil {
|
||||
update = update.SetUsername(*username)
|
||||
}
|
||||
if firstName != nil {
|
||||
update = update.SetFirstName(*firstName)
|
||||
}
|
||||
if lastName != nil {
|
||||
update = update.SetLastName(*lastName)
|
||||
}
|
||||
return update.Save(ctx)
|
||||
}
|
||||
|
||||
// deleteUser deletes a user.
|
||||
func (s *userService) deleteUser(ctx context.Context, id string) error {
|
||||
return s.client.User.DeleteOneID(id).Exec(ctx)
|
||||
}
|
||||
|
||||
// verifyEmail verifies a user's email.
|
||||
func (s *userService) verifyEmail(ctx context.Context, token string) error {
|
||||
u, err := s.client.User.Query().Where(user.EmailVerificationToken(token)).Only(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid token")
|
||||
}
|
||||
_, err = s.client.User.UpdateOneID(u.ID).
|
||||
SetVerified(true).
|
||||
ClearEmailVerificationToken().
|
||||
Save(ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
// requestPasswordReset requests a password reset.
|
||||
func (s *userService) requestPasswordReset(ctx context.Context, email string) (string, error) {
|
||||
u, err := s.getUserByEmail(ctx, email)
|
||||
if err != nil {
|
||||
return "", nil // Don't reveal if user exists
|
||||
}
|
||||
token, err := generateToken()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
expiresAt := time.Now().Add(24 * time.Hour)
|
||||
_, err = s.client.User.UpdateOneID(u.ID).
|
||||
SetPasswordResetToken(token).
|
||||
SetPasswordResetExpiresAt(expiresAt).
|
||||
Save(ctx)
|
||||
return token, err
|
||||
}
|
||||
|
||||
// resetPassword resets a password.
|
||||
func (s *userService) resetPassword(ctx context.Context, token, newPassword string) error {
|
||||
u, err := s.client.User.Query().Where(user.PasswordResetToken(token)).Only(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid token")
|
||||
}
|
||||
if !u.PasswordResetExpiresAt.IsZero() && u.PasswordResetExpiresAt.Before(time.Now()) {
|
||||
return fmt.Errorf("token expired")
|
||||
}
|
||||
passwordHash, err := hashPassword(newPassword)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = s.client.User.UpdateOneID(u.ID).
|
||||
SetPasswordHash(passwordHash).
|
||||
ClearPasswordResetToken().
|
||||
ClearPasswordResetExpiresAt().
|
||||
Save(ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
// verifyPassword verifies a password.
|
||||
func (s *userService) verifyPassword(ctx context.Context, email, pwd string) (*ent.User, error) {
|
||||
u, err := s.getUserByEmail(ctx, email)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
valid, err := verifyPassword(pwd, u.PasswordHash)
|
||||
if err != nil || !valid {
|
||||
return nil, fmt.Errorf("invalid password")
|
||||
}
|
||||
return u, nil
|
||||
}
|
||||
|
||||
// identityServerImpl implements the IdentityService gRPC server.
|
||||
type identityServerImpl struct {
|
||||
identityv1.UnimplementedIdentityServiceServer
|
||||
service *userService
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// GetUser retrieves a user by ID.
|
||||
func (s *identityServerImpl) GetUser(ctx context.Context, req *identityv1.GetUserRequest) (*identityv1.GetUserResponse, error) {
|
||||
u, err := s.service.getUser(ctx, req.Id)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.NotFound, "user not found: %v", err)
|
||||
}
|
||||
return &identityv1.GetUserResponse{
|
||||
User: &identityv1.User{
|
||||
Id: u.ID,
|
||||
Email: u.Email,
|
||||
Username: u.Username,
|
||||
FirstName: u.FirstName,
|
||||
LastName: u.LastName,
|
||||
EmailVerified: u.Verified,
|
||||
CreatedAt: u.CreatedAt.Unix(),
|
||||
UpdatedAt: u.UpdatedAt.Unix(),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetUserByEmail retrieves a user by email.
|
||||
func (s *identityServerImpl) GetUserByEmail(ctx context.Context, req *identityv1.GetUserByEmailRequest) (*identityv1.GetUserByEmailResponse, error) {
|
||||
u, err := s.service.getUserByEmail(ctx, req.Email)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.NotFound, "user not found: %v", err)
|
||||
}
|
||||
return &identityv1.GetUserByEmailResponse{
|
||||
User: &identityv1.User{
|
||||
Id: u.ID,
|
||||
Email: u.Email,
|
||||
Username: u.Username,
|
||||
FirstName: u.FirstName,
|
||||
LastName: u.LastName,
|
||||
EmailVerified: u.Verified,
|
||||
CreatedAt: u.CreatedAt.Unix(),
|
||||
UpdatedAt: u.UpdatedAt.Unix(),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// CreateUser creates a new user.
|
||||
func (s *identityServerImpl) CreateUser(ctx context.Context, req *identityv1.CreateUserRequest) (*identityv1.CreateUserResponse, error) {
|
||||
u, err := s.service.createUser(ctx, req.Email, req.Username, req.Password, req.FirstName, req.LastName)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "failed to create user: %v", err)
|
||||
}
|
||||
return &identityv1.CreateUserResponse{
|
||||
User: &identityv1.User{
|
||||
Id: u.ID,
|
||||
Email: u.Email,
|
||||
Username: u.Username,
|
||||
FirstName: u.FirstName,
|
||||
LastName: u.LastName,
|
||||
EmailVerified: u.Verified,
|
||||
CreatedAt: u.CreatedAt.Unix(),
|
||||
UpdatedAt: u.UpdatedAt.Unix(),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// UpdateUser updates a user.
|
||||
func (s *identityServerImpl) UpdateUser(ctx context.Context, req *identityv1.UpdateUserRequest) (*identityv1.UpdateUserResponse, error) {
|
||||
u, err := s.service.updateUser(ctx, req.Id, req.Email, req.Username, req.FirstName, req.LastName)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "failed to update user: %v", err)
|
||||
}
|
||||
return &identityv1.UpdateUserResponse{
|
||||
User: &identityv1.User{
|
||||
Id: u.ID,
|
||||
Email: u.Email,
|
||||
Username: u.Username,
|
||||
FirstName: u.FirstName,
|
||||
LastName: u.LastName,
|
||||
EmailVerified: u.Verified,
|
||||
CreatedAt: u.CreatedAt.Unix(),
|
||||
UpdatedAt: u.UpdatedAt.Unix(),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// DeleteUser deletes a user.
|
||||
func (s *identityServerImpl) DeleteUser(ctx context.Context, req *identityv1.DeleteUserRequest) (*identityv1.DeleteUserResponse, error) {
|
||||
if err := s.service.deleteUser(ctx, req.Id); err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "failed to delete user: %v", err)
|
||||
}
|
||||
return &identityv1.DeleteUserResponse{Success: true}, nil
|
||||
}
|
||||
|
||||
// VerifyEmail verifies a user's email.
|
||||
func (s *identityServerImpl) VerifyEmail(ctx context.Context, req *identityv1.VerifyEmailRequest) (*identityv1.VerifyEmailResponse, error) {
|
||||
if err := s.service.verifyEmail(ctx, req.Token); err != nil {
|
||||
return nil, status.Errorf(codes.InvalidArgument, "failed to verify email: %v", err)
|
||||
}
|
||||
return &identityv1.VerifyEmailResponse{Success: true}, nil
|
||||
}
|
||||
|
||||
// RequestPasswordReset requests a password reset.
|
||||
func (s *identityServerImpl) RequestPasswordReset(ctx context.Context, req *identityv1.RequestPasswordResetRequest) (*identityv1.RequestPasswordResetResponse, error) {
|
||||
_, err := s.service.requestPasswordReset(ctx, req.Email)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "failed to request password reset: %v", err)
|
||||
}
|
||||
return &identityv1.RequestPasswordResetResponse{Success: true}, nil
|
||||
}
|
||||
|
||||
// ResetPassword resets a password.
|
||||
func (s *identityServerImpl) ResetPassword(ctx context.Context, req *identityv1.ResetPasswordRequest) (*identityv1.ResetPasswordResponse, error) {
|
||||
if err := s.service.resetPassword(ctx, req.Token, req.NewPassword); err != nil {
|
||||
return nil, status.Errorf(codes.InvalidArgument, "failed to reset password: %v", err)
|
||||
}
|
||||
return &identityv1.ResetPasswordResponse{Success: true}, nil
|
||||
}
|
||||
|
||||
// provideIdentityService creates the identity service and gRPC server.
|
||||
func provideIdentityService() fx.Option {
|
||||
return fx.Options(
|
||||
// User service
|
||||
fx.Provide(func(client *database.Client, log logger.Logger) (*userService, error) {
|
||||
return &userService{
|
||||
client: client,
|
||||
logger: log,
|
||||
}, nil
|
||||
}),
|
||||
|
||||
// gRPC server implementation
|
||||
fx.Provide(func(userService *userService, log logger.Logger) (*identityServerImpl, error) {
|
||||
zapLogger, _ := zap.NewProduction()
|
||||
return &identityServerImpl{
|
||||
service: userService,
|
||||
logger: zapLogger,
|
||||
}, nil
|
||||
}),
|
||||
|
||||
// gRPC server wrapper
|
||||
fx.Provide(func(
|
||||
serverImpl *identityServerImpl,
|
||||
cfg config.ConfigProvider,
|
||||
log logger.Logger,
|
||||
) (*grpcServerWrapper, error) {
|
||||
port := cfg.GetInt("services.identity.port")
|
||||
if port == 0 {
|
||||
port = 8082
|
||||
}
|
||||
|
||||
addr := fmt.Sprintf("0.0.0.0:%d", port)
|
||||
listener, err := net.Listen("tcp", addr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to listen on %s: %w", addr, err)
|
||||
}
|
||||
|
||||
grpcServer := grpc.NewServer()
|
||||
identityv1.RegisterIdentityServiceServer(grpcServer, serverImpl)
|
||||
|
||||
// Register health service
|
||||
healthServer := health.NewServer()
|
||||
grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
|
||||
healthServer.SetServingStatus("identity.v1.IdentityService", grpc_health_v1.HealthCheckResponse_SERVING)
|
||||
|
||||
// Register reflection for grpcurl
|
||||
reflection.Register(grpcServer)
|
||||
|
||||
return &grpcServerWrapper{
|
||||
server: grpcServer,
|
||||
listener: listener,
|
||||
port: port,
|
||||
logger: log,
|
||||
}, nil
|
||||
}),
|
||||
)
|
||||
}
|
||||
226
cmd/identity-service/main.go
Normal file
226
cmd/identity-service/main.go
Normal file
@@ -0,0 +1,226 @@
|
||||
// Package main provides the entry point for the Identity Service.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"git.dcentral.systems/toolz/goplt/internal/di"
|
||||
healthpkg "git.dcentral.systems/toolz/goplt/internal/health"
|
||||
"git.dcentral.systems/toolz/goplt/internal/infra/database"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/config"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/logger"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/registry"
|
||||
"go.uber.org/fx"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// grpcServerWrapper wraps the gRPC server for lifecycle management.
|
||||
type grpcServerWrapper struct {
|
||||
server *grpc.Server
|
||||
listener net.Listener
|
||||
port int
|
||||
logger logger.Logger
|
||||
}
|
||||
|
||||
func (s *grpcServerWrapper) Start() error {
|
||||
s.logger.Info("Starting Identity Service gRPC server",
|
||||
zap.Int("port", s.port),
|
||||
zap.String("addr", s.listener.Addr().String()),
|
||||
)
|
||||
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
if err := s.server.Serve(s.listener); err != nil {
|
||||
errChan <- err
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-errChan:
|
||||
return fmt.Errorf("gRPC server failed to start: %w", err)
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
s.logger.Info("Identity Service gRPC server started successfully",
|
||||
zap.Int("port", s.port),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *grpcServerWrapper) Stop(ctx context.Context) error {
|
||||
s.logger.Info("Stopping Identity Service gRPC server")
|
||||
|
||||
stopped := make(chan struct{})
|
||||
go func() {
|
||||
s.server.GracefulStop()
|
||||
close(stopped)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-stopped:
|
||||
s.logger.Info("Identity Service gRPC server stopped gracefully")
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
s.logger.Warn("Identity Service gRPC server stop timeout, forcing stop")
|
||||
s.server.Stop()
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *grpcServerWrapper) Port() int {
|
||||
return s.port
|
||||
}
|
||||
|
||||
func main() {
|
||||
// Create DI container
|
||||
// Note: CoreModule() is automatically included by NewContainer()
|
||||
container := di.NewContainer(
|
||||
// Database for identity service (identity schema)
|
||||
fx.Provide(func(cfg config.ConfigProvider, log logger.Logger) (*database.Client, error) {
|
||||
dsn := cfg.GetString("database.dsn")
|
||||
if dsn == "" {
|
||||
return nil, fmt.Errorf("database.dsn is required")
|
||||
}
|
||||
client, err := database.NewClientWithSchema(dsn, "identity")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Run migrations
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if err := client.Migrate(ctx); err != nil {
|
||||
log.Warn("Failed to run migrations",
|
||||
zap.Error(err),
|
||||
)
|
||||
} else {
|
||||
log.Info("Database migrations completed for identity service")
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}),
|
||||
|
||||
// Health registry with database checker
|
||||
fx.Provide(func(db *database.Client, log logger.Logger) (*healthpkg.Registry, error) {
|
||||
registry := healthpkg.NewRegistry()
|
||||
registry.Register("database", healthpkg.NewDatabaseChecker(db))
|
||||
return registry, nil
|
||||
}),
|
||||
|
||||
// Provide identity service and gRPC server (defined in identity_service_fx.go)
|
||||
provideIdentityService(),
|
||||
|
||||
// Lifecycle hooks
|
||||
fx.Invoke(registerLifecycle),
|
||||
)
|
||||
|
||||
// Create root context
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Handle signals
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
|
||||
|
||||
// Start the application
|
||||
if err := container.Start(ctx); err != nil {
|
||||
log := logger.GetGlobalLogger()
|
||||
if log != nil {
|
||||
log.Error("Failed to start Identity Service",
|
||||
logger.Error(err),
|
||||
)
|
||||
} else {
|
||||
fmt.Fprintf(os.Stderr, "Failed to start Identity Service: %v\n", err)
|
||||
}
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Wait for interrupt signal
|
||||
<-sigChan
|
||||
fmt.Println("\nShutting down Identity Service...")
|
||||
|
||||
// Create shutdown context with timeout
|
||||
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer shutdownCancel()
|
||||
|
||||
// Stop the application
|
||||
if err := container.Stop(shutdownCtx); err != nil {
|
||||
log := logger.GetGlobalLogger()
|
||||
if log != nil {
|
||||
log.Error("Error during Identity Service shutdown",
|
||||
logger.Error(err),
|
||||
)
|
||||
} else {
|
||||
fmt.Fprintf(os.Stderr, "Error during shutdown: %v\n", err)
|
||||
}
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
fmt.Println("Identity Service stopped successfully")
|
||||
}
|
||||
|
||||
// registerLifecycle registers lifecycle hooks for the service.
|
||||
func registerLifecycle(
|
||||
lc fx.Lifecycle,
|
||||
grpcServer *grpcServerWrapper,
|
||||
serviceRegistry registry.ServiceRegistry,
|
||||
cfg config.ConfigProvider,
|
||||
log logger.Logger,
|
||||
) {
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
// Start gRPC server
|
||||
if err := grpcServer.Start(); err != nil {
|
||||
return fmt.Errorf("failed to start gRPC server: %w", err)
|
||||
}
|
||||
|
||||
// Register with service registry
|
||||
serviceID := fmt.Sprintf("identity-service-%d", time.Now().Unix())
|
||||
host := cfg.GetString("services.identity.host")
|
||||
if host == "" {
|
||||
host = "localhost"
|
||||
}
|
||||
port := grpcServer.Port()
|
||||
|
||||
instance := ®istry.ServiceInstance{
|
||||
ID: serviceID,
|
||||
Name: "identity-service",
|
||||
Address: host,
|
||||
Port: port,
|
||||
Tags: []string{"grpc", "identity"},
|
||||
Metadata: map[string]string{
|
||||
"version": "1.0.0",
|
||||
"protocol": "grpc",
|
||||
},
|
||||
}
|
||||
|
||||
if err := serviceRegistry.Register(ctx, instance); err != nil {
|
||||
log.Warn("Failed to register with service registry",
|
||||
zap.Error(err),
|
||||
)
|
||||
} else {
|
||||
log.Info("Registered Identity Service with service registry",
|
||||
zap.String("service_id", serviceID),
|
||||
zap.String("name", instance.Name),
|
||||
zap.Int("port", port),
|
||||
)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
OnStop: func(ctx context.Context) error {
|
||||
// Stop gRPC server
|
||||
if err := grpcServer.Stop(ctx); err != nil {
|
||||
return fmt.Errorf("failed to stop gRPC server: %w", err)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}
|
||||
@@ -22,7 +22,7 @@ services:
|
||||
- goplt-network
|
||||
|
||||
consul:
|
||||
image: consul:latest
|
||||
image: consul:1.15.4
|
||||
container_name: goplt-consul
|
||||
command: consul agent -dev -client=0.0.0.0
|
||||
ports:
|
||||
|
||||
Reference in New Issue
Block a user