diff --git a/auth-service b/auth-service index 0f9342c..7b49096 100755 Binary files a/auth-service and b/auth-service differ diff --git a/authz-service b/authz-service index a64adde..9159ccb 100755 Binary files a/authz-service and b/authz-service differ diff --git a/cmd/audit-service/audit_service_fx.go b/cmd/audit-service/audit_service_fx.go new file mode 100644 index 0000000..fd982d9 --- /dev/null +++ b/cmd/audit-service/audit_service_fx.go @@ -0,0 +1,340 @@ +// Package main provides FX providers for Audit Service. +// This file creates the service inline to avoid importing internal packages. +package main + +import ( + "context" + "fmt" + "net" + "time" + + auditv1 "git.dcentral.systems/toolz/goplt/api/proto/generated/audit/v1" + "git.dcentral.systems/toolz/goplt/internal/ent" + "git.dcentral.systems/toolz/goplt/internal/ent/auditlog" + "git.dcentral.systems/toolz/goplt/internal/infra/database" + "git.dcentral.systems/toolz/goplt/pkg/config" + "git.dcentral.systems/toolz/goplt/pkg/logger" + "go.uber.org/fx" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/reflection" + "google.golang.org/grpc/status" +) + +// auditLogEntry represents an audit log entry. +type auditLogEntry struct { + UserID string + Action string + Resource string + ResourceID string + IPAddress string + UserAgent string + Metadata map[string]string + Timestamp int64 +} + +// auditLogFilters contains filters for querying audit logs. +type auditLogFilters struct { + UserID *string + Action *string + Resource *string + ResourceID *string + StartTime *int64 + EndTime *int64 + Limit int + Offset int +} + +// auditService provides audit logging functionality. +type auditService struct { + client *database.Client + logger logger.Logger +} + +// record records an audit log entry. +func (s *auditService) record(ctx context.Context, entry *auditLogEntry) error { + // Convert metadata map to JSON + metadataJSON := make(map[string]interface{}) + for k, v := range entry.Metadata { + metadataJSON[k] = v + } + + // Create audit log entry + timestamp := time.Unix(entry.Timestamp, 0) + if entry.Timestamp == 0 { + timestamp = time.Now() + } + + create := s.client.AuditLog.Create(). + SetID(fmt.Sprintf("%d-%d", time.Now().Unix(), time.Now().UnixNano()%1000000)). + SetUserID(entry.UserID). + SetAction(entry.Action). + SetMetadata(metadataJSON). + SetTimestamp(timestamp) + + if entry.Resource != "" { + create = create.SetResource(entry.Resource) + } + if entry.ResourceID != "" { + create = create.SetResourceID(entry.ResourceID) + } + if entry.IPAddress != "" { + create = create.SetIPAddress(entry.IPAddress) + } + if entry.UserAgent != "" { + create = create.SetUserAgent(entry.UserAgent) + } + + _, err := create.Save(ctx) + if err != nil { + s.logger.Error("Failed to record audit log", + zap.Error(err), + zap.String("user_id", entry.UserID), + zap.String("action", entry.Action), + ) + return fmt.Errorf("failed to record audit log: %w", err) + } + + return nil +} + +// query queries audit logs based on filters. +func (s *auditService) query(ctx context.Context, filters *auditLogFilters) ([]*auditLogEntry, error) { + query := s.client.AuditLog.Query() + + // Apply filters + if filters.UserID != nil { + query = query.Where(auditlog.UserID(*filters.UserID)) + } + if filters.Action != nil { + query = query.Where(auditlog.Action(*filters.Action)) + } + if filters.Resource != nil { + query = query.Where(auditlog.Resource(*filters.Resource)) + } + if filters.ResourceID != nil { + query = query.Where(auditlog.ResourceID(*filters.ResourceID)) + } + if filters.StartTime != nil { + query = query.Where(auditlog.TimestampGTE(time.Unix(*filters.StartTime, 0))) + } + if filters.EndTime != nil { + query = query.Where(auditlog.TimestampLTE(time.Unix(*filters.EndTime, 0))) + } + + // Apply pagination + if filters.Limit > 0 { + query = query.Limit(filters.Limit) + } + if filters.Offset > 0 { + query = query.Offset(filters.Offset) + } + + // Order by timestamp descending + query = query.Order(ent.Desc(auditlog.FieldTimestamp)) + + // Execute query + auditLogs, err := query.All(ctx) + if err != nil { + s.logger.Error("Failed to query audit logs", + zap.Error(err), + ) + return nil, fmt.Errorf("failed to query audit logs: %w", err) + } + + // Convert to service entries + entries := make([]*auditLogEntry, 0, len(auditLogs)) + for _, log := range auditLogs { + // Convert metadata from map[string]interface{} to map[string]string + metadata := make(map[string]string) + if log.Metadata != nil { + for k, v := range log.Metadata { + if str, ok := v.(string); ok { + metadata[k] = str + } else { + metadata[k] = fmt.Sprintf("%v", v) + } + } + } + + entry := &auditLogEntry{ + UserID: log.UserID, + Action: log.Action, + Resource: log.Resource, + ResourceID: log.ResourceID, + IPAddress: log.IPAddress, + UserAgent: log.UserAgent, + Metadata: metadata, + Timestamp: log.Timestamp.Unix(), + } + entries = append(entries, entry) + } + + return entries, nil +} + +// auditServerImpl implements the AuditService gRPC server. +type auditServerImpl struct { + auditv1.UnimplementedAuditServiceServer + service *auditService + logger *zap.Logger +} + +// Record records an audit log entry. +func (s *auditServerImpl) Record(ctx context.Context, req *auditv1.RecordRequest) (*auditv1.RecordResponse, error) { + if req.Entry == nil { + return nil, status.Error(codes.InvalidArgument, "entry is required") + } + + entry := req.Entry + + // Convert proto entry to service entry + serviceEntry := &auditLogEntry{ + UserID: entry.UserId, + Action: entry.Action, + Resource: entry.Resource, + ResourceID: entry.ResourceId, + IPAddress: entry.IpAddress, + UserAgent: entry.UserAgent, + Metadata: entry.Metadata, + Timestamp: entry.Timestamp, + } + + // Record the audit log + if err := s.service.record(ctx, serviceEntry); err != nil { + s.logger.Error("Failed to record audit log", + zap.Error(err), + zap.String("user_id", entry.UserId), + zap.String("action", entry.Action), + ) + return nil, status.Errorf(codes.Internal, "failed to record audit log: %v", err) + } + + return &auditv1.RecordResponse{ + Success: true, + }, nil +} + +// Query queries audit logs based on filters. +func (s *auditServerImpl) Query(ctx context.Context, req *auditv1.QueryRequest) (*auditv1.QueryResponse, error) { + // Convert proto filters to service filters + filters := &auditLogFilters{ + Limit: int(req.Limit), + Offset: int(req.Offset), + } + + if req.UserId != nil { + userID := *req.UserId + filters.UserID = &userID + } + if req.Action != nil { + action := *req.Action + filters.Action = &action + } + if req.Resource != nil { + resource := *req.Resource + filters.Resource = &resource + } + if req.ResourceId != nil { + resourceID := *req.ResourceId + filters.ResourceID = &resourceID + } + if req.StartTime != nil { + startTime := *req.StartTime + filters.StartTime = &startTime + } + if req.EndTime != nil { + endTime := *req.EndTime + filters.EndTime = &endTime + } + + // Query audit logs + entries, err := s.service.query(ctx, filters) + if err != nil { + s.logger.Error("Failed to query audit logs", + zap.Error(err), + ) + return nil, status.Errorf(codes.Internal, "failed to query audit logs: %v", err) + } + + // Convert service entries to proto entries + protoEntries := make([]*auditv1.AuditLogEntry, 0, len(entries)) + for _, entry := range entries { + protoEntries = append(protoEntries, &auditv1.AuditLogEntry{ + UserId: entry.UserID, + Action: entry.Action, + Resource: entry.Resource, + ResourceId: entry.ResourceID, + IpAddress: entry.IPAddress, + UserAgent: entry.UserAgent, + Metadata: entry.Metadata, + Timestamp: entry.Timestamp, + }) + } + + return &auditv1.QueryResponse{ + Entries: protoEntries, + Total: int32(len(protoEntries)), + }, nil +} + +// provideAuditService creates the audit service and gRPC server. +func provideAuditService() fx.Option { + return fx.Options( + // Audit service + fx.Provide(func(client *database.Client, log logger.Logger) (*auditService, error) { + return &auditService{ + client: client, + logger: log, + }, nil + }), + + // gRPC server implementation + fx.Provide(func(auditService *auditService, log logger.Logger) (*auditServerImpl, error) { + zapLogger, _ := zap.NewProduction() + return &auditServerImpl{ + service: auditService, + logger: zapLogger, + }, nil + }), + + // gRPC server wrapper + fx.Provide(func( + serverImpl *auditServerImpl, + cfg config.ConfigProvider, + log logger.Logger, + ) (*grpcServerWrapper, error) { + port := cfg.GetInt("services.audit.port") + if port == 0 { + port = 8084 + } + + addr := fmt.Sprintf("0.0.0.0:%d", port) + listener, err := net.Listen("tcp", addr) + if err != nil { + return nil, fmt.Errorf("failed to listen on %s: %w", addr, err) + } + + grpcServer := grpc.NewServer() + auditv1.RegisterAuditServiceServer(grpcServer, serverImpl) + + // Register health service + healthServer := health.NewServer() + grpc_health_v1.RegisterHealthServer(grpcServer, healthServer) + healthServer.SetServingStatus("audit.v1.AuditService", grpc_health_v1.HealthCheckResponse_SERVING) + + // Register reflection for grpcurl + reflection.Register(grpcServer) + + return &grpcServerWrapper{ + server: grpcServer, + listener: listener, + port: port, + logger: log, + }, nil + }), + ) +} diff --git a/cmd/audit-service/main.go b/cmd/audit-service/main.go new file mode 100644 index 0000000..aafb0a0 --- /dev/null +++ b/cmd/audit-service/main.go @@ -0,0 +1,226 @@ +// Package main provides the entry point for the Audit Service. +package main + +import ( + "context" + "fmt" + "net" + "os" + "os/signal" + "syscall" + "time" + + "git.dcentral.systems/toolz/goplt/internal/di" + healthpkg "git.dcentral.systems/toolz/goplt/internal/health" + "git.dcentral.systems/toolz/goplt/internal/infra/database" + "git.dcentral.systems/toolz/goplt/pkg/config" + "git.dcentral.systems/toolz/goplt/pkg/logger" + "git.dcentral.systems/toolz/goplt/pkg/registry" + "go.uber.org/fx" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +// grpcServerWrapper wraps the gRPC server for lifecycle management. +type grpcServerWrapper struct { + server *grpc.Server + listener net.Listener + port int + logger logger.Logger +} + +func (s *grpcServerWrapper) Start() error { + s.logger.Info("Starting Audit Service gRPC server", + zap.Int("port", s.port), + zap.String("addr", s.listener.Addr().String()), + ) + + errChan := make(chan error, 1) + go func() { + if err := s.server.Serve(s.listener); err != nil { + errChan <- err + } + }() + + select { + case err := <-errChan: + return fmt.Errorf("gRPC server failed to start: %w", err) + case <-time.After(100 * time.Millisecond): + s.logger.Info("Audit Service gRPC server started successfully", + zap.Int("port", s.port), + ) + return nil + } +} + +func (s *grpcServerWrapper) Stop(ctx context.Context) error { + s.logger.Info("Stopping Audit Service gRPC server") + + stopped := make(chan struct{}) + go func() { + s.server.GracefulStop() + close(stopped) + }() + + select { + case <-stopped: + s.logger.Info("Audit Service gRPC server stopped gracefully") + return nil + case <-ctx.Done(): + s.logger.Warn("Audit Service gRPC server stop timeout, forcing stop") + s.server.Stop() + return ctx.Err() + } +} + +func (s *grpcServerWrapper) Port() int { + return s.port +} + +func main() { + // Create DI container - services will be created via fx.Provide + // Note: CoreModule() is automatically included by NewContainer() + container := di.NewContainer( + // Database for audit service (audit schema) + fx.Provide(func(cfg config.ConfigProvider, log logger.Logger) (*database.Client, error) { + dsn := cfg.GetString("database.dsn") + if dsn == "" { + return nil, fmt.Errorf("database.dsn is required") + } + client, err := database.NewClientWithSchema(dsn, "audit") + if err != nil { + return nil, err + } + + // Run migrations + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if err := client.Migrate(ctx); err != nil { + log.Warn("Failed to run migrations", + zap.Error(err), + ) + } else { + log.Info("Database migrations completed for audit service") + } + + return client, nil + }), + + // Health registry with database checker + fx.Provide(func(db *database.Client, log logger.Logger) (*healthpkg.Registry, error) { + registry := healthpkg.NewRegistry() + registry.Register("database", healthpkg.NewDatabaseChecker(db)) + return registry, nil + }), + + // Provide audit service and gRPC server (defined in audit_service_fx.go) + provideAuditService(), + + // Lifecycle hooks + fx.Invoke(registerLifecycle), + ) + + // Create root context + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Handle signals + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + + // Start the application + if err := container.Start(ctx); err != nil { + log := logger.GetGlobalLogger() + if log != nil { + log.Error("Failed to start Audit Service", + logger.Error(err), + ) + } else { + fmt.Fprintf(os.Stderr, "Failed to start Audit Service: %v\n", err) + } + os.Exit(1) + } + + // Wait for interrupt signal + <-sigChan + fmt.Println("\nShutting down Audit Service...") + + // Create shutdown context with timeout + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer shutdownCancel() + + // Stop the application + if err := container.Stop(shutdownCtx); err != nil { + log := logger.GetGlobalLogger() + if log != nil { + log.Error("Error during Audit Service shutdown", + logger.Error(err), + ) + } else { + fmt.Fprintf(os.Stderr, "Error during shutdown: %v\n", err) + } + os.Exit(1) + } + + fmt.Println("Audit Service stopped successfully") +} + +// registerLifecycle registers lifecycle hooks for the service. +func registerLifecycle( + lc fx.Lifecycle, + grpcServer *grpcServerWrapper, + serviceRegistry registry.ServiceRegistry, + cfg config.ConfigProvider, + log logger.Logger, +) { + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + // Start gRPC server + if err := grpcServer.Start(); err != nil { + return fmt.Errorf("failed to start gRPC server: %w", err) + } + + // Register with service registry + serviceID := fmt.Sprintf("audit-service-%d", time.Now().Unix()) + host := cfg.GetString("services.audit.host") + if host == "" { + host = "localhost" + } + port := grpcServer.Port() + + instance := ®istry.ServiceInstance{ + ID: serviceID, + Name: "audit-service", + Address: host, + Port: port, + Tags: []string{"grpc", "audit"}, + Metadata: map[string]string{ + "version": "1.0.0", + "protocol": "grpc", + }, + } + + if err := serviceRegistry.Register(ctx, instance); err != nil { + log.Warn("Failed to register with service registry", + zap.Error(err), + ) + } else { + log.Info("Registered Audit Service with service registry", + zap.String("service_id", serviceID), + zap.String("name", instance.Name), + zap.Int("port", port), + ) + } + + return nil + }, + OnStop: func(ctx context.Context) error { + // Stop gRPC server + if err := grpcServer.Stop(ctx); err != nil { + return fmt.Errorf("failed to stop gRPC server: %w", err) + } + return nil + }, + }) +} diff --git a/cmd/auth-service/auth_service_fx.go b/cmd/auth-service/auth_service_fx.go index ec965a1..0f167a2 100644 --- a/cmd/auth-service/auth_service_fx.go +++ b/cmd/auth-service/auth_service_fx.go @@ -12,7 +12,7 @@ import ( "time" authv1 "git.dcentral.systems/toolz/goplt/api/proto/generated/auth/v1" - "git.dcentral.systems/toolz/goplt/internal/ent" + "git.dcentral.systems/toolz/goplt/internal/infra/database" "git.dcentral.systems/toolz/goplt/pkg/config" "git.dcentral.systems/toolz/goplt/pkg/logger" "git.dcentral.systems/toolz/goplt/pkg/services" @@ -34,7 +34,7 @@ const ( // authService provides authentication functionality. type authService struct { - client *ent.Client + client *database.Client logger logger.Logger identityClient services.IdentityServiceClient jwtSecret []byte @@ -303,7 +303,7 @@ func provideAuthService() fx.Option { return fx.Options( // Auth service fx.Provide(func( - client *ent.Client, + client *database.Client, log logger.Logger, identityClient services.IdentityServiceClient, cfg config.ConfigProvider, diff --git a/cmd/auth-service/main.go b/cmd/auth-service/main.go index d875235..3f48cd3 100644 --- a/cmd/auth-service/main.go +++ b/cmd/auth-service/main.go @@ -81,10 +81,8 @@ func (s *grpcServerWrapper) Port() int { func main() { // Create DI container + // Note: CoreModule() is automatically included by NewContainer() container := di.NewContainer( - // Core kernel services - di.CoreModule(), - // Database for auth service (auth schema) fx.Provide(func(cfg config.ConfigProvider, log logger.Logger) (*database.Client, error) { dsn := cfg.GetString("database.dsn") diff --git a/cmd/authz-service/authz_service_fx.go b/cmd/authz-service/authz_service_fx.go index 530bb57..ce9fe4c 100644 --- a/cmd/authz-service/authz_service_fx.go +++ b/cmd/authz-service/authz_service_fx.go @@ -10,6 +10,7 @@ import ( authzv1 "git.dcentral.systems/toolz/goplt/api/proto/generated/authz/v1" "git.dcentral.systems/toolz/goplt/internal/ent" "git.dcentral.systems/toolz/goplt/internal/ent/userrole" + "git.dcentral.systems/toolz/goplt/internal/infra/database" "git.dcentral.systems/toolz/goplt/pkg/config" "git.dcentral.systems/toolz/goplt/pkg/logger" "go.uber.org/fx" @@ -24,7 +25,7 @@ import ( // authzService provides authorization functionality. type authzService struct { - client *ent.Client + client *database.Client logger logger.Logger } @@ -231,7 +232,7 @@ func (s *authzServerImpl) GetUserRoles(ctx context.Context, req *authzv1.GetUser func provideAuthzService() fx.Option { return fx.Options( // Authz service - fx.Provide(func(client *ent.Client, log logger.Logger) (*authzService, error) { + fx.Provide(func(client *database.Client, log logger.Logger) (*authzService, error) { return &authzService{ client: client, logger: log, diff --git a/cmd/authz-service/main.go b/cmd/authz-service/main.go index efb5c76..f96aa59 100644 --- a/cmd/authz-service/main.go +++ b/cmd/authz-service/main.go @@ -79,10 +79,8 @@ func (s *grpcServerWrapper) Port() int { func main() { // Create DI container + // Note: CoreModule() is automatically included by NewContainer() container := di.NewContainer( - // Core kernel services - di.CoreModule(), - // Database for authz service (authz schema) fx.Provide(func(cfg config.ConfigProvider, log logger.Logger) (*database.Client, error) { dsn := cfg.GetString("database.dsn") diff --git a/cmd/identity-service/identity_service_fx.go b/cmd/identity-service/identity_service_fx.go new file mode 100644 index 0000000..675ad20 --- /dev/null +++ b/cmd/identity-service/identity_service_fx.go @@ -0,0 +1,414 @@ +// Package main provides FX providers for Identity Service. +// This file creates the service inline to avoid importing internal packages. +package main + +import ( + "context" + "crypto/rand" + "crypto/subtle" + "encoding/base64" + "fmt" + "net" + "strings" + "time" + + identityv1 "git.dcentral.systems/toolz/goplt/api/proto/generated/identity/v1" + "git.dcentral.systems/toolz/goplt/internal/ent" + "git.dcentral.systems/toolz/goplt/internal/ent/user" + "git.dcentral.systems/toolz/goplt/internal/infra/database" + "git.dcentral.systems/toolz/goplt/pkg/config" + "git.dcentral.systems/toolz/goplt/pkg/logger" + "go.uber.org/fx" + "go.uber.org/zap" + "golang.org/x/crypto/argon2" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/reflection" + "google.golang.org/grpc/status" +) + +// userService provides user management functionality. +type userService struct { + client *database.Client + logger logger.Logger +} + +// generateToken generates a random token. +func generateToken() (string, error) { + b := make([]byte, 32) + if _, err := rand.Read(b); err != nil { + return "", fmt.Errorf("failed to generate token: %w", err) + } + return fmt.Sprintf("%x", b), nil +} + +// hashPassword hashes a password using argon2id. +func hashPassword(password string) (string, error) { + salt := make([]byte, 16) + if _, err := rand.Read(salt); err != nil { + return "", fmt.Errorf("failed to generate salt: %w", err) + } + hash := argon2.IDKey([]byte(password), salt, 3, 64*1024, 4, 32) + b64Salt := base64.RawStdEncoding.EncodeToString(salt) + b64Hash := base64.RawStdEncoding.EncodeToString(hash) + return fmt.Sprintf("$argon2id$v=%d$m=%d,t=%d,p=%d$%s$%s", + argon2.Version, 64*1024, 3, 4, b64Salt, b64Hash), nil +} + +// verifyPassword verifies a password against a hash. +func verifyPassword(password, hash string) (bool, error) { + // Simplified verification - in production use proper parsing + parts := strings.Split(hash, "$") + if len(parts) != 6 { + return false, fmt.Errorf("invalid hash format") + } + salt, err := base64.RawStdEncoding.DecodeString(parts[4]) + if err != nil { + return false, err + } + expectedHash, err := base64.RawStdEncoding.DecodeString(parts[5]) + if err != nil { + return false, err + } + actualHash := argon2.IDKey([]byte(password), salt, 3, 64*1024, 4, uint32(len(expectedHash))) + return subtle.ConstantTimeCompare(expectedHash, actualHash) == 1, nil +} + +// createUser creates a new user. +func (s *userService) createUser(ctx context.Context, email, username, pwd, firstName, lastName string) (*ent.User, error) { + exists, err := s.client.User.Query().Where(user.Email(email)).Exist(ctx) + if err != nil { + return nil, fmt.Errorf("failed to check email: %w", err) + } + if exists { + return nil, fmt.Errorf("email already exists") + } + + passwordHash, err := hashPassword(pwd) + if err != nil { + return nil, fmt.Errorf("failed to hash password: %w", err) + } + + verificationToken, err := generateToken() + if err != nil { + return nil, fmt.Errorf("failed to generate token: %w", err) + } + + create := s.client.User.Create(). + SetID(fmt.Sprintf("%d", time.Now().UnixNano())). + SetEmail(email). + SetPasswordHash(passwordHash). + SetVerified(false). + SetEmailVerificationToken(verificationToken) + + if username != "" { + create = create.SetUsername(username) + } + if firstName != "" { + create = create.SetFirstName(firstName) + } + if lastName != "" { + create = create.SetLastName(lastName) + } + + u, err := create.Save(ctx) + if err != nil { + return nil, fmt.Errorf("failed to create user: %w", err) + } + return u, nil +} + +// getUser retrieves a user by ID. +func (s *userService) getUser(ctx context.Context, id string) (*ent.User, error) { + u, err := s.client.User.Get(ctx, id) + if err != nil { + return nil, fmt.Errorf("failed to get user: %w", err) + } + return u, nil +} + +// getUserByEmail retrieves a user by email. +func (s *userService) getUserByEmail(ctx context.Context, email string) (*ent.User, error) { + u, err := s.client.User.Query().Where(user.Email(email)).Only(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get user: %w", err) + } + return u, nil +} + +// updateUser updates a user. +func (s *userService) updateUser(ctx context.Context, id string, email, username, firstName, lastName *string) (*ent.User, error) { + update := s.client.User.UpdateOneID(id) + if email != nil { + exists, err := s.client.User.Query(). + Where(user.Email(*email), user.IDNEQ(id)). + Exist(ctx) + if err != nil { + return nil, fmt.Errorf("failed to check email: %w", err) + } + if exists { + return nil, fmt.Errorf("email already taken") + } + update = update.SetEmail(*email) + } + if username != nil { + update = update.SetUsername(*username) + } + if firstName != nil { + update = update.SetFirstName(*firstName) + } + if lastName != nil { + update = update.SetLastName(*lastName) + } + return update.Save(ctx) +} + +// deleteUser deletes a user. +func (s *userService) deleteUser(ctx context.Context, id string) error { + return s.client.User.DeleteOneID(id).Exec(ctx) +} + +// verifyEmail verifies a user's email. +func (s *userService) verifyEmail(ctx context.Context, token string) error { + u, err := s.client.User.Query().Where(user.EmailVerificationToken(token)).Only(ctx) + if err != nil { + return fmt.Errorf("invalid token") + } + _, err = s.client.User.UpdateOneID(u.ID). + SetVerified(true). + ClearEmailVerificationToken(). + Save(ctx) + return err +} + +// requestPasswordReset requests a password reset. +func (s *userService) requestPasswordReset(ctx context.Context, email string) (string, error) { + u, err := s.getUserByEmail(ctx, email) + if err != nil { + return "", nil // Don't reveal if user exists + } + token, err := generateToken() + if err != nil { + return "", err + } + expiresAt := time.Now().Add(24 * time.Hour) + _, err = s.client.User.UpdateOneID(u.ID). + SetPasswordResetToken(token). + SetPasswordResetExpiresAt(expiresAt). + Save(ctx) + return token, err +} + +// resetPassword resets a password. +func (s *userService) resetPassword(ctx context.Context, token, newPassword string) error { + u, err := s.client.User.Query().Where(user.PasswordResetToken(token)).Only(ctx) + if err != nil { + return fmt.Errorf("invalid token") + } + if !u.PasswordResetExpiresAt.IsZero() && u.PasswordResetExpiresAt.Before(time.Now()) { + return fmt.Errorf("token expired") + } + passwordHash, err := hashPassword(newPassword) + if err != nil { + return err + } + _, err = s.client.User.UpdateOneID(u.ID). + SetPasswordHash(passwordHash). + ClearPasswordResetToken(). + ClearPasswordResetExpiresAt(). + Save(ctx) + return err +} + +// verifyPassword verifies a password. +func (s *userService) verifyPassword(ctx context.Context, email, pwd string) (*ent.User, error) { + u, err := s.getUserByEmail(ctx, email) + if err != nil { + return nil, err + } + valid, err := verifyPassword(pwd, u.PasswordHash) + if err != nil || !valid { + return nil, fmt.Errorf("invalid password") + } + return u, nil +} + +// identityServerImpl implements the IdentityService gRPC server. +type identityServerImpl struct { + identityv1.UnimplementedIdentityServiceServer + service *userService + logger *zap.Logger +} + +// GetUser retrieves a user by ID. +func (s *identityServerImpl) GetUser(ctx context.Context, req *identityv1.GetUserRequest) (*identityv1.GetUserResponse, error) { + u, err := s.service.getUser(ctx, req.Id) + if err != nil { + return nil, status.Errorf(codes.NotFound, "user not found: %v", err) + } + return &identityv1.GetUserResponse{ + User: &identityv1.User{ + Id: u.ID, + Email: u.Email, + Username: u.Username, + FirstName: u.FirstName, + LastName: u.LastName, + EmailVerified: u.Verified, + CreatedAt: u.CreatedAt.Unix(), + UpdatedAt: u.UpdatedAt.Unix(), + }, + }, nil +} + +// GetUserByEmail retrieves a user by email. +func (s *identityServerImpl) GetUserByEmail(ctx context.Context, req *identityv1.GetUserByEmailRequest) (*identityv1.GetUserByEmailResponse, error) { + u, err := s.service.getUserByEmail(ctx, req.Email) + if err != nil { + return nil, status.Errorf(codes.NotFound, "user not found: %v", err) + } + return &identityv1.GetUserByEmailResponse{ + User: &identityv1.User{ + Id: u.ID, + Email: u.Email, + Username: u.Username, + FirstName: u.FirstName, + LastName: u.LastName, + EmailVerified: u.Verified, + CreatedAt: u.CreatedAt.Unix(), + UpdatedAt: u.UpdatedAt.Unix(), + }, + }, nil +} + +// CreateUser creates a new user. +func (s *identityServerImpl) CreateUser(ctx context.Context, req *identityv1.CreateUserRequest) (*identityv1.CreateUserResponse, error) { + u, err := s.service.createUser(ctx, req.Email, req.Username, req.Password, req.FirstName, req.LastName) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to create user: %v", err) + } + return &identityv1.CreateUserResponse{ + User: &identityv1.User{ + Id: u.ID, + Email: u.Email, + Username: u.Username, + FirstName: u.FirstName, + LastName: u.LastName, + EmailVerified: u.Verified, + CreatedAt: u.CreatedAt.Unix(), + UpdatedAt: u.UpdatedAt.Unix(), + }, + }, nil +} + +// UpdateUser updates a user. +func (s *identityServerImpl) UpdateUser(ctx context.Context, req *identityv1.UpdateUserRequest) (*identityv1.UpdateUserResponse, error) { + u, err := s.service.updateUser(ctx, req.Id, req.Email, req.Username, req.FirstName, req.LastName) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to update user: %v", err) + } + return &identityv1.UpdateUserResponse{ + User: &identityv1.User{ + Id: u.ID, + Email: u.Email, + Username: u.Username, + FirstName: u.FirstName, + LastName: u.LastName, + EmailVerified: u.Verified, + CreatedAt: u.CreatedAt.Unix(), + UpdatedAt: u.UpdatedAt.Unix(), + }, + }, nil +} + +// DeleteUser deletes a user. +func (s *identityServerImpl) DeleteUser(ctx context.Context, req *identityv1.DeleteUserRequest) (*identityv1.DeleteUserResponse, error) { + if err := s.service.deleteUser(ctx, req.Id); err != nil { + return nil, status.Errorf(codes.Internal, "failed to delete user: %v", err) + } + return &identityv1.DeleteUserResponse{Success: true}, nil +} + +// VerifyEmail verifies a user's email. +func (s *identityServerImpl) VerifyEmail(ctx context.Context, req *identityv1.VerifyEmailRequest) (*identityv1.VerifyEmailResponse, error) { + if err := s.service.verifyEmail(ctx, req.Token); err != nil { + return nil, status.Errorf(codes.InvalidArgument, "failed to verify email: %v", err) + } + return &identityv1.VerifyEmailResponse{Success: true}, nil +} + +// RequestPasswordReset requests a password reset. +func (s *identityServerImpl) RequestPasswordReset(ctx context.Context, req *identityv1.RequestPasswordResetRequest) (*identityv1.RequestPasswordResetResponse, error) { + _, err := s.service.requestPasswordReset(ctx, req.Email) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to request password reset: %v", err) + } + return &identityv1.RequestPasswordResetResponse{Success: true}, nil +} + +// ResetPassword resets a password. +func (s *identityServerImpl) ResetPassword(ctx context.Context, req *identityv1.ResetPasswordRequest) (*identityv1.ResetPasswordResponse, error) { + if err := s.service.resetPassword(ctx, req.Token, req.NewPassword); err != nil { + return nil, status.Errorf(codes.InvalidArgument, "failed to reset password: %v", err) + } + return &identityv1.ResetPasswordResponse{Success: true}, nil +} + +// provideIdentityService creates the identity service and gRPC server. +func provideIdentityService() fx.Option { + return fx.Options( + // User service + fx.Provide(func(client *database.Client, log logger.Logger) (*userService, error) { + return &userService{ + client: client, + logger: log, + }, nil + }), + + // gRPC server implementation + fx.Provide(func(userService *userService, log logger.Logger) (*identityServerImpl, error) { + zapLogger, _ := zap.NewProduction() + return &identityServerImpl{ + service: userService, + logger: zapLogger, + }, nil + }), + + // gRPC server wrapper + fx.Provide(func( + serverImpl *identityServerImpl, + cfg config.ConfigProvider, + log logger.Logger, + ) (*grpcServerWrapper, error) { + port := cfg.GetInt("services.identity.port") + if port == 0 { + port = 8082 + } + + addr := fmt.Sprintf("0.0.0.0:%d", port) + listener, err := net.Listen("tcp", addr) + if err != nil { + return nil, fmt.Errorf("failed to listen on %s: %w", addr, err) + } + + grpcServer := grpc.NewServer() + identityv1.RegisterIdentityServiceServer(grpcServer, serverImpl) + + // Register health service + healthServer := health.NewServer() + grpc_health_v1.RegisterHealthServer(grpcServer, healthServer) + healthServer.SetServingStatus("identity.v1.IdentityService", grpc_health_v1.HealthCheckResponse_SERVING) + + // Register reflection for grpcurl + reflection.Register(grpcServer) + + return &grpcServerWrapper{ + server: grpcServer, + listener: listener, + port: port, + logger: log, + }, nil + }), + ) +} diff --git a/cmd/identity-service/main.go b/cmd/identity-service/main.go new file mode 100644 index 0000000..81cf452 --- /dev/null +++ b/cmd/identity-service/main.go @@ -0,0 +1,226 @@ +// Package main provides the entry point for the Identity Service. +package main + +import ( + "context" + "fmt" + "net" + "os" + "os/signal" + "syscall" + "time" + + "git.dcentral.systems/toolz/goplt/internal/di" + healthpkg "git.dcentral.systems/toolz/goplt/internal/health" + "git.dcentral.systems/toolz/goplt/internal/infra/database" + "git.dcentral.systems/toolz/goplt/pkg/config" + "git.dcentral.systems/toolz/goplt/pkg/logger" + "git.dcentral.systems/toolz/goplt/pkg/registry" + "go.uber.org/fx" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +// grpcServerWrapper wraps the gRPC server for lifecycle management. +type grpcServerWrapper struct { + server *grpc.Server + listener net.Listener + port int + logger logger.Logger +} + +func (s *grpcServerWrapper) Start() error { + s.logger.Info("Starting Identity Service gRPC server", + zap.Int("port", s.port), + zap.String("addr", s.listener.Addr().String()), + ) + + errChan := make(chan error, 1) + go func() { + if err := s.server.Serve(s.listener); err != nil { + errChan <- err + } + }() + + select { + case err := <-errChan: + return fmt.Errorf("gRPC server failed to start: %w", err) + case <-time.After(100 * time.Millisecond): + s.logger.Info("Identity Service gRPC server started successfully", + zap.Int("port", s.port), + ) + return nil + } +} + +func (s *grpcServerWrapper) Stop(ctx context.Context) error { + s.logger.Info("Stopping Identity Service gRPC server") + + stopped := make(chan struct{}) + go func() { + s.server.GracefulStop() + close(stopped) + }() + + select { + case <-stopped: + s.logger.Info("Identity Service gRPC server stopped gracefully") + return nil + case <-ctx.Done(): + s.logger.Warn("Identity Service gRPC server stop timeout, forcing stop") + s.server.Stop() + return ctx.Err() + } +} + +func (s *grpcServerWrapper) Port() int { + return s.port +} + +func main() { + // Create DI container + // Note: CoreModule() is automatically included by NewContainer() + container := di.NewContainer( + // Database for identity service (identity schema) + fx.Provide(func(cfg config.ConfigProvider, log logger.Logger) (*database.Client, error) { + dsn := cfg.GetString("database.dsn") + if dsn == "" { + return nil, fmt.Errorf("database.dsn is required") + } + client, err := database.NewClientWithSchema(dsn, "identity") + if err != nil { + return nil, err + } + + // Run migrations + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if err := client.Migrate(ctx); err != nil { + log.Warn("Failed to run migrations", + zap.Error(err), + ) + } else { + log.Info("Database migrations completed for identity service") + } + + return client, nil + }), + + // Health registry with database checker + fx.Provide(func(db *database.Client, log logger.Logger) (*healthpkg.Registry, error) { + registry := healthpkg.NewRegistry() + registry.Register("database", healthpkg.NewDatabaseChecker(db)) + return registry, nil + }), + + // Provide identity service and gRPC server (defined in identity_service_fx.go) + provideIdentityService(), + + // Lifecycle hooks + fx.Invoke(registerLifecycle), + ) + + // Create root context + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Handle signals + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + + // Start the application + if err := container.Start(ctx); err != nil { + log := logger.GetGlobalLogger() + if log != nil { + log.Error("Failed to start Identity Service", + logger.Error(err), + ) + } else { + fmt.Fprintf(os.Stderr, "Failed to start Identity Service: %v\n", err) + } + os.Exit(1) + } + + // Wait for interrupt signal + <-sigChan + fmt.Println("\nShutting down Identity Service...") + + // Create shutdown context with timeout + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer shutdownCancel() + + // Stop the application + if err := container.Stop(shutdownCtx); err != nil { + log := logger.GetGlobalLogger() + if log != nil { + log.Error("Error during Identity Service shutdown", + logger.Error(err), + ) + } else { + fmt.Fprintf(os.Stderr, "Error during shutdown: %v\n", err) + } + os.Exit(1) + } + + fmt.Println("Identity Service stopped successfully") +} + +// registerLifecycle registers lifecycle hooks for the service. +func registerLifecycle( + lc fx.Lifecycle, + grpcServer *grpcServerWrapper, + serviceRegistry registry.ServiceRegistry, + cfg config.ConfigProvider, + log logger.Logger, +) { + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + // Start gRPC server + if err := grpcServer.Start(); err != nil { + return fmt.Errorf("failed to start gRPC server: %w", err) + } + + // Register with service registry + serviceID := fmt.Sprintf("identity-service-%d", time.Now().Unix()) + host := cfg.GetString("services.identity.host") + if host == "" { + host = "localhost" + } + port := grpcServer.Port() + + instance := ®istry.ServiceInstance{ + ID: serviceID, + Name: "identity-service", + Address: host, + Port: port, + Tags: []string{"grpc", "identity"}, + Metadata: map[string]string{ + "version": "1.0.0", + "protocol": "grpc", + }, + } + + if err := serviceRegistry.Register(ctx, instance); err != nil { + log.Warn("Failed to register with service registry", + zap.Error(err), + ) + } else { + log.Info("Registered Identity Service with service registry", + zap.String("service_id", serviceID), + zap.String("name", instance.Name), + zap.Int("port", port), + ) + } + + return nil + }, + OnStop: func(ctx context.Context) error { + // Stop gRPC server + if err := grpcServer.Stop(ctx); err != nil { + return fmt.Errorf("failed to stop gRPC server: %w", err) + } + return nil + }, + }) +} diff --git a/docker-compose.yml b/docker-compose.yml index 42123f5..cac577d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -22,7 +22,7 @@ services: - goplt-network consul: - image: consul:latest + image: consul:1.15.4 container_name: goplt-consul command: consul agent -dev -client=0.0.0.0 ports: