refactor: Align Epic 0 & Epic 1 with true microservices architecture
Refactor core kernel and infrastructure to support true microservices architecture where services are independently deployable. Phase 1: Core Kernel Cleanup - Remove database provider from CoreModule (services create their own) - Update ProvideHealthRegistry to not depend on database - Add schema support to database client (NewClientWithSchema) - Update main entry point to remove database dependency - Core kernel now provides only: config, logger, error bus, health, metrics, tracer, service registry Phase 2: Service Registry Implementation - Create ServiceRegistry interface (pkg/registry/registry.go) - Implement Consul registry (internal/registry/consul/consul.go) - Add Consul dependency (github.com/hashicorp/consul/api) - Add registry configuration to config/default.yaml - Add ProvideServiceRegistry() to DI container Phase 3: Service Client Interfaces - Create service client interfaces: - pkg/services/auth.go - AuthServiceClient - pkg/services/identity.go - IdentityServiceClient - pkg/services/authz.go - AuthzServiceClient - pkg/services/audit.go - AuditServiceClient - Create ServiceClientFactory (internal/client/factory.go) - Create stub gRPC client implementations (internal/client/grpc/) - Add ProvideServiceClientFactory() to DI container Phase 4: gRPC Service Definitions - Create proto files for all core services: - api/proto/auth.proto - api/proto/identity.proto - api/proto/authz.proto - api/proto/audit.proto - Add generate-proto target to Makefile Phase 5: API Gateway Implementation - Create API Gateway service entry point (cmd/api-gateway/main.go) - Create Gateway implementation (services/gateway/gateway.go) - Add gateway configuration to config/default.yaml - Gateway registers with Consul and routes requests to backend services All code compiles successfully. Core services (Auth, Identity, Authz, Audit) will be implemented in Epic 2 using these foundations.
This commit is contained in:
51
internal/client/factory.go
Normal file
51
internal/client/factory.go
Normal file
@@ -0,0 +1,51 @@
|
||||
// Package client provides service client factory for creating service clients.
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.dcentral.systems/toolz/goplt/internal/client/grpc"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/registry"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/services"
|
||||
)
|
||||
|
||||
// ServiceClientFactory creates service clients for inter-service communication.
|
||||
type ServiceClientFactory struct {
|
||||
registry registry.ServiceRegistry
|
||||
}
|
||||
|
||||
// NewServiceClientFactory creates a new service client factory.
|
||||
func NewServiceClientFactory(reg registry.ServiceRegistry) *ServiceClientFactory {
|
||||
return &ServiceClientFactory{
|
||||
registry: reg,
|
||||
}
|
||||
}
|
||||
|
||||
// GetAuthClient returns an AuthServiceClient.
|
||||
func (f *ServiceClientFactory) GetAuthClient() (services.AuthServiceClient, error) {
|
||||
return grpc.NewAuthClient(f.registry)
|
||||
}
|
||||
|
||||
// GetIdentityClient returns an IdentityServiceClient.
|
||||
func (f *ServiceClientFactory) GetIdentityClient() (services.IdentityServiceClient, error) {
|
||||
return grpc.NewIdentityClient(f.registry)
|
||||
}
|
||||
|
||||
// GetAuthzClient returns an AuthzServiceClient.
|
||||
func (f *ServiceClientFactory) GetAuthzClient() (services.AuthzServiceClient, error) {
|
||||
return grpc.NewAuthzClient(f.registry)
|
||||
}
|
||||
|
||||
// GetAuditClient returns an AuditServiceClient.
|
||||
func (f *ServiceClientFactory) GetAuditClient() (services.AuditServiceClient, error) {
|
||||
return grpc.NewAuditClient(f.registry)
|
||||
}
|
||||
|
||||
// DiscoverService discovers service instances for a given service name.
|
||||
func (f *ServiceClientFactory) DiscoverService(ctx context.Context, serviceName string) ([]*registry.ServiceInstance, error) {
|
||||
if f.registry == nil {
|
||||
return nil, fmt.Errorf("service registry is not available")
|
||||
}
|
||||
return f.registry.Discover(ctx, serviceName)
|
||||
}
|
||||
33
internal/client/grpc/audit_client.go
Normal file
33
internal/client/grpc/audit_client.go
Normal file
@@ -0,0 +1,33 @@
|
||||
// Package grpc provides gRPC client implementations for service clients.
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.dcentral.systems/toolz/goplt/pkg/registry"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/services"
|
||||
)
|
||||
|
||||
// AuditClient implements AuditServiceClient using gRPC.
|
||||
// This is a stub implementation - will be fully implemented when proto files are generated in Phase 4.
|
||||
type AuditClient struct {
|
||||
registry registry.ServiceRegistry
|
||||
}
|
||||
|
||||
// NewAuditClient creates a new gRPC client for the Audit Service.
|
||||
func NewAuditClient(reg registry.ServiceRegistry) (services.AuditServiceClient, error) {
|
||||
return &AuditClient{
|
||||
registry: reg,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Record records an audit log entry.
|
||||
func (c *AuditClient) Record(ctx context.Context, entry *services.AuditLogEntry) error {
|
||||
return fmt.Errorf("not implemented: proto files not yet generated")
|
||||
}
|
||||
|
||||
// Query queries audit logs based on filters.
|
||||
func (c *AuditClient) Query(ctx context.Context, filters *services.AuditLogFilters) ([]services.AuditLogEntry, error) {
|
||||
return nil, fmt.Errorf("not implemented: proto files not yet generated")
|
||||
}
|
||||
75
internal/client/grpc/auth_client.go
Normal file
75
internal/client/grpc/auth_client.go
Normal file
@@ -0,0 +1,75 @@
|
||||
// Package grpc provides gRPC client implementations for service clients.
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.dcentral.systems/toolz/goplt/pkg/registry"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/services"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
)
|
||||
|
||||
// AuthClient implements AuthServiceClient using gRPC.
|
||||
// This is a stub implementation - will be fully implemented when proto files are generated in Phase 4.
|
||||
type AuthClient struct {
|
||||
registry registry.ServiceRegistry
|
||||
// conn will be set when proto files are available
|
||||
// conn *grpc.ClientConn
|
||||
}
|
||||
|
||||
// NewAuthClient creates a new gRPC client for the Auth Service.
|
||||
func NewAuthClient(reg registry.ServiceRegistry) (services.AuthServiceClient, error) {
|
||||
return &AuthClient{
|
||||
registry: reg,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Login authenticates a user and returns access and refresh tokens.
|
||||
func (c *AuthClient) Login(ctx context.Context, email, password string) (*services.TokenResponse, error) {
|
||||
// TODO: Implement when proto files are generated
|
||||
return nil, fmt.Errorf("not implemented: proto files not yet generated")
|
||||
}
|
||||
|
||||
// RefreshToken refreshes an access token using a refresh token.
|
||||
func (c *AuthClient) RefreshToken(ctx context.Context, refreshToken string) (*services.TokenResponse, error) {
|
||||
// TODO: Implement when proto files are generated
|
||||
return nil, fmt.Errorf("not implemented: proto files not yet generated")
|
||||
}
|
||||
|
||||
// ValidateToken validates a JWT token and returns the token claims.
|
||||
func (c *AuthClient) ValidateToken(ctx context.Context, token string) (*services.TokenClaims, error) {
|
||||
// TODO: Implement when proto files are generated
|
||||
return nil, fmt.Errorf("not implemented: proto files not yet generated")
|
||||
}
|
||||
|
||||
// Logout invalidates a refresh token.
|
||||
func (c *AuthClient) Logout(ctx context.Context, refreshToken string) error {
|
||||
// TODO: Implement when proto files are generated
|
||||
return fmt.Errorf("not implemented: proto files not yet generated")
|
||||
}
|
||||
|
||||
// connectToService discovers and connects to a service instance.
|
||||
func connectToService(ctx context.Context, reg registry.ServiceRegistry, serviceName string) (*grpc.ClientConn, error) {
|
||||
instances, err := reg.Discover(ctx, serviceName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to discover service %s: %w", serviceName, err)
|
||||
}
|
||||
|
||||
if len(instances) == 0 {
|
||||
return nil, fmt.Errorf("no instances found for service %s", serviceName)
|
||||
}
|
||||
|
||||
// Use the first healthy instance (load balancing can be added later)
|
||||
instance := instances[0]
|
||||
address := fmt.Sprintf("%s:%d", instance.Address, instance.Port)
|
||||
|
||||
// Create gRPC connection
|
||||
conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to connect to %s at %s: %w", serviceName, address, err)
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
43
internal/client/grpc/authz_client.go
Normal file
43
internal/client/grpc/authz_client.go
Normal file
@@ -0,0 +1,43 @@
|
||||
// Package grpc provides gRPC client implementations for service clients.
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.dcentral.systems/toolz/goplt/pkg/registry"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/services"
|
||||
)
|
||||
|
||||
// AuthzClient implements AuthzServiceClient using gRPC.
|
||||
// This is a stub implementation - will be fully implemented when proto files are generated in Phase 4.
|
||||
type AuthzClient struct {
|
||||
registry registry.ServiceRegistry
|
||||
}
|
||||
|
||||
// NewAuthzClient creates a new gRPC client for the Authz Service.
|
||||
func NewAuthzClient(reg registry.ServiceRegistry) (services.AuthzServiceClient, error) {
|
||||
return &AuthzClient{
|
||||
registry: reg,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Authorize checks if a user has a specific permission and returns an error if not.
|
||||
func (c *AuthzClient) Authorize(ctx context.Context, userID, permission string) error {
|
||||
return fmt.Errorf("not implemented: proto files not yet generated")
|
||||
}
|
||||
|
||||
// HasPermission checks if a user has a specific permission.
|
||||
func (c *AuthzClient) HasPermission(ctx context.Context, userID, permission string) (bool, error) {
|
||||
return false, fmt.Errorf("not implemented: proto files not yet generated")
|
||||
}
|
||||
|
||||
// GetUserPermissions returns all permissions for a user.
|
||||
func (c *AuthzClient) GetUserPermissions(ctx context.Context, userID string) ([]services.Permission, error) {
|
||||
return nil, fmt.Errorf("not implemented: proto files not yet generated")
|
||||
}
|
||||
|
||||
// GetUserRoles returns all roles for a user.
|
||||
func (c *AuthzClient) GetUserRoles(ctx context.Context, userID string) ([]services.Role, error) {
|
||||
return nil, fmt.Errorf("not implemented: proto files not yet generated")
|
||||
}
|
||||
63
internal/client/grpc/identity_client.go
Normal file
63
internal/client/grpc/identity_client.go
Normal file
@@ -0,0 +1,63 @@
|
||||
// Package grpc provides gRPC client implementations for service clients.
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.dcentral.systems/toolz/goplt/pkg/registry"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/services"
|
||||
)
|
||||
|
||||
// IdentityClient implements IdentityServiceClient using gRPC.
|
||||
// This is a stub implementation - will be fully implemented when proto files are generated in Phase 4.
|
||||
type IdentityClient struct {
|
||||
registry registry.ServiceRegistry
|
||||
}
|
||||
|
||||
// NewIdentityClient creates a new gRPC client for the Identity Service.
|
||||
func NewIdentityClient(reg registry.ServiceRegistry) (services.IdentityServiceClient, error) {
|
||||
return &IdentityClient{
|
||||
registry: reg,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetUser retrieves a user by ID.
|
||||
func (c *IdentityClient) GetUser(ctx context.Context, id string) (*services.User, error) {
|
||||
return nil, fmt.Errorf("not implemented: proto files not yet generated")
|
||||
}
|
||||
|
||||
// GetUserByEmail retrieves a user by email address.
|
||||
func (c *IdentityClient) GetUserByEmail(ctx context.Context, email string) (*services.User, error) {
|
||||
return nil, fmt.Errorf("not implemented: proto files not yet generated")
|
||||
}
|
||||
|
||||
// CreateUser creates a new user.
|
||||
func (c *IdentityClient) CreateUser(ctx context.Context, user *services.CreateUserRequest) (*services.User, error) {
|
||||
return nil, fmt.Errorf("not implemented: proto files not yet generated")
|
||||
}
|
||||
|
||||
// UpdateUser updates an existing user.
|
||||
func (c *IdentityClient) UpdateUser(ctx context.Context, id string, user *services.UpdateUserRequest) (*services.User, error) {
|
||||
return nil, fmt.Errorf("not implemented: proto files not yet generated")
|
||||
}
|
||||
|
||||
// DeleteUser deletes a user.
|
||||
func (c *IdentityClient) DeleteUser(ctx context.Context, id string) error {
|
||||
return fmt.Errorf("not implemented: proto files not yet generated")
|
||||
}
|
||||
|
||||
// VerifyEmail verifies a user's email address using a verification token.
|
||||
func (c *IdentityClient) VerifyEmail(ctx context.Context, token string) error {
|
||||
return fmt.Errorf("not implemented: proto files not yet generated")
|
||||
}
|
||||
|
||||
// RequestPasswordReset requests a password reset token.
|
||||
func (c *IdentityClient) RequestPasswordReset(ctx context.Context, email string) error {
|
||||
return fmt.Errorf("not implemented: proto files not yet generated")
|
||||
}
|
||||
|
||||
// ResetPassword resets a user's password using a reset token.
|
||||
func (c *IdentityClient) ResetPassword(ctx context.Context, token, newPassword string) error {
|
||||
return fmt.Errorf("not implemented: proto files not yet generated")
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"git.dcentral.systems/toolz/goplt/internal/client"
|
||||
configimpl "git.dcentral.systems/toolz/goplt/internal/config"
|
||||
errorbusimpl "git.dcentral.systems/toolz/goplt/internal/errorbus"
|
||||
"git.dcentral.systems/toolz/goplt/internal/health"
|
||||
@@ -14,10 +15,12 @@ import (
|
||||
loggerimpl "git.dcentral.systems/toolz/goplt/internal/logger"
|
||||
"git.dcentral.systems/toolz/goplt/internal/metrics"
|
||||
"git.dcentral.systems/toolz/goplt/internal/observability"
|
||||
"git.dcentral.systems/toolz/goplt/internal/registry/consul"
|
||||
"git.dcentral.systems/toolz/goplt/internal/server"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/config"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/errorbus"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/logger"
|
||||
"git.dcentral.systems/toolz/goplt/pkg/registry"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.opentelemetry.io/otel/trace/noop"
|
||||
"go.uber.org/fx"
|
||||
@@ -158,13 +161,11 @@ func ProvideErrorBus() fx.Option {
|
||||
}
|
||||
|
||||
// ProvideHealthRegistry creates an FX option that provides the health check registry.
|
||||
// Note: Database health checkers are registered by services that create their own database clients.
|
||||
func ProvideHealthRegistry() fx.Option {
|
||||
return fx.Provide(func(dbClient *database.Client) (*health.Registry, error) {
|
||||
return fx.Provide(func() (*health.Registry, error) {
|
||||
registry := health.NewRegistry()
|
||||
|
||||
// Register database health checker
|
||||
registry.Register("database", health.NewDatabaseChecker(dbClient))
|
||||
|
||||
// Services will register their own health checkers (e.g., database, external dependencies)
|
||||
return registry, nil
|
||||
})
|
||||
}
|
||||
@@ -176,6 +177,72 @@ func ProvideMetrics() fx.Option {
|
||||
})
|
||||
}
|
||||
|
||||
// ProvideServiceRegistry creates an FX option that provides the service registry.
|
||||
func ProvideServiceRegistry() fx.Option {
|
||||
return fx.Provide(func(cfg config.ConfigProvider) (registry.ServiceRegistry, error) {
|
||||
registryType := cfg.GetString("registry.type")
|
||||
if registryType == "" {
|
||||
registryType = "consul"
|
||||
}
|
||||
|
||||
switch registryType {
|
||||
case "consul":
|
||||
consulCfg := consul.Config{
|
||||
Address: cfg.GetString("registry.consul.address"),
|
||||
Datacenter: cfg.GetString("registry.consul.datacenter"),
|
||||
Scheme: cfg.GetString("registry.consul.scheme"),
|
||||
}
|
||||
|
||||
// Set defaults
|
||||
if consulCfg.Address == "" {
|
||||
consulCfg.Address = "localhost:8500"
|
||||
}
|
||||
if consulCfg.Datacenter == "" {
|
||||
consulCfg.Datacenter = "dc1"
|
||||
}
|
||||
if consulCfg.Scheme == "" {
|
||||
consulCfg.Scheme = "http"
|
||||
}
|
||||
|
||||
// Parse health check configuration
|
||||
healthCheckInterval := cfg.GetDuration("registry.consul.health_check.interval")
|
||||
if healthCheckInterval == 0 {
|
||||
healthCheckInterval = 10 * time.Second
|
||||
}
|
||||
healthCheckTimeout := cfg.GetDuration("registry.consul.health_check.timeout")
|
||||
if healthCheckTimeout == 0 {
|
||||
healthCheckTimeout = 3 * time.Second
|
||||
}
|
||||
healthCheckDeregisterAfter := cfg.GetDuration("registry.consul.health_check.deregister_after")
|
||||
if healthCheckDeregisterAfter == 0 {
|
||||
healthCheckDeregisterAfter = 30 * time.Second
|
||||
}
|
||||
healthCheckHTTP := cfg.GetString("registry.consul.health_check.http")
|
||||
if healthCheckHTTP == "" {
|
||||
healthCheckHTTP = "/healthz"
|
||||
}
|
||||
|
||||
consulCfg.HealthCheck = consul.HealthCheckConfig{
|
||||
Interval: healthCheckInterval,
|
||||
Timeout: healthCheckTimeout,
|
||||
DeregisterAfter: healthCheckDeregisterAfter,
|
||||
HTTP: healthCheckHTTP,
|
||||
}
|
||||
|
||||
return consul.NewRegistry(consulCfg)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported registry type: %s", registryType)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// ProvideServiceClientFactory creates an FX option that provides the service client factory.
|
||||
func ProvideServiceClientFactory() fx.Option {
|
||||
return fx.Provide(func(reg registry.ServiceRegistry) (*client.ServiceClientFactory, error) {
|
||||
return client.NewServiceClientFactory(reg), nil
|
||||
})
|
||||
}
|
||||
|
||||
// ProvideTracer creates an FX option that provides the OpenTelemetry tracer.
|
||||
func ProvideTracer() fx.Option {
|
||||
return fx.Provide(func(cfg config.ConfigProvider, lc fx.Lifecycle) (trace.TracerProvider, error) {
|
||||
@@ -318,18 +385,21 @@ func ProvideHTTPServer() fx.Option {
|
||||
})
|
||||
}
|
||||
|
||||
// CoreModule returns an FX option that provides all core services.
|
||||
// This includes configuration, logging, database, error bus, health checks, metrics, tracing, and HTTP server.
|
||||
// CoreModule returns an FX option that provides all core kernel infrastructure services.
|
||||
// This includes configuration, logging, error bus, health checks, metrics, tracing, service registry, and service client factory.
|
||||
// Note: Database and HTTP server are NOT included - services will create their own instances.
|
||||
// HTTP server foundation is available via server.NewServer() for services to use.
|
||||
func CoreModule() fx.Option {
|
||||
return fx.Options(
|
||||
ProvideConfig(),
|
||||
ProvideLogger(),
|
||||
ProvideDatabase(),
|
||||
ProvideErrorBus(),
|
||||
ProvideHealthRegistry(),
|
||||
ProvideMetrics(),
|
||||
ProvideTracer(),
|
||||
ProvideHTTPServer(),
|
||||
ProvideServiceRegistry(),
|
||||
ProvideServiceClientFactory(),
|
||||
// Note: ProvideDatabase() and ProvideHTTPServer() are removed - services create their own
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -22,13 +22,15 @@ type Client struct {
|
||||
// Config holds database configuration.
|
||||
type Config struct {
|
||||
DSN string
|
||||
Schema string // Schema name for schema isolation (e.g., "identity", "auth", "authz", "audit")
|
||||
MaxConnections int
|
||||
MaxIdleConns int
|
||||
ConnMaxLifetime time.Duration
|
||||
ConnMaxIdleTime time.Duration
|
||||
}
|
||||
|
||||
// NewClient creates a new Ent client with connection pooling.
|
||||
// NewClient creates a new Ent client with connection pooling and schema isolation support.
|
||||
// If schema is provided, it will be created if it doesn't exist and set as the search path.
|
||||
func NewClient(cfg Config) (*Client, error) {
|
||||
// Open database connection
|
||||
db, err := sql.Open("postgres", cfg.DSN)
|
||||
@@ -51,6 +53,19 @@ func NewClient(cfg Config) (*Client, error) {
|
||||
return nil, fmt.Errorf("failed to ping database: %w", err)
|
||||
}
|
||||
|
||||
// Create schema if provided
|
||||
if cfg.Schema != "" {
|
||||
if err := createSchemaIfNotExists(ctx, db, cfg.Schema); err != nil {
|
||||
_ = db.Close()
|
||||
return nil, fmt.Errorf("failed to create schema %s: %w", cfg.Schema, err)
|
||||
}
|
||||
// Set search path to the schema
|
||||
if _, err := db.ExecContext(ctx, fmt.Sprintf("SET search_path TO %s", cfg.Schema)); err != nil {
|
||||
_ = db.Close()
|
||||
return nil, fmt.Errorf("failed to set search path to schema %s: %w", cfg.Schema, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Create Ent driver
|
||||
drv := entsql.OpenDB(dialect.Postgres, db)
|
||||
|
||||
@@ -63,6 +78,49 @@ func NewClient(cfg Config) (*Client, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// NewClientWithSchema is a convenience function that creates a client with a specific schema.
|
||||
func NewClientWithSchema(dsn string, schema string) (*Client, error) {
|
||||
return NewClient(Config{
|
||||
DSN: dsn,
|
||||
Schema: schema,
|
||||
MaxConnections: 25,
|
||||
MaxIdleConns: 5,
|
||||
ConnMaxLifetime: 5 * time.Minute,
|
||||
ConnMaxIdleTime: 10 * time.Minute,
|
||||
})
|
||||
}
|
||||
|
||||
// createSchemaIfNotExists creates a PostgreSQL schema if it doesn't exist.
|
||||
func createSchemaIfNotExists(ctx context.Context, db *sql.DB, schemaName string) error {
|
||||
// Use a transaction to ensure atomicity
|
||||
tx, err := db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
// Check if schema exists
|
||||
var exists bool
|
||||
err = tx.QueryRowContext(ctx,
|
||||
"SELECT EXISTS(SELECT 1 FROM information_schema.schemata WHERE schema_name = $1)",
|
||||
schemaName,
|
||||
).Scan(&exists)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create schema if it doesn't exist
|
||||
if !exists {
|
||||
// Use fmt.Sprintf for schema name since it's a configuration value, not user input
|
||||
_, err = tx.ExecContext(ctx, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", schemaName))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
// Close closes the database connection.
|
||||
func (c *Client) Close() error {
|
||||
if err := c.Client.Close(); err != nil {
|
||||
|
||||
198
internal/registry/consul/consul.go
Normal file
198
internal/registry/consul/consul.go
Normal file
@@ -0,0 +1,198 @@
|
||||
// Package consul provides Consul-based service registry implementation.
|
||||
package consul
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.dcentral.systems/toolz/goplt/pkg/registry"
|
||||
consulapi "github.com/hashicorp/consul/api"
|
||||
)
|
||||
|
||||
// ConsulRegistry implements ServiceRegistry using Consul.
|
||||
type ConsulRegistry struct {
|
||||
client *consulapi.Client
|
||||
config *Config
|
||||
}
|
||||
|
||||
// Config holds Consul registry configuration.
|
||||
type Config struct {
|
||||
Address string // Consul agent address (e.g., "localhost:8500")
|
||||
Datacenter string // Consul datacenter
|
||||
Scheme string // "http" or "https"
|
||||
HealthCheck HealthCheckConfig
|
||||
}
|
||||
|
||||
// HealthCheckConfig holds health check configuration.
|
||||
type HealthCheckConfig struct {
|
||||
Interval time.Duration // Health check interval
|
||||
Timeout time.Duration // Health check timeout
|
||||
DeregisterAfter time.Duration // Time to wait before deregistering unhealthy service
|
||||
HTTP string // HTTP health check endpoint (e.g., "/healthz")
|
||||
}
|
||||
|
||||
// NewRegistry creates a new Consul-based service registry.
|
||||
func NewRegistry(cfg Config) (*ConsulRegistry, error) {
|
||||
consulConfig := consulapi.DefaultConfig()
|
||||
if cfg.Address != "" {
|
||||
consulConfig.Address = cfg.Address
|
||||
}
|
||||
if cfg.Datacenter != "" {
|
||||
consulConfig.Datacenter = cfg.Datacenter
|
||||
}
|
||||
if cfg.Scheme != "" {
|
||||
consulConfig.Scheme = cfg.Scheme
|
||||
}
|
||||
|
||||
client, err := consulapi.NewClient(consulConfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create Consul client: %w", err)
|
||||
}
|
||||
|
||||
return &ConsulRegistry{
|
||||
client: client,
|
||||
config: &cfg,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Register registers a service instance with Consul.
|
||||
func (r *ConsulRegistry) Register(ctx context.Context, service *registry.ServiceInstance) error {
|
||||
registration := &consulapi.AgentServiceRegistration{
|
||||
ID: service.ID,
|
||||
Name: service.Name,
|
||||
Address: service.Address,
|
||||
Port: service.Port,
|
||||
Tags: service.Tags,
|
||||
Meta: service.Metadata,
|
||||
}
|
||||
|
||||
// Add health check if configured
|
||||
if r.config.HealthCheck.HTTP != "" {
|
||||
healthCheckURL := fmt.Sprintf("http://%s:%d%s", service.Address, service.Port, r.config.HealthCheck.HTTP)
|
||||
registration.Check = &consulapi.AgentServiceCheck{
|
||||
HTTP: healthCheckURL,
|
||||
Interval: r.config.HealthCheck.Interval.String(),
|
||||
Timeout: r.config.HealthCheck.Timeout.String(),
|
||||
DeregisterCriticalServiceAfter: r.config.HealthCheck.DeregisterAfter.String(),
|
||||
}
|
||||
}
|
||||
|
||||
return r.client.Agent().ServiceRegister(registration)
|
||||
}
|
||||
|
||||
// Deregister removes a service instance from Consul.
|
||||
func (r *ConsulRegistry) Deregister(ctx context.Context, serviceID string) error {
|
||||
return r.client.Agent().ServiceDeregister(serviceID)
|
||||
}
|
||||
|
||||
// Discover returns all healthy instances of a service.
|
||||
func (r *ConsulRegistry) Discover(ctx context.Context, serviceName string) ([]*registry.ServiceInstance, error) {
|
||||
services, _, err := r.client.Health().Service(serviceName, "", true, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to discover service %s: %w", serviceName, err)
|
||||
}
|
||||
|
||||
instances := make([]*registry.ServiceInstance, 0, len(services))
|
||||
for _, service := range services {
|
||||
instances = append(instances, ®istry.ServiceInstance{
|
||||
ID: service.Service.ID,
|
||||
Name: service.Service.Service,
|
||||
Address: service.Service.Address,
|
||||
Port: service.Service.Port,
|
||||
Tags: service.Service.Tags,
|
||||
Metadata: service.Service.Meta,
|
||||
})
|
||||
}
|
||||
|
||||
return instances, nil
|
||||
}
|
||||
|
||||
// Watch returns a channel that receives updates when service instances change.
|
||||
func (r *ConsulRegistry) Watch(ctx context.Context, serviceName string) (<-chan []*registry.ServiceInstance, error) {
|
||||
updates := make(chan []*registry.ServiceInstance, 10)
|
||||
|
||||
go func() {
|
||||
defer close(updates)
|
||||
|
||||
lastIndex := uint64(0)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
services, meta, err := r.client.Health().Service(serviceName, "", true, &consulapi.QueryOptions{
|
||||
WaitIndex: lastIndex,
|
||||
WaitTime: 10 * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
// Log error and continue
|
||||
continue
|
||||
}
|
||||
|
||||
if meta.LastIndex != lastIndex {
|
||||
instances := make([]*registry.ServiceInstance, 0, len(services))
|
||||
for _, service := range services {
|
||||
instances = append(instances, ®istry.ServiceInstance{
|
||||
ID: service.Service.ID,
|
||||
Name: service.Service.Service,
|
||||
Address: service.Service.Address,
|
||||
Port: service.Service.Port,
|
||||
Tags: service.Service.Tags,
|
||||
Metadata: service.Service.Meta,
|
||||
})
|
||||
}
|
||||
|
||||
select {
|
||||
case updates <- instances:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
lastIndex = meta.LastIndex
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return updates, nil
|
||||
}
|
||||
|
||||
// Health returns the health status of a service instance.
|
||||
func (r *ConsulRegistry) Health(ctx context.Context, serviceID string) (*registry.HealthStatus, error) {
|
||||
entries, _, err := r.client.Health().Service(serviceID, "", false, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get health for service %s: %w", serviceID, err)
|
||||
}
|
||||
|
||||
if len(entries) == 0 {
|
||||
return ®istry.HealthStatus{
|
||||
ServiceID: serviceID,
|
||||
Status: "unknown",
|
||||
Message: "service not found",
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Check health status from service entry checks
|
||||
status := "healthy"
|
||||
message := "all checks passing"
|
||||
|
||||
// Get the first entry (should be the service instance)
|
||||
entry := entries[0]
|
||||
for _, check := range entry.Checks {
|
||||
if check.Status == consulapi.HealthCritical {
|
||||
status = "critical"
|
||||
message = check.Output
|
||||
break
|
||||
} else if check.Status == consulapi.HealthWarning {
|
||||
status = "unhealthy"
|
||||
message = check.Output
|
||||
}
|
||||
}
|
||||
|
||||
return ®istry.HealthStatus{
|
||||
ServiceID: serviceID,
|
||||
Status: status,
|
||||
Message: message,
|
||||
}, nil
|
||||
}
|
||||
Reference in New Issue
Block a user