Compare commits
2 Commits
feature/mo
...
c4b1a2d853
| Author | SHA1 | Date | |
|---|---|---|---|
| c4b1a2d853 | |||
| 8aa8b908e6 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,2 +1 @@
|
||||
spore-gateway
|
||||
mock-gateway
|
||||
|
||||
@@ -1,425 +0,0 @@
|
||||
# Mock Gateway Implementation
|
||||
|
||||
This document describes the mock gateway implementation for the SPORE Gateway project.
|
||||
|
||||
## Overview
|
||||
|
||||
The mock gateway is a fully-featured simulation of the real SPORE Gateway that implements all functionality (WebSocket, REST API) without requiring actual SPORE nodes. It reuses all types defined for the real gateway, ensuring complete API compatibility.
|
||||
|
||||
## Project Structure
|
||||
|
||||
```
|
||||
spore-gateway/
|
||||
├── cmd/
|
||||
│ └── mock-gateway/
|
||||
│ ├── main.go # Mock gateway entry point
|
||||
│ └── README.md # Detailed usage documentation
|
||||
├── internal/
|
||||
│ ├── mock/
|
||||
│ │ ├── discovery.go # Mock node discovery implementation
|
||||
│ │ ├── server.go # Mock HTTP server with all endpoints
|
||||
│ │ ├── websocket.go # Mock WebSocket server
|
||||
│ │ └── data.go # Mock data generators
|
||||
│ ├── discovery/ # (Reused types)
|
||||
│ └── websocket/ # (Reused for upgrader)
|
||||
├── pkg/
|
||||
│ ├── client/ # (Reused types)
|
||||
│ ├── config/ # (Reused types)
|
||||
│ └── registry/ # (Reused types)
|
||||
└── scripts/
|
||||
└── run-mock-gateway.sh # Convenient runner script
|
||||
```
|
||||
|
||||
## Key Features
|
||||
|
||||
### 1. Complete REST API Implementation
|
||||
|
||||
All endpoints from the real gateway are implemented with realistic mock data:
|
||||
|
||||
**Discovery Endpoints**:
|
||||
- `GET /api/discovery/nodes` - Returns all mock nodes
|
||||
- `POST /api/discovery/refresh` - Mock refresh operation
|
||||
- `POST /api/discovery/random-primary` - Selects random primary
|
||||
- `POST /api/discovery/primary/{ip}` - Sets specific primary
|
||||
|
||||
**Cluster Endpoints**:
|
||||
- `GET /api/cluster/members` - Returns mock cluster members
|
||||
- `POST /api/cluster/refresh` - Mock cluster refresh
|
||||
- `GET /api/cluster/node/versions` - Returns node versions
|
||||
- `POST /api/rollout` - Simulates firmware rollout with progress
|
||||
|
||||
**Node Endpoints**:
|
||||
- `GET /api/node/status` - Mock system status
|
||||
- `GET /api/node/status/{ip}` - Specific node status
|
||||
- `GET /api/node/endpoints` - Mock API capabilities
|
||||
- `POST /api/node/update` - Simulated firmware upload
|
||||
|
||||
**Task Endpoints**:
|
||||
- `GET /api/tasks/status` - Mock task scheduler status
|
||||
|
||||
**Monitoring Endpoints**:
|
||||
- `GET /api/monitoring/resources` - Real-time resource monitoring for all nodes
|
||||
|
||||
**Proxy Endpoints**:
|
||||
- `POST /api/proxy-call` - Mock proxied calls
|
||||
|
||||
**Registry Endpoints**:
|
||||
- `GET /api/registry/health` - Mock registry health
|
||||
- `GET /api/registry/firmware` - Mock firmware list
|
||||
- `POST /api/registry/firmware` - Mock firmware upload
|
||||
- `GET /api/registry/firmware/{name}/{version}` - Mock firmware download
|
||||
- `PUT /api/registry/firmware/{name}/{version}` - Mock metadata update
|
||||
- `DELETE /api/registry/firmware/{name}/{version}` - Mock deletion
|
||||
|
||||
**WebSocket**:
|
||||
- `GET /ws` - WebSocket endpoint for real-time updates
|
||||
|
||||
**Health**:
|
||||
- `GET /api/health` - Gateway health check
|
||||
|
||||
### 2. WebSocket Implementation
|
||||
|
||||
The mock gateway implements full WebSocket functionality with real-time broadcasts:
|
||||
|
||||
**Broadcast Types**:
|
||||
- **Cluster Updates**: Periodic and event-driven cluster state updates
|
||||
- **Node Discovery**: Node join/leave/update events
|
||||
- **Firmware Upload Status**: Upload progress notifications
|
||||
- **Rollout Progress**: Step-by-step rollout progress with percentage
|
||||
|
||||
**Message Format**: Identical to real gateway for compatibility
|
||||
|
||||
### 3. Mock Node Discovery
|
||||
|
||||
Simulates node discovery without UDP:
|
||||
- Configurable number of nodes (default: 5)
|
||||
- Realistic node data (IP, hostname, labels, metrics)
|
||||
- Automatic heartbeat simulation
|
||||
- Primary node selection
|
||||
- Node status management (active/inactive)
|
||||
|
||||
**Generated Nodes**:
|
||||
- IPs: `192.168.1.100`, `192.168.1.101`, etc.
|
||||
- Hostnames: `spore-node-1`, `spore-node-2`, etc.
|
||||
- Labels: version (matches firmware registry), stable, env, zone, type
|
||||
- Resources: freeHeap, cpuFreqMHz, flashChipSize
|
||||
- Latency: Random realistic values
|
||||
- Firmware Distribution: 40% on v1.0.0, 40% on v1.1.0, 20% on v1.2.0 (beta)
|
||||
|
||||
### 4. Type Reuse
|
||||
|
||||
The mock gateway reuses all types from the real gateway:
|
||||
|
||||
**From `internal/discovery`**:
|
||||
- `NodeInfo` - Node information structure
|
||||
- `ClusterStatus` - Cluster state
|
||||
- `NodeStatus` - Node status enum
|
||||
- `NodeUpdateCallback` - Callback function type
|
||||
|
||||
**From `pkg/client`**:
|
||||
- `ClusterMember` - Cluster member info
|
||||
- `ClusterStatusResponse` - Cluster status response
|
||||
- `TaskStatusResponse` - Task status
|
||||
- `SystemStatusResponse` - System status
|
||||
- `CapabilitiesResponse` - API capabilities
|
||||
- `EndpointInfo` - Endpoint information
|
||||
- `ParameterInfo` - Parameter details
|
||||
- `FirmwareUpdateResponse` - Firmware update result
|
||||
|
||||
**From `pkg/registry`**:
|
||||
- `FirmwareRecord` - Firmware metadata
|
||||
- `GroupedFirmware` - Grouped firmware list
|
||||
- `FirmwareMetadata` - Firmware metadata for uploads
|
||||
|
||||
**From `pkg/config`**:
|
||||
- `Config` - Configuration structure
|
||||
|
||||
This ensures complete API compatibility and type safety.
|
||||
|
||||
## Usage
|
||||
|
||||
### Quick Start
|
||||
|
||||
```bash
|
||||
# Build and run
|
||||
go build -o mock-gateway cmd/mock-gateway/main.go
|
||||
./mock-gateway
|
||||
|
||||
# Or use the convenience script
|
||||
./scripts/run-mock-gateway.sh
|
||||
```
|
||||
|
||||
### Configuration Options
|
||||
|
||||
```bash
|
||||
./mock-gateway [options]
|
||||
|
||||
-port string
|
||||
HTTP server port (default "3001")
|
||||
-mock-nodes int
|
||||
Number of mock nodes to simulate (default 5)
|
||||
-heartbeat-rate int
|
||||
Heartbeat interval in seconds (default 5)
|
||||
-log-level string
|
||||
Log level: debug, info, warn, error (default "info")
|
||||
-enable-ws bool
|
||||
Enable WebSocket broadcasts (default true)
|
||||
-config string
|
||||
Path to configuration file
|
||||
```
|
||||
|
||||
### Example Usage
|
||||
|
||||
```bash
|
||||
# Development with debug logging
|
||||
./mock-gateway -log-level debug -mock-nodes 3
|
||||
|
||||
# Production-like setup
|
||||
./mock-gateway -port 3001 -mock-nodes 10 -heartbeat-rate 5
|
||||
|
||||
# Testing without WebSocket
|
||||
./mock-gateway -enable-ws=false
|
||||
|
||||
# Large cluster simulation
|
||||
./mock-gateway -mock-nodes 20 -heartbeat-rate 2
|
||||
```
|
||||
|
||||
### Using the Convenience Script
|
||||
|
||||
```bash
|
||||
# Basic usage
|
||||
./scripts/run-mock-gateway.sh
|
||||
|
||||
# With options
|
||||
./scripts/run-mock-gateway.sh -p 8080 -n 10 -l debug
|
||||
|
||||
# Show help
|
||||
./scripts/run-mock-gateway.sh --help
|
||||
```
|
||||
|
||||
## Mock Data
|
||||
|
||||
### Pre-configured Firmware
|
||||
|
||||
**spore-firmware**:
|
||||
- v1.0.0 - Stable, production (40% of nodes)
|
||||
- v1.1.0 - Stable, production (40% of nodes)
|
||||
- v1.2.0 - Beta (20% of nodes)
|
||||
|
||||
**sensor-firmware**:
|
||||
- v2.0.0 - Stable, sensor type
|
||||
- v2.1.0 - Stable, sensor type
|
||||
|
||||
Node labels automatically match the firmware versions in the registry, ensuring consistency between running firmware and available versions.
|
||||
|
||||
### Simulated Behaviors
|
||||
|
||||
**Firmware Uploads**:
|
||||
- 2-second simulated processing time
|
||||
- 90% success rate
|
||||
- WebSocket status broadcasts
|
||||
- In-memory storage
|
||||
|
||||
**Rollouts**:
|
||||
- Step-by-step simulation:
|
||||
1. Label update (500ms)
|
||||
2. Firmware upload (1s)
|
||||
3. Completion (500ms)
|
||||
- WebSocket progress broadcasts
|
||||
- Node version updates
|
||||
|
||||
**Node Updates**:
|
||||
- Periodic heartbeat simulation
|
||||
- Random metric updates (latency, heap)
|
||||
- Automatic status management
|
||||
|
||||
**Cluster Changes**:
|
||||
- Automatic primary selection
|
||||
- Node discovery events
|
||||
- WebSocket broadcasts
|
||||
|
||||
## API Compatibility
|
||||
|
||||
The mock gateway is fully compatible with:
|
||||
- Frontend applications expecting the real gateway API
|
||||
- Client libraries using the gateway
|
||||
- Testing frameworks
|
||||
- CI/CD pipelines
|
||||
|
||||
All responses match the real gateway's format and structure.
|
||||
|
||||
## Testing Use Cases
|
||||
|
||||
### Frontend Development
|
||||
```bash
|
||||
# Run mock gateway for UI development
|
||||
./mock-gateway -port 3001 -mock-nodes 5 -log-level info
|
||||
```
|
||||
|
||||
### Load Testing
|
||||
```bash
|
||||
# Simulate large cluster
|
||||
./mock-gateway -mock-nodes 50 -heartbeat-rate 1
|
||||
```
|
||||
|
||||
### Integration Testing
|
||||
```bash
|
||||
# Headless mode for automated tests
|
||||
./mock-gateway -log-level error -enable-ws=false
|
||||
```
|
||||
|
||||
### Demonstrations
|
||||
```bash
|
||||
# Clean output for demos
|
||||
./mock-gateway -log-level warn -mock-nodes 8
|
||||
```
|
||||
|
||||
## Implementation Details
|
||||
|
||||
### Mock Discovery (`internal/mock/discovery.go`)
|
||||
|
||||
**Features**:
|
||||
- In-memory node storage
|
||||
- Configurable node count
|
||||
- Automatic heartbeat simulation
|
||||
- Primary node management
|
||||
- Callback system for updates
|
||||
- Thread-safe operations
|
||||
|
||||
**Key Functions**:
|
||||
- `NewMockNodeDiscovery()` - Creates discovery with N nodes
|
||||
- `Start()` - Begins heartbeat simulation
|
||||
- `GetNodes()` - Returns all nodes
|
||||
- `SetPrimaryNode()` - Sets specific primary
|
||||
- `SelectRandomPrimaryNode()` - Random selection
|
||||
- `UpdateNodeVersion()` - Updates node version (for rollouts)
|
||||
|
||||
### Mock Server (`internal/mock/server.go`)
|
||||
|
||||
**Features**:
|
||||
- All REST endpoints implemented
|
||||
- CORS middleware
|
||||
- JSON middleware
|
||||
- Logging middleware
|
||||
- In-memory firmware storage
|
||||
- Rollout simulation
|
||||
|
||||
**Key Functions**:
|
||||
- `NewMockHTTPServer()` - Creates server
|
||||
- `setupRoutes()` - Configures all endpoints
|
||||
- `simulateRollout()` - Simulates rollout process
|
||||
- All endpoint handlers matching real gateway
|
||||
|
||||
### Mock WebSocket (`internal/mock/websocket.go`)
|
||||
|
||||
**Features**:
|
||||
- WebSocket connection management
|
||||
- Periodic broadcasts
|
||||
- Event-driven broadcasts
|
||||
- Client heartbeat (ping/pong)
|
||||
- Graceful shutdown
|
||||
|
||||
**Key Functions**:
|
||||
- `NewMockWebSocketServer()` - Creates WS server
|
||||
- `HandleWebSocket()` - Handles connections
|
||||
- `BroadcastClusterUpdate()` - Cluster updates
|
||||
- `BroadcastFirmwareUploadStatus()` - Upload status
|
||||
- `BroadcastRolloutProgress()` - Rollout progress
|
||||
|
||||
### Mock Data Generators (`internal/mock/data.go`)
|
||||
|
||||
**Functions**:
|
||||
- `GenerateMockClusterMembers()` - Cluster member data
|
||||
- `GenerateMockTaskStatus()` - Task scheduler status
|
||||
- `GenerateMockSystemStatus()` - System status
|
||||
- `GenerateMockCapabilities()` - API endpoints
|
||||
- `GenerateMockFirmwareList()` - Firmware registry
|
||||
- `GenerateMockFirmwareBinary()` - Firmware binary
|
||||
- `GenerateMockProxyResponse()` - Proxy call responses
|
||||
- `GenerateMockMonitoringResources()` - Comprehensive resource monitoring data
|
||||
|
||||
**Monitoring Data Includes**:
|
||||
- **CPU Metrics**: Frequency, usage percentage, temperature
|
||||
- **Memory Metrics**: Total, free, used bytes, usage percentage
|
||||
- **Network Metrics**: Bytes sent/received, packets sent/received, RSSI, signal quality
|
||||
- **Flash Metrics**: Total, used, free bytes, usage percentage
|
||||
- **Summary Statistics**: Aggregate averages across all nodes
|
||||
|
||||
## Comparison: Real vs Mock Gateway
|
||||
|
||||
| Feature | Real Gateway | Mock Gateway |
|
||||
|---------|-------------|--------------|
|
||||
| REST API | ✅ All endpoints | ✅ All endpoints |
|
||||
| WebSocket | ✅ Real-time | ✅ Real-time |
|
||||
| Type Safety | ✅ Shared types | ✅ Shared types |
|
||||
| UDP Discovery | ✅ Real UDP | ❌ Simulated |
|
||||
| SPORE Node Calls | ✅ Real HTTP | ❌ Mocked |
|
||||
| Registry Service | ✅ External | ❌ In-memory |
|
||||
| Firmware Storage | ✅ Disk/Registry | ❌ Memory |
|
||||
| Node Count | Dynamic (real) | Configurable |
|
||||
| Setup Required | Hardware + Network | None |
|
||||
| Dependencies | SPORE nodes + Registry | None |
|
||||
|
||||
## Advantages of Mock Gateway
|
||||
|
||||
1. **No Hardware Required**: Runs without any SPORE nodes
|
||||
2. **No Network Setup**: No UDP discovery configuration needed
|
||||
3. **Fast Iteration**: Instant startup and teardown
|
||||
4. **Predictable**: Consistent, reproducible behavior
|
||||
5. **Scalable**: Easily simulate hundreds of nodes
|
||||
6. **Safe**: No risk to actual hardware
|
||||
7. **Portable**: Runs anywhere Go runs
|
||||
8. **Complete**: All features implemented
|
||||
|
||||
## When to Use Which
|
||||
|
||||
**Use Real Gateway**:
|
||||
- Production deployment
|
||||
- Real hardware testing
|
||||
- Actual firmware updates
|
||||
- Network topology testing
|
||||
- Performance benchmarking
|
||||
|
||||
**Use Mock Gateway**:
|
||||
- Frontend development
|
||||
- API client development
|
||||
- Automated testing
|
||||
- Demonstrations
|
||||
- CI/CD pipelines
|
||||
- Local development
|
||||
- Load testing
|
||||
|
||||
## Maintenance
|
||||
|
||||
The mock gateway is designed to stay in sync with the real gateway:
|
||||
|
||||
1. **Types**: All types are shared, so changes propagate automatically
|
||||
2. **Endpoints**: New endpoints should be added to both implementations
|
||||
3. **Responses**: Mock responses should mirror real responses
|
||||
4. **Behaviors**: Simulate realistic timing and error rates
|
||||
|
||||
## Future Enhancements
|
||||
|
||||
Potential improvements:
|
||||
- [ ] Configuration file support (currently CLI only)
|
||||
- [ ] Persistent storage option
|
||||
- [ ] Configurable error injection
|
||||
- [ ] Network latency simulation
|
||||
- [ ] Node failure scenarios
|
||||
- [ ] Custom firmware metadata
|
||||
- [ ] API recording/playback
|
||||
- [ ] Performance metrics
|
||||
|
||||
## Contributing
|
||||
|
||||
When adding new endpoints to the real gateway:
|
||||
|
||||
1. Add corresponding mock implementation in `internal/mock/server.go`
|
||||
2. Add mock data generators in `internal/mock/data.go` if needed
|
||||
3. Ensure type reuse from shared packages
|
||||
4. Update this documentation
|
||||
5. Test both real and mock implementations
|
||||
|
||||
## License
|
||||
|
||||
Same as main SPORE Gateway project.
|
||||
@@ -1,174 +0,0 @@
|
||||
# Mock Gateway Quick Start Guide
|
||||
|
||||
Get started with the SPORE Mock Gateway in under 1 minute!
|
||||
|
||||
## TL;DR
|
||||
|
||||
```bash
|
||||
# Build and run
|
||||
go build -o mock-gateway cmd/mock-gateway/main.go
|
||||
./mock-gateway
|
||||
```
|
||||
|
||||
That's it! The mock gateway is now running on http://localhost:3001
|
||||
|
||||
## What You Get
|
||||
|
||||
✅ **5 mock SPORE nodes** ready to use
|
||||
✅ **All REST API endpoints** working
|
||||
✅ **WebSocket support** for real-time updates
|
||||
✅ **Mock firmware registry** with sample firmware
|
||||
✅ **Simulated rollouts** with progress tracking
|
||||
|
||||
## Test It
|
||||
|
||||
### 1. Check Health
|
||||
```bash
|
||||
curl http://localhost:3001/api/health | jq
|
||||
```
|
||||
|
||||
### 2. List Nodes
|
||||
```bash
|
||||
curl http://localhost:3001/api/discovery/nodes | jq
|
||||
```
|
||||
|
||||
### 3. Get Cluster Members
|
||||
```bash
|
||||
curl http://localhost:3001/api/cluster/members | jq
|
||||
```
|
||||
|
||||
### 4. List Firmware
|
||||
```bash
|
||||
curl http://localhost:3001/api/registry/firmware | jq
|
||||
```
|
||||
|
||||
### 5. Connect WebSocket
|
||||
```javascript
|
||||
const ws = new WebSocket('ws://localhost:3001/ws');
|
||||
ws.onmessage = (event) => console.log(JSON.parse(event.data));
|
||||
```
|
||||
|
||||
## Customize It
|
||||
|
||||
```bash
|
||||
# More nodes
|
||||
./mock-gateway -mock-nodes 10
|
||||
|
||||
# Different port
|
||||
./mock-gateway -port 8080
|
||||
|
||||
# Debug logging
|
||||
./mock-gateway -log-level debug
|
||||
|
||||
# Faster updates
|
||||
./mock-gateway -heartbeat-rate 2
|
||||
|
||||
# All together
|
||||
./mock-gateway -port 8080 -mock-nodes 10 -log-level debug -heartbeat-rate 2
|
||||
```
|
||||
|
||||
## Use the Script
|
||||
|
||||
```bash
|
||||
# Easy run with default settings
|
||||
./scripts/run-mock-gateway.sh
|
||||
|
||||
# With custom options
|
||||
./scripts/run-mock-gateway.sh -p 8080 -n 10 -l debug
|
||||
|
||||
# See all options
|
||||
./scripts/run-mock-gateway.sh --help
|
||||
```
|
||||
|
||||
## Connect Your Frontend
|
||||
|
||||
Point your frontend to `http://localhost:3001` instead of the real gateway. All endpoints work the same way!
|
||||
|
||||
```javascript
|
||||
// React example
|
||||
const API_BASE = 'http://localhost:3001';
|
||||
const WS_URL = 'ws://localhost:3001/ws';
|
||||
|
||||
// Fetch nodes
|
||||
const nodes = await fetch(`${API_BASE}/api/discovery/nodes`).then(r => r.json());
|
||||
|
||||
// WebSocket connection
|
||||
const ws = new WebSocket(WS_URL);
|
||||
```
|
||||
|
||||
## What's Mocked?
|
||||
|
||||
✅ Node discovery (5 default nodes: 192.168.1.100-104)
|
||||
✅ Cluster management
|
||||
✅ Firmware uploads (90% success rate, 2s delay)
|
||||
✅ Rollouts (simulated progress: labels → upload → complete)
|
||||
✅ Task status
|
||||
✅ System status
|
||||
✅ Registry operations
|
||||
✅ WebSocket broadcasts
|
||||
|
||||
## Example Workflow
|
||||
|
||||
```bash
|
||||
# 1. Start mock gateway
|
||||
./mock-gateway
|
||||
|
||||
# 2. In another terminal, test the API
|
||||
curl http://localhost:3001/api/discovery/nodes | jq '.nodes[0]'
|
||||
|
||||
# 3. Start a rollout
|
||||
curl -X POST http://localhost:3001/api/rollout \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{
|
||||
"firmware": {"name": "spore-firmware", "version": "1.1.0"},
|
||||
"nodes": [
|
||||
{"ip": "192.168.1.100", "version": "1.0.0", "labels": {"env": "mock"}},
|
||||
{"ip": "192.168.1.101", "version": "1.0.0", "labels": {"env": "mock"}}
|
||||
]
|
||||
}' | jq
|
||||
|
||||
# 4. Watch WebSocket for progress (in browser console)
|
||||
const ws = new WebSocket('ws://localhost:3001/ws');
|
||||
ws.onmessage = e => console.log(JSON.parse(e.data));
|
||||
```
|
||||
|
||||
## Next Steps
|
||||
|
||||
- 📖 Read [cmd/mock-gateway/README.md](cmd/mock-gateway/README.md) for detailed documentation
|
||||
- 📖 Read [MOCK_GATEWAY.md](MOCK_GATEWAY.md) for implementation details
|
||||
- 🔧 Customize the mock data in `internal/mock/data.go`
|
||||
- 🚀 Use it for frontend development, testing, or demos
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
**Port already in use?**
|
||||
```bash
|
||||
./mock-gateway -port 8080
|
||||
```
|
||||
|
||||
**Need more/fewer nodes?**
|
||||
```bash
|
||||
./mock-gateway -mock-nodes 3
|
||||
```
|
||||
|
||||
**Want to see what's happening?**
|
||||
```bash
|
||||
./mock-gateway -log-level debug
|
||||
```
|
||||
|
||||
**Build fails?**
|
||||
```bash
|
||||
# Make sure you have Go 1.21 or later
|
||||
go version
|
||||
|
||||
# Update dependencies
|
||||
go mod download
|
||||
```
|
||||
|
||||
## Questions?
|
||||
|
||||
- See [MOCK_GATEWAY.md](MOCK_GATEWAY.md) for complete documentation
|
||||
- See [cmd/mock-gateway/README.md](cmd/mock-gateway/README.md) for usage details
|
||||
- Check the real gateway's documentation to understand the API
|
||||
|
||||
Happy mocking! 🎭
|
||||
@@ -1,198 +0,0 @@
|
||||
# Monitoring Resources Endpoint
|
||||
|
||||
## Overview
|
||||
|
||||
The `/api/monitoring/resources` endpoint provides comprehensive real-time resource monitoring for all nodes in the cluster.
|
||||
|
||||
## Endpoint
|
||||
|
||||
```
|
||||
GET /api/monitoring/resources
|
||||
```
|
||||
|
||||
## Response Format
|
||||
|
||||
```json
|
||||
{
|
||||
"timestamp": "2025-10-24T10:30:45Z",
|
||||
"nodes": [
|
||||
{
|
||||
"timestamp": 1729763445,
|
||||
"node_ip": "192.168.1.100",
|
||||
"hostname": "spore-node-1",
|
||||
"cpu": {
|
||||
"frequency_mhz": 160,
|
||||
"usage_percent": 42.5,
|
||||
"temperature_c": 58.3
|
||||
},
|
||||
"memory": {
|
||||
"total_bytes": 98304,
|
||||
"free_bytes": 45632,
|
||||
"used_bytes": 52672,
|
||||
"usage_percent": 53.6
|
||||
},
|
||||
"network": {
|
||||
"bytes_sent": 3245678,
|
||||
"bytes_received": 5678901,
|
||||
"packets_sent": 32456,
|
||||
"packets_received": 56789,
|
||||
"rssi_dbm": -65,
|
||||
"signal_quality_percent": 75.5
|
||||
},
|
||||
"flash": {
|
||||
"total_bytes": 4194304,
|
||||
"used_bytes": 2097152,
|
||||
"free_bytes": 2097152,
|
||||
"usage_percent": 50.0
|
||||
},
|
||||
"labels": {
|
||||
"version": "1.0.0",
|
||||
"stable": "true",
|
||||
"env": "production",
|
||||
"zone": "zone-1",
|
||||
"type": "spore-node"
|
||||
}
|
||||
}
|
||||
],
|
||||
"summary": {
|
||||
"total_nodes": 5,
|
||||
"avg_cpu_usage_percent": 38.7,
|
||||
"avg_memory_usage_percent": 51.2,
|
||||
"avg_flash_usage_percent": 52.8,
|
||||
"total_bytes_sent": 16228390,
|
||||
"total_bytes_received": 28394505
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Data Fields
|
||||
|
||||
### CPU Metrics
|
||||
- **frequency_mhz**: Current CPU frequency in MHz (80-240 MHz typical for ESP32)
|
||||
- **usage_percent**: CPU utilization percentage (0-100%)
|
||||
- **temperature_c**: CPU temperature in Celsius (45-65°C typical)
|
||||
|
||||
### Memory Metrics
|
||||
- **total_bytes**: Total RAM available (64-128 KB typical)
|
||||
- **free_bytes**: Free RAM available
|
||||
- **used_bytes**: Used RAM
|
||||
- **usage_percent**: Memory utilization percentage
|
||||
|
||||
### Network Metrics
|
||||
- **bytes_sent**: Total bytes transmitted since boot
|
||||
- **bytes_received**: Total bytes received since boot
|
||||
- **packets_sent**: Total packets transmitted
|
||||
- **packets_received**: Total packets received
|
||||
- **rssi_dbm**: WiFi signal strength in dBm (-30 to -90 typical)
|
||||
- **signal_quality_percent**: WiFi signal quality (0-100%)
|
||||
|
||||
### Flash Metrics
|
||||
- **total_bytes**: Total flash storage (typically 4MB)
|
||||
- **used_bytes**: Used flash storage
|
||||
- **free_bytes**: Free flash storage
|
||||
- **usage_percent**: Flash utilization percentage
|
||||
|
||||
### Node Labels
|
||||
Each node includes labels that match firmware versions:
|
||||
- **version**: Current firmware version (e.g., "1.0.0", "1.1.0", "1.2.0")
|
||||
- **stable**: Whether this is a stable release ("true" or "false")
|
||||
- **env**: Environment (e.g., "production", "beta")
|
||||
- **zone**: Deployment zone (e.g., "zone-1", "zone-2", "zone-3")
|
||||
- **type**: Node type (e.g., "spore-node")
|
||||
|
||||
### Summary Statistics
|
||||
Aggregate metrics across all nodes:
|
||||
- **total_nodes**: Total number of nodes monitored
|
||||
- **avg_cpu_usage_percent**: Average CPU usage across all nodes
|
||||
- **avg_memory_usage_percent**: Average memory usage across all nodes
|
||||
- **avg_flash_usage_percent**: Average flash usage across all nodes
|
||||
- **total_bytes_sent**: Combined network traffic sent
|
||||
- **total_bytes_received**: Combined network traffic received
|
||||
|
||||
## Firmware Version Matching
|
||||
|
||||
Node labels are automatically synchronized with the firmware available in the registry:
|
||||
|
||||
| Version | Registry Status | Node Distribution | Environment |
|
||||
|---------|----------------|-------------------|-------------|
|
||||
| 1.0.0 | Stable | 40% of nodes | production |
|
||||
| 1.1.0 | Stable | 40% of nodes | production |
|
||||
| 1.2.0 | Beta | 20% of nodes | beta |
|
||||
|
||||
This ensures that monitoring data accurately reflects which firmware versions are deployed across the cluster.
|
||||
|
||||
## Use Cases
|
||||
|
||||
### 1. Real-time Dashboard
|
||||
Display live resource usage for all nodes in a monitoring dashboard.
|
||||
|
||||
### 2. Alerting
|
||||
Set up alerts based on thresholds:
|
||||
- CPU usage > 80%
|
||||
- Memory usage > 90%
|
||||
- Flash usage > 95%
|
||||
- WiFi signal quality < 30%
|
||||
|
||||
### 3. Capacity Planning
|
||||
Track resource trends to plan firmware optimizations or hardware upgrades.
|
||||
|
||||
### 4. Firmware Rollout Monitoring
|
||||
Monitor resource usage before, during, and after firmware rollouts to detect issues.
|
||||
|
||||
### 5. Network Health
|
||||
Track WiFi signal quality and network traffic to identify connectivity issues.
|
||||
|
||||
## Example Usage
|
||||
|
||||
### cURL
|
||||
```bash
|
||||
curl http://localhost:3001/api/monitoring/resources
|
||||
```
|
||||
|
||||
### JavaScript (fetch)
|
||||
```javascript
|
||||
const response = await fetch('http://localhost:3001/api/monitoring/resources');
|
||||
const data = await response.json();
|
||||
|
||||
console.log(`Monitoring ${data.summary.total_nodes} nodes`);
|
||||
console.log(`Average CPU: ${data.summary.avg_cpu_usage_percent.toFixed(1)}%`);
|
||||
console.log(`Average Memory: ${data.summary.avg_memory_usage_percent.toFixed(1)}%`);
|
||||
|
||||
data.nodes.forEach(node => {
|
||||
console.log(`${node.hostname} (${node.labels.version}): CPU ${node.cpu.usage_percent.toFixed(1)}%`);
|
||||
});
|
||||
```
|
||||
|
||||
### Python
|
||||
```python
|
||||
import requests
|
||||
|
||||
response = requests.get('http://localhost:3001/api/monitoring/resources')
|
||||
data = response.json()
|
||||
|
||||
print(f"Monitoring {data['summary']['total_nodes']} nodes")
|
||||
print(f"Average CPU: {data['summary']['avg_cpu_usage_percent']:.1f}%")
|
||||
print(f"Average Memory: {data['summary']['avg_memory_usage_percent']:.1f}%")
|
||||
|
||||
for node in data['nodes']:
|
||||
print(f"{node['hostname']} ({node['labels']['version']}): "
|
||||
f"CPU {node['cpu']['usage_percent']:.1f}%")
|
||||
```
|
||||
|
||||
## Mock Gateway Behavior
|
||||
|
||||
The mock gateway generates realistic monitoring data with:
|
||||
- **Dynamic values**: CPU, memory, and network metrics vary on each request
|
||||
- **Realistic ranges**: Values stay within typical ESP32 hardware limits
|
||||
- **Signal quality**: WiFi RSSI converted to quality percentage
|
||||
- **Consistent labels**: Node labels always match firmware registry versions
|
||||
- **Aggregate summaries**: Automatic calculation of cluster-wide statistics
|
||||
|
||||
## Integration with WebSocket
|
||||
|
||||
For real-time updates, consider combining this endpoint with the WebSocket connection at `/ws` which broadcasts:
|
||||
- Node status changes
|
||||
- Firmware update progress
|
||||
- Cluster membership changes
|
||||
|
||||
The monitoring endpoint provides detailed point-in-time snapshots, while WebSocket provides real-time event streams.
|
||||
@@ -57,7 +57,7 @@ func NewNodeDiscovery(udpPort string) *NodeDiscovery {
|
||||
return &NodeDiscovery{
|
||||
udpPort: udpPort,
|
||||
discoveredNodes: make(map[string]*NodeInfo),
|
||||
staleThreshold: 10 * time.Second, // TODO make configurable
|
||||
staleThreshold: 10 * time.Second, // Heartbeat timeout - mark nodes inactive after 10 seconds
|
||||
logger: log.New(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,449 +0,0 @@
|
||||
package mock
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"spore-gateway/pkg/client"
|
||||
"spore-gateway/pkg/registry"
|
||||
)
|
||||
|
||||
// GenerateMockClusterMembers generates mock cluster member data
|
||||
func GenerateMockClusterMembers(nodes map[string]*NodeInfo) []client.ClusterMember {
|
||||
members := make([]client.ClusterMember, 0, len(nodes))
|
||||
|
||||
for _, node := range nodes {
|
||||
member := client.ClusterMember{
|
||||
IP: node.IP,
|
||||
Hostname: node.Hostname,
|
||||
Status: string(node.Status),
|
||||
Latency: node.Latency,
|
||||
LastSeen: node.LastSeen.Unix(),
|
||||
Labels: node.Labels,
|
||||
Resources: map[string]interface{}{
|
||||
"freeHeap": 32768 + rand.Intn(32768),
|
||||
"cpuFreqMHz": 80 + rand.Intn(160),
|
||||
"flashChipSize": 4194304,
|
||||
},
|
||||
}
|
||||
members = append(members, member)
|
||||
}
|
||||
|
||||
return members
|
||||
}
|
||||
|
||||
// GenerateMockTaskStatus generates mock task status data
|
||||
func GenerateMockTaskStatus() *client.TaskStatusResponse {
|
||||
tasks := []client.TaskInfo{
|
||||
{
|
||||
Name: "HeartbeatTask",
|
||||
Interval: 5000,
|
||||
Enabled: true,
|
||||
Running: true,
|
||||
AutoStart: true,
|
||||
},
|
||||
{
|
||||
Name: "SensorReadTask",
|
||||
Interval: 10000,
|
||||
Enabled: true,
|
||||
Running: true,
|
||||
AutoStart: true,
|
||||
},
|
||||
{
|
||||
Name: "StatusUpdateTask",
|
||||
Interval: 30000,
|
||||
Enabled: true,
|
||||
Running: false,
|
||||
AutoStart: false,
|
||||
},
|
||||
{
|
||||
Name: "CleanupTask",
|
||||
Interval: 60000,
|
||||
Enabled: false,
|
||||
Running: false,
|
||||
AutoStart: false,
|
||||
},
|
||||
}
|
||||
|
||||
activeTasks := 0
|
||||
for _, task := range tasks {
|
||||
if task.Running {
|
||||
activeTasks++
|
||||
}
|
||||
}
|
||||
|
||||
return &client.TaskStatusResponse{
|
||||
Summary: client.TaskSummary{
|
||||
TotalTasks: len(tasks),
|
||||
ActiveTasks: activeTasks,
|
||||
},
|
||||
Tasks: tasks,
|
||||
System: client.SystemInfo{
|
||||
FreeHeap: 32768 + int64(rand.Intn(32768)),
|
||||
Uptime: int64(time.Now().Unix() - 3600*24), // 24 hours uptime
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// GenerateMockSystemStatus generates mock system status data
|
||||
func GenerateMockSystemStatus(labels map[string]string) *client.SystemStatusResponse {
|
||||
return &client.SystemStatusResponse{
|
||||
FreeHeap: 32768 + int64(rand.Intn(32768)),
|
||||
ChipID: int64(rand.Int31()),
|
||||
SDKVersion: "3.1.0",
|
||||
CPUFreqMHz: 80 + rand.Intn(160),
|
||||
FlashChipSize: 4194304,
|
||||
Labels: labels,
|
||||
}
|
||||
}
|
||||
|
||||
// GenerateMockCapabilities generates mock API endpoint capabilities
|
||||
func GenerateMockCapabilities() *client.CapabilitiesResponse {
|
||||
return &client.CapabilitiesResponse{
|
||||
Endpoints: []client.EndpointInfo{
|
||||
{
|
||||
URI: "/api/node/status",
|
||||
Method: "GET",
|
||||
Parameters: []client.ParameterInfo{
|
||||
{
|
||||
Name: "detailed",
|
||||
Type: "boolean",
|
||||
Required: false,
|
||||
Description: "Include detailed system information",
|
||||
Location: "query",
|
||||
Default: "false",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
URI: "/api/cluster/members",
|
||||
Method: "GET",
|
||||
Parameters: []client.ParameterInfo{},
|
||||
},
|
||||
{
|
||||
URI: "/api/tasks/status",
|
||||
Method: "GET",
|
||||
Parameters: []client.ParameterInfo{},
|
||||
},
|
||||
{
|
||||
URI: "/api/node/update",
|
||||
Method: "POST",
|
||||
Parameters: []client.ParameterInfo{
|
||||
{
|
||||
Name: "firmware",
|
||||
Type: "file",
|
||||
Required: true,
|
||||
Description: "Firmware binary file",
|
||||
Location: "body",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
URI: "/api/node/config",
|
||||
Method: "POST",
|
||||
Parameters: []client.ParameterInfo{
|
||||
{
|
||||
Name: "labels",
|
||||
Type: "json",
|
||||
Required: true,
|
||||
Description: "Node labels in JSON format",
|
||||
Location: "body",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
URI: "/api/sensors/read",
|
||||
Method: "GET",
|
||||
Parameters: []client.ParameterInfo{
|
||||
{
|
||||
Name: "sensor",
|
||||
Type: "string",
|
||||
Required: false,
|
||||
Description: "Specific sensor to read",
|
||||
Location: "query",
|
||||
Values: []string{"temperature", "humidity", "pressure"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// GenerateMockFirmwareList generates mock firmware registry data
|
||||
func GenerateMockFirmwareList() []registry.GroupedFirmware {
|
||||
return []registry.GroupedFirmware{
|
||||
{
|
||||
Name: "spore-firmware",
|
||||
Firmware: []registry.FirmwareRecord{
|
||||
{
|
||||
Name: "spore-firmware",
|
||||
Version: "1.0.0",
|
||||
Size: 524288,
|
||||
Labels: map[string]string{
|
||||
"stable": "true",
|
||||
"env": "production",
|
||||
},
|
||||
Path: "/firmware/spore-firmware/1.0.0",
|
||||
},
|
||||
{
|
||||
Name: "spore-firmware",
|
||||
Version: "1.1.0",
|
||||
Size: 548864,
|
||||
Labels: map[string]string{
|
||||
"stable": "true",
|
||||
"env": "production",
|
||||
},
|
||||
Path: "/firmware/spore-firmware/1.1.0",
|
||||
},
|
||||
{
|
||||
Name: "spore-firmware",
|
||||
Version: "1.2.0",
|
||||
Size: 573440,
|
||||
Labels: map[string]string{
|
||||
"stable": "false",
|
||||
"env": "beta",
|
||||
},
|
||||
Path: "/firmware/spore-firmware/1.2.0",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "sensor-firmware",
|
||||
Firmware: []registry.FirmwareRecord{
|
||||
{
|
||||
Name: "sensor-firmware",
|
||||
Version: "2.0.0",
|
||||
Size: 262144,
|
||||
Labels: map[string]string{
|
||||
"stable": "true",
|
||||
"type": "sensor",
|
||||
},
|
||||
Path: "/firmware/sensor-firmware/2.0.0",
|
||||
},
|
||||
{
|
||||
Name: "sensor-firmware",
|
||||
Version: "2.1.0",
|
||||
Size: 286720,
|
||||
Labels: map[string]string{
|
||||
"stable": "true",
|
||||
"type": "sensor",
|
||||
},
|
||||
Path: "/firmware/sensor-firmware/2.1.0",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// GenerateMockFirmwareBinary generates mock firmware binary data
|
||||
func GenerateMockFirmwareBinary(size int) []byte {
|
||||
// Generate some pseudo-random but deterministic binary data
|
||||
data := make([]byte, size)
|
||||
for i := range data {
|
||||
data[i] = byte(i % 256)
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
// NodeInfo is an alias to avoid import cycle
|
||||
type NodeInfo struct {
|
||||
IP string
|
||||
Hostname string
|
||||
Status string
|
||||
Latency int64
|
||||
LastSeen time.Time
|
||||
Labels map[string]string
|
||||
}
|
||||
|
||||
// ResourceMetrics represents resource usage metrics for a node
|
||||
type ResourceMetrics struct {
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
NodeIP string `json:"node_ip"`
|
||||
Hostname string `json:"hostname"`
|
||||
CPU CPUMetrics `json:"cpu"`
|
||||
Memory MemoryMetrics `json:"memory"`
|
||||
Network NetworkMetrics `json:"network"`
|
||||
Flash FlashMetrics `json:"flash"`
|
||||
Labels map[string]string `json:"labels"`
|
||||
}
|
||||
|
||||
// CPUMetrics represents CPU usage metrics
|
||||
type CPUMetrics struct {
|
||||
Frequency int `json:"frequency_mhz"`
|
||||
UsagePercent float64 `json:"usage_percent"`
|
||||
Temperature float64 `json:"temperature_c,omitempty"`
|
||||
}
|
||||
|
||||
// MemoryMetrics represents memory usage metrics
|
||||
type MemoryMetrics struct {
|
||||
Total int64 `json:"total_bytes"`
|
||||
Free int64 `json:"free_bytes"`
|
||||
Used int64 `json:"used_bytes"`
|
||||
UsagePercent float64 `json:"usage_percent"`
|
||||
}
|
||||
|
||||
// NetworkMetrics represents network usage metrics
|
||||
type NetworkMetrics struct {
|
||||
BytesSent int64 `json:"bytes_sent"`
|
||||
BytesReceived int64 `json:"bytes_received"`
|
||||
PacketsSent int64 `json:"packets_sent"`
|
||||
PacketsRecv int64 `json:"packets_received"`
|
||||
RSSI int `json:"rssi_dbm,omitempty"`
|
||||
SignalQuality float64 `json:"signal_quality_percent,omitempty"`
|
||||
}
|
||||
|
||||
// FlashMetrics represents flash storage metrics
|
||||
type FlashMetrics struct {
|
||||
Total int64 `json:"total_bytes"`
|
||||
Used int64 `json:"used_bytes"`
|
||||
Free int64 `json:"free_bytes"`
|
||||
UsagePercent float64 `json:"usage_percent"`
|
||||
}
|
||||
|
||||
// MonitoringResourcesResponse represents the monitoring resources endpoint response
|
||||
type MonitoringResourcesResponse struct {
|
||||
Timestamp string `json:"timestamp"`
|
||||
Nodes []ResourceMetrics `json:"nodes"`
|
||||
Summary ResourceSummary `json:"summary"`
|
||||
}
|
||||
|
||||
// ResourceSummary provides aggregate statistics across all nodes
|
||||
type ResourceSummary struct {
|
||||
TotalNodes int `json:"total_nodes"`
|
||||
AvgCPUUsage float64 `json:"avg_cpu_usage_percent"`
|
||||
AvgMemoryUsage float64 `json:"avg_memory_usage_percent"`
|
||||
AvgFlashUsage float64 `json:"avg_flash_usage_percent"`
|
||||
TotalBytesSent int64 `json:"total_bytes_sent"`
|
||||
TotalBytesRecv int64 `json:"total_bytes_received"`
|
||||
}
|
||||
|
||||
// GenerateMockMonitoringResources generates meaningful mock monitoring data for all nodes
|
||||
func GenerateMockMonitoringResources(nodes map[string]*NodeInfo) *MonitoringResourcesResponse {
|
||||
now := time.Now()
|
||||
metrics := make([]ResourceMetrics, 0, len(nodes))
|
||||
|
||||
var totalCPU, totalMemoryPercent, totalFlash float64
|
||||
var totalBytesSent, totalBytesRecv int64
|
||||
|
||||
for _, node := range nodes {
|
||||
// Generate realistic resource usage based on node characteristics
|
||||
cpuFreq := 80 + rand.Intn(160)
|
||||
cpuUsage := 15.0 + rand.Float64()*45.0 // 15-60% usage
|
||||
|
||||
// Memory metrics
|
||||
nodeMemoryTotal := int64(65536 + rand.Intn(65536)) // 64-128KB
|
||||
freeMemory := int64(32768 + rand.Intn(32768)) // 32-64KB free
|
||||
usedMemory := nodeMemoryTotal - freeMemory
|
||||
memoryUsagePercent := float64(usedMemory) / float64(nodeMemoryTotal) * 100
|
||||
|
||||
// Flash metrics
|
||||
flashTotal := int64(4194304) // 4MB
|
||||
flashUsed := int64(1048576 + rand.Intn(2097152)) // 1-3MB used
|
||||
flashFree := flashTotal - flashUsed
|
||||
flashUsagePercent := float64(flashUsed) / float64(flashTotal) * 100
|
||||
|
||||
// Network metrics (simulating accumulated traffic)
|
||||
bytesSent := int64(1000000 + rand.Intn(5000000)) // 1-6MB
|
||||
bytesRecv := int64(2000000 + rand.Intn(8000000)) // 2-10MB
|
||||
packetsSent := int64(10000 + rand.Intn(50000))
|
||||
packetsRecv := int64(15000 + rand.Intn(75000))
|
||||
|
||||
// WiFi signal metrics
|
||||
rssi := -30 - rand.Intn(60) // -30 to -90 dBm
|
||||
signalQuality := float64(100+rssi+90) / 60.0 * 100 // Convert RSSI to quality percentage
|
||||
if signalQuality < 0 {
|
||||
signalQuality = 0
|
||||
} else if signalQuality > 100 {
|
||||
signalQuality = 100
|
||||
}
|
||||
|
||||
metric := ResourceMetrics{
|
||||
Timestamp: now.Unix(),
|
||||
NodeIP: node.IP,
|
||||
Hostname: node.Hostname,
|
||||
CPU: CPUMetrics{
|
||||
Frequency: cpuFreq,
|
||||
UsagePercent: cpuUsage,
|
||||
Temperature: 45.0 + rand.Float64()*20.0, // 45-65°C
|
||||
},
|
||||
Memory: MemoryMetrics{
|
||||
Total: nodeMemoryTotal,
|
||||
Free: freeMemory,
|
||||
Used: usedMemory,
|
||||
UsagePercent: memoryUsagePercent,
|
||||
},
|
||||
Network: NetworkMetrics{
|
||||
BytesSent: bytesSent,
|
||||
BytesReceived: bytesRecv,
|
||||
PacketsSent: packetsSent,
|
||||
PacketsRecv: packetsRecv,
|
||||
RSSI: rssi,
|
||||
SignalQuality: signalQuality,
|
||||
},
|
||||
Flash: FlashMetrics{
|
||||
Total: flashTotal,
|
||||
Used: flashUsed,
|
||||
Free: flashFree,
|
||||
UsagePercent: flashUsagePercent,
|
||||
},
|
||||
Labels: node.Labels,
|
||||
}
|
||||
|
||||
metrics = append(metrics, metric)
|
||||
|
||||
// Accumulate for summary
|
||||
totalCPU += cpuUsage
|
||||
totalMemoryPercent += memoryUsagePercent
|
||||
totalFlash += flashUsagePercent
|
||||
totalBytesSent += bytesSent
|
||||
totalBytesRecv += bytesRecv
|
||||
}
|
||||
|
||||
// Calculate averages
|
||||
nodeCount := len(nodes)
|
||||
var avgCPU, avgMemory, avgFlash float64
|
||||
if nodeCount > 0 {
|
||||
avgCPU = totalCPU / float64(nodeCount)
|
||||
avgMemory = totalMemoryPercent / float64(nodeCount)
|
||||
avgFlash = totalFlash / float64(nodeCount)
|
||||
}
|
||||
|
||||
return &MonitoringResourcesResponse{
|
||||
Timestamp: now.Format(time.RFC3339),
|
||||
Nodes: metrics,
|
||||
Summary: ResourceSummary{
|
||||
TotalNodes: nodeCount,
|
||||
AvgCPUUsage: avgCPU,
|
||||
AvgMemoryUsage: avgMemory,
|
||||
AvgFlashUsage: avgFlash,
|
||||
TotalBytesSent: totalBytesSent,
|
||||
TotalBytesRecv: totalBytesRecv,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// GenerateMockProxyResponse generates a mock response for proxy calls
|
||||
func GenerateMockProxyResponse(method, uri string) map[string]interface{} {
|
||||
switch uri {
|
||||
case "/api/sensors/read":
|
||||
return map[string]interface{}{
|
||||
"temperature": 22.5 + rand.Float64()*5,
|
||||
"humidity": 45.0 + rand.Float64()*20,
|
||||
"pressure": 1013.0 + rand.Float64()*10,
|
||||
"timestamp": time.Now().Unix(),
|
||||
}
|
||||
case "/api/led/control":
|
||||
return map[string]interface{}{
|
||||
"status": "success",
|
||||
"message": "LED state updated",
|
||||
"state": "on",
|
||||
}
|
||||
default:
|
||||
return map[string]interface{}{
|
||||
"status": "success",
|
||||
"message": fmt.Sprintf("Mock response for %s %s", method, uri),
|
||||
"data": map[string]interface{}{"mock": true},
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,274 +0,0 @@
|
||||
package mock
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"spore-gateway/internal/discovery"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// MockNodeDiscovery simulates node discovery with mock nodes
|
||||
type MockNodeDiscovery struct {
|
||||
mockNodes map[string]*discovery.NodeInfo
|
||||
primaryNode string
|
||||
mutex sync.RWMutex
|
||||
callbacks []discovery.NodeUpdateCallback
|
||||
heartbeatRate time.Duration
|
||||
shutdownChan chan struct{}
|
||||
shutdownOnce sync.Once
|
||||
logger *log.Logger
|
||||
}
|
||||
|
||||
// NewMockNodeDiscovery creates a new mock node discovery instance
|
||||
func NewMockNodeDiscovery(numNodes int, heartbeatRate time.Duration) *MockNodeDiscovery {
|
||||
mnd := &MockNodeDiscovery{
|
||||
mockNodes: make(map[string]*discovery.NodeInfo),
|
||||
heartbeatRate: heartbeatRate,
|
||||
shutdownChan: make(chan struct{}),
|
||||
logger: log.New(),
|
||||
}
|
||||
|
||||
// Generate mock nodes with realistic firmware versions
|
||||
// These versions should match the firmware available in the registry
|
||||
now := time.Now()
|
||||
for i := 0; i < numNodes; i++ {
|
||||
ip := fmt.Sprintf("192.168.1.%d", 100+i)
|
||||
hostname := fmt.Sprintf("spore-node-%d", i+1)
|
||||
|
||||
// Distribute nodes across different firmware versions
|
||||
// Most nodes on stable versions, some on beta
|
||||
var version string
|
||||
switch i % 5 {
|
||||
case 0, 1:
|
||||
version = "1.0.0" // 40% on oldest stable
|
||||
case 2, 3:
|
||||
version = "1.1.0" // 40% on newer stable
|
||||
case 4:
|
||||
version = "1.2.0" // 20% on beta
|
||||
}
|
||||
|
||||
// Determine stability based on version
|
||||
stable := "true"
|
||||
env := "production"
|
||||
if version == "1.2.0" {
|
||||
stable = "false"
|
||||
env = "beta"
|
||||
}
|
||||
|
||||
nodeInfo := &discovery.NodeInfo{
|
||||
IP: ip,
|
||||
Port: 80,
|
||||
Hostname: hostname,
|
||||
Status: discovery.NodeStatusActive,
|
||||
DiscoveredAt: now.Add(-time.Duration(i*5) * time.Minute),
|
||||
LastSeen: now,
|
||||
Uptime: fmt.Sprintf("%dh%dm", 10+i, rand.Intn(60)),
|
||||
Labels: map[string]string{
|
||||
"version": version,
|
||||
"stable": stable,
|
||||
"env": env,
|
||||
"zone": fmt.Sprintf("zone-%d", (i%3)+1),
|
||||
"type": "spore-node",
|
||||
},
|
||||
Latency: int64(10 + rand.Intn(50)),
|
||||
Resources: map[string]interface{}{
|
||||
"freeHeap": 32768 + rand.Intn(32768),
|
||||
"cpuFreqMHz": 80 + rand.Intn(160),
|
||||
"flashChipSize": 4194304,
|
||||
},
|
||||
}
|
||||
|
||||
mnd.mockNodes[ip] = nodeInfo
|
||||
}
|
||||
|
||||
// Set first node as primary
|
||||
if numNodes > 0 {
|
||||
mnd.primaryNode = fmt.Sprintf("192.168.1.%d", 100)
|
||||
}
|
||||
|
||||
mnd.logger.WithField("nodes", numNodes).Info("Mock discovery initialized with nodes")
|
||||
return mnd
|
||||
}
|
||||
|
||||
// Start starts the mock discovery (simulates periodic heartbeats)
|
||||
func (mnd *MockNodeDiscovery) Start() error {
|
||||
mnd.logger.Info("Starting mock node discovery")
|
||||
|
||||
// Simulate periodic node updates
|
||||
go mnd.simulateHeartbeats()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Shutdown gracefully shuts down the mock discovery
|
||||
func (mnd *MockNodeDiscovery) Shutdown(ctx context.Context) error {
|
||||
mnd.shutdownOnce.Do(func() {
|
||||
mnd.logger.Info("Shutting down mock node discovery")
|
||||
close(mnd.shutdownChan)
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// simulateHeartbeats simulates periodic node heartbeats and updates
|
||||
func (mnd *MockNodeDiscovery) simulateHeartbeats() {
|
||||
ticker := time.NewTicker(mnd.heartbeatRate)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-mnd.shutdownChan:
|
||||
return
|
||||
case <-ticker.C:
|
||||
mnd.updateMockNodes()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// updateMockNodes simulates node updates (version changes, status changes, etc.)
|
||||
func (mnd *MockNodeDiscovery) updateMockNodes() {
|
||||
mnd.mutex.Lock()
|
||||
defer mnd.mutex.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
|
||||
for ip, node := range mnd.mockNodes {
|
||||
// Update last seen time
|
||||
node.LastSeen = now
|
||||
|
||||
// Randomly update some metrics
|
||||
if rand.Float32() < 0.3 { // 30% chance
|
||||
node.Latency = int64(10 + rand.Intn(50))
|
||||
// Update resources map
|
||||
node.Resources["freeHeap"] = 32768 + rand.Intn(32768)
|
||||
}
|
||||
|
||||
// Occasionally notify about updates
|
||||
if rand.Float32() < 0.1 { // 10% chance
|
||||
mnd.notifyCallbacks(ip, "heartbeat")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetNodes returns a copy of all mock nodes
|
||||
func (mnd *MockNodeDiscovery) GetNodes() map[string]*discovery.NodeInfo {
|
||||
mnd.mutex.RLock()
|
||||
defer mnd.mutex.RUnlock()
|
||||
|
||||
nodes := make(map[string]*discovery.NodeInfo)
|
||||
for ip, node := range mnd.mockNodes {
|
||||
// Create a copy
|
||||
nodeCopy := *node
|
||||
nodes[ip] = &nodeCopy
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
|
||||
// GetPrimaryNode returns the current primary node IP
|
||||
func (mnd *MockNodeDiscovery) GetPrimaryNode() string {
|
||||
mnd.mutex.RLock()
|
||||
defer mnd.mutex.RUnlock()
|
||||
return mnd.primaryNode
|
||||
}
|
||||
|
||||
// SetPrimaryNode manually sets the primary node
|
||||
func (mnd *MockNodeDiscovery) SetPrimaryNode(ip string) error {
|
||||
mnd.mutex.Lock()
|
||||
defer mnd.mutex.Unlock()
|
||||
|
||||
if _, exists := mnd.mockNodes[ip]; !exists {
|
||||
return fmt.Errorf("node %s not found", ip)
|
||||
}
|
||||
|
||||
oldPrimary := mnd.primaryNode
|
||||
mnd.primaryNode = ip
|
||||
mnd.logger.WithFields(log.Fields{
|
||||
"old_primary": oldPrimary,
|
||||
"new_primary": ip,
|
||||
}).Info("Primary node changed")
|
||||
|
||||
mnd.notifyCallbacks(ip, "primary_changed")
|
||||
return nil
|
||||
}
|
||||
|
||||
// SelectRandomPrimaryNode selects a random active node as primary
|
||||
func (mnd *MockNodeDiscovery) SelectRandomPrimaryNode() string {
|
||||
mnd.mutex.Lock()
|
||||
defer mnd.mutex.Unlock()
|
||||
|
||||
if len(mnd.mockNodes) == 0 {
|
||||
return ""
|
||||
}
|
||||
|
||||
// Get active nodes
|
||||
var activeNodes []string
|
||||
for ip, node := range mnd.mockNodes {
|
||||
if node.Status == discovery.NodeStatusActive {
|
||||
activeNodes = append(activeNodes, ip)
|
||||
}
|
||||
}
|
||||
|
||||
if len(activeNodes) == 0 {
|
||||
return mnd.primaryNode
|
||||
}
|
||||
|
||||
// Select random node
|
||||
randomIndex := rand.Intn(len(activeNodes))
|
||||
randomNode := activeNodes[randomIndex]
|
||||
|
||||
oldPrimary := mnd.primaryNode
|
||||
mnd.primaryNode = randomNode
|
||||
mnd.logger.WithFields(log.Fields{
|
||||
"old_primary": oldPrimary,
|
||||
"new_primary": randomNode,
|
||||
}).Info("Randomly selected new primary node")
|
||||
|
||||
mnd.notifyCallbacks(randomNode, "primary_changed")
|
||||
return randomNode
|
||||
}
|
||||
|
||||
// AddCallback registers a callback for node updates
|
||||
func (mnd *MockNodeDiscovery) AddCallback(callback discovery.NodeUpdateCallback) {
|
||||
mnd.mutex.Lock()
|
||||
defer mnd.mutex.Unlock()
|
||||
mnd.callbacks = append(mnd.callbacks, callback)
|
||||
}
|
||||
|
||||
// GetClusterStatus returns current cluster status
|
||||
func (mnd *MockNodeDiscovery) GetClusterStatus() discovery.ClusterStatus {
|
||||
mnd.mutex.RLock()
|
||||
defer mnd.mutex.RUnlock()
|
||||
|
||||
return discovery.ClusterStatus{
|
||||
PrimaryNode: mnd.primaryNode,
|
||||
TotalNodes: len(mnd.mockNodes),
|
||||
UDPPort: "4210",
|
||||
ServerRunning: true,
|
||||
}
|
||||
}
|
||||
|
||||
// notifyCallbacks notifies all registered callbacks about node changes
|
||||
func (mnd *MockNodeDiscovery) notifyCallbacks(nodeIP, action string) {
|
||||
for _, callback := range mnd.callbacks {
|
||||
go callback(nodeIP, action)
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateNodeVersion simulates updating a node's version (for testing rollouts)
|
||||
func (mnd *MockNodeDiscovery) UpdateNodeVersion(ip, version string) {
|
||||
mnd.mutex.Lock()
|
||||
defer mnd.mutex.Unlock()
|
||||
|
||||
if node, exists := mnd.mockNodes[ip]; exists {
|
||||
node.Labels["version"] = version
|
||||
mnd.logger.WithFields(log.Fields{
|
||||
"ip": ip,
|
||||
"version": version,
|
||||
}).Info("Updated node version")
|
||||
mnd.notifyCallbacks(ip, "version_updated")
|
||||
}
|
||||
}
|
||||
@@ -1,752 +0,0 @@
|
||||
package mock
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"spore-gateway/internal/discovery"
|
||||
"spore-gateway/pkg/client"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// MockHTTPServer represents the mock HTTP server
|
||||
type MockHTTPServer struct {
|
||||
port string
|
||||
router *mux.Router
|
||||
discovery *MockNodeDiscovery
|
||||
wsServer *MockWebSocketServer
|
||||
server *http.Server
|
||||
enableWS bool
|
||||
firmwareStore map[string][]byte // Simple in-memory firmware storage
|
||||
}
|
||||
|
||||
// NewMockHTTPServer creates a new mock HTTP server instance
|
||||
func NewMockHTTPServer(port string, discovery *MockNodeDiscovery, enableWS bool) *MockHTTPServer {
|
||||
ms := &MockHTTPServer{
|
||||
port: port,
|
||||
router: mux.NewRouter(),
|
||||
discovery: discovery,
|
||||
enableWS: enableWS,
|
||||
firmwareStore: make(map[string][]byte),
|
||||
}
|
||||
|
||||
// Initialize WebSocket server if enabled
|
||||
if enableWS {
|
||||
ms.wsServer = NewMockWebSocketServer(discovery)
|
||||
}
|
||||
|
||||
ms.setupRoutes()
|
||||
ms.setupMiddleware()
|
||||
|
||||
ms.server = &http.Server{
|
||||
Addr: ":" + port,
|
||||
Handler: ms.router,
|
||||
ReadTimeout: 30 * time.Second,
|
||||
WriteTimeout: 30 * time.Second,
|
||||
IdleTimeout: 60 * time.Second,
|
||||
}
|
||||
|
||||
return ms
|
||||
}
|
||||
|
||||
// setupMiddleware configures middleware for the server
|
||||
func (ms *MockHTTPServer) setupMiddleware() {
|
||||
ms.router.Use(ms.corsMiddleware)
|
||||
ms.router.Use(ms.jsonMiddleware)
|
||||
ms.router.Use(ms.loggingMiddleware)
|
||||
}
|
||||
|
||||
// corsMiddleware handles CORS headers
|
||||
func (ms *MockHTTPServer) corsMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
|
||||
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, Accept")
|
||||
w.Header().Set("Access-Control-Expose-Headers", "Content-Type, Content-Length")
|
||||
|
||||
if r.Method == "OPTIONS" {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
return
|
||||
}
|
||||
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
// jsonMiddleware sets JSON content type
|
||||
func (ms *MockHTTPServer) jsonMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
// loggingMiddleware logs HTTP requests
|
||||
func (ms *MockHTTPServer) loggingMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
start := time.Now()
|
||||
next.ServeHTTP(w, r)
|
||||
log.WithFields(log.Fields{
|
||||
"method": r.Method,
|
||||
"path": r.URL.Path,
|
||||
"remote_addr": r.RemoteAddr,
|
||||
"duration": time.Since(start),
|
||||
}).Debug("HTTP request")
|
||||
})
|
||||
}
|
||||
|
||||
// setupRoutes configures all the API routes
|
||||
func (ms *MockHTTPServer) setupRoutes() {
|
||||
// API routes
|
||||
api := ms.router.PathPrefix("/api").Subrouter()
|
||||
api.Use(ms.corsMiddleware)
|
||||
|
||||
// Discovery endpoints
|
||||
api.HandleFunc("/discovery/nodes", ms.getDiscoveryNodes).Methods("GET")
|
||||
api.HandleFunc("/discovery/refresh", ms.refreshDiscovery).Methods("POST", "OPTIONS")
|
||||
api.HandleFunc("/discovery/random-primary", ms.selectRandomPrimary).Methods("POST", "OPTIONS")
|
||||
api.HandleFunc("/discovery/primary/{ip}", ms.setPrimaryNode).Methods("POST", "OPTIONS")
|
||||
|
||||
// Cluster endpoints
|
||||
api.HandleFunc("/cluster/members", ms.getClusterMembers).Methods("GET")
|
||||
api.HandleFunc("/cluster/refresh", ms.refreshCluster).Methods("POST", "OPTIONS")
|
||||
api.HandleFunc("/cluster/node/versions", ms.getClusterNodeVersions).Methods("GET")
|
||||
api.HandleFunc("/rollout", ms.startRollout).Methods("POST", "OPTIONS")
|
||||
|
||||
// Task endpoints
|
||||
api.HandleFunc("/tasks/status", ms.getTaskStatus).Methods("GET")
|
||||
|
||||
// Node endpoints
|
||||
api.HandleFunc("/node/status", ms.getNodeStatus).Methods("GET")
|
||||
api.HandleFunc("/node/status/{ip}", ms.getNodeStatusByIP).Methods("GET")
|
||||
api.HandleFunc("/node/endpoints", ms.getNodeEndpoints).Methods("GET")
|
||||
api.HandleFunc("/node/update", ms.updateNodeFirmware).Methods("POST", "OPTIONS")
|
||||
|
||||
// Proxy endpoints
|
||||
api.HandleFunc("/proxy-call", ms.proxyCall).Methods("POST", "OPTIONS")
|
||||
|
||||
// Registry proxy endpoints
|
||||
api.HandleFunc("/registry/health", ms.getRegistryHealth).Methods("GET")
|
||||
api.HandleFunc("/registry/firmware", ms.listRegistryFirmware).Methods("GET")
|
||||
api.HandleFunc("/registry/firmware", ms.uploadRegistryFirmware).Methods("POST", "OPTIONS")
|
||||
api.HandleFunc("/registry/firmware/{name}/{version}", ms.downloadRegistryFirmware).Methods("GET")
|
||||
api.HandleFunc("/registry/firmware/{name}/{version}", ms.updateRegistryFirmware).Methods("PUT", "OPTIONS")
|
||||
api.HandleFunc("/registry/firmware/{name}/{version}", ms.deleteRegistryFirmware).Methods("DELETE", "OPTIONS")
|
||||
|
||||
// Monitoring endpoints
|
||||
api.HandleFunc("/monitoring/resources", ms.getMonitoringResources).Methods("GET")
|
||||
|
||||
// Test endpoints
|
||||
api.HandleFunc("/test/websocket", ms.testWebSocket).Methods("POST", "OPTIONS")
|
||||
|
||||
// Health check
|
||||
api.HandleFunc("/health", ms.healthCheck).Methods("GET")
|
||||
|
||||
// WebSocket endpoint
|
||||
if ms.enableWS {
|
||||
ms.router.HandleFunc("/ws", ms.corsMiddleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if err := ms.wsServer.HandleWebSocket(w, r); err != nil {
|
||||
log.WithError(err).Error("WebSocket connection failed")
|
||||
http.Error(w, "WebSocket upgrade failed", http.StatusBadRequest)
|
||||
}
|
||||
})).ServeHTTP)
|
||||
}
|
||||
}
|
||||
|
||||
// Start starts the HTTP server
|
||||
func (ms *MockHTTPServer) Start() error {
|
||||
log.WithField("port", ms.port).Info("Starting mock HTTP server")
|
||||
return ms.server.ListenAndServe()
|
||||
}
|
||||
|
||||
// Shutdown gracefully shuts down the HTTP server
|
||||
func (ms *MockHTTPServer) Shutdown(ctx context.Context) error {
|
||||
log.Info("Shutting down mock HTTP server")
|
||||
|
||||
// Shutdown WebSocket server if enabled
|
||||
if ms.enableWS && ms.wsServer != nil {
|
||||
if err := ms.wsServer.Shutdown(ctx); err != nil {
|
||||
log.WithError(err).Error("WebSocket server shutdown error")
|
||||
}
|
||||
}
|
||||
|
||||
return ms.server.Shutdown(ctx)
|
||||
}
|
||||
|
||||
// API endpoint handlers
|
||||
|
||||
// GET /api/discovery/nodes
|
||||
func (ms *MockHTTPServer) getDiscoveryNodes(w http.ResponseWriter, r *http.Request) {
|
||||
nodes := ms.discovery.GetNodes()
|
||||
primaryNode := ms.discovery.GetPrimaryNode()
|
||||
clusterStatus := ms.discovery.GetClusterStatus()
|
||||
|
||||
type NodeResponse struct {
|
||||
*discovery.NodeInfo
|
||||
IsPrimary bool `json:"isPrimary"`
|
||||
}
|
||||
|
||||
response := struct {
|
||||
PrimaryNode string `json:"primaryNode"`
|
||||
TotalNodes int `json:"totalNodes"`
|
||||
Nodes []NodeResponse `json:"nodes"`
|
||||
ClientInitialized bool `json:"clientInitialized"`
|
||||
ClientBaseURL string `json:"clientBaseUrl"`
|
||||
ClusterStatus discovery.ClusterStatus `json:"clusterStatus"`
|
||||
}{
|
||||
PrimaryNode: primaryNode,
|
||||
TotalNodes: len(nodes),
|
||||
Nodes: make([]NodeResponse, 0, len(nodes)),
|
||||
ClientInitialized: primaryNode != "",
|
||||
ClientBaseURL: fmt.Sprintf("http://%s", primaryNode),
|
||||
ClusterStatus: clusterStatus,
|
||||
}
|
||||
|
||||
for _, node := range nodes {
|
||||
nodeResponse := NodeResponse{
|
||||
NodeInfo: node,
|
||||
IsPrimary: node.IP == primaryNode,
|
||||
}
|
||||
response.Nodes = append(response.Nodes, nodeResponse)
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
// POST /api/discovery/refresh
|
||||
func (ms *MockHTTPServer) refreshDiscovery(w http.ResponseWriter, r *http.Request) {
|
||||
response := struct {
|
||||
Success bool `json:"success"`
|
||||
Message string `json:"message"`
|
||||
PrimaryNode string `json:"primaryNode"`
|
||||
TotalNodes int `json:"totalNodes"`
|
||||
ClientInitialized bool `json:"clientInitialized"`
|
||||
}{
|
||||
Success: true,
|
||||
Message: "Mock cluster refresh completed",
|
||||
PrimaryNode: ms.discovery.GetPrimaryNode(),
|
||||
TotalNodes: len(ms.discovery.GetNodes()),
|
||||
ClientInitialized: ms.discovery.GetPrimaryNode() != "",
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
// POST /api/discovery/random-primary
|
||||
func (ms *MockHTTPServer) selectRandomPrimary(w http.ResponseWriter, r *http.Request) {
|
||||
nodes := ms.discovery.GetNodes()
|
||||
if len(nodes) == 0 {
|
||||
http.Error(w, `{"error": "No nodes available"}`, http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
newPrimary := ms.discovery.SelectRandomPrimaryNode()
|
||||
if newPrimary == "" {
|
||||
http.Error(w, `{"error": "Selection failed"}`, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
response := struct {
|
||||
Success bool `json:"success"`
|
||||
Message string `json:"message"`
|
||||
PrimaryNode string `json:"primaryNode"`
|
||||
TotalNodes int `json:"totalNodes"`
|
||||
ClientInitialized bool `json:"clientInitialized"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
}{
|
||||
Success: true,
|
||||
Message: fmt.Sprintf("Randomly selected new primary node: %s", newPrimary),
|
||||
PrimaryNode: newPrimary,
|
||||
TotalNodes: len(nodes),
|
||||
ClientInitialized: true,
|
||||
Timestamp: time.Now().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
// POST /api/discovery/primary/{ip}
|
||||
func (ms *MockHTTPServer) setPrimaryNode(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
requestedIP := vars["ip"]
|
||||
|
||||
if err := ms.discovery.SetPrimaryNode(requestedIP); err != nil {
|
||||
http.Error(w, fmt.Sprintf(`{"error": "Node not found", "message": "Node with IP %s not found"}`, requestedIP), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
response := struct {
|
||||
Success bool `json:"success"`
|
||||
Message string `json:"message"`
|
||||
PrimaryNode string `json:"primaryNode"`
|
||||
ClientInitialized bool `json:"clientInitialized"`
|
||||
}{
|
||||
Success: true,
|
||||
Message: fmt.Sprintf("Primary node set to %s", requestedIP),
|
||||
PrimaryNode: requestedIP,
|
||||
ClientInitialized: true,
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
// GET /api/cluster/members
|
||||
func (ms *MockHTTPServer) getClusterMembers(w http.ResponseWriter, r *http.Request) {
|
||||
nodes := ms.discovery.GetNodes()
|
||||
|
||||
// Convert to mock format
|
||||
mockNodes := make(map[string]*NodeInfo)
|
||||
for ip, node := range nodes {
|
||||
mockNodes[ip] = &NodeInfo{
|
||||
IP: node.IP,
|
||||
Hostname: node.Hostname,
|
||||
Status: string(node.Status),
|
||||
Latency: node.Latency,
|
||||
LastSeen: node.LastSeen,
|
||||
Labels: node.Labels,
|
||||
}
|
||||
}
|
||||
|
||||
members := GenerateMockClusterMembers(mockNodes)
|
||||
|
||||
response := &client.ClusterStatusResponse{
|
||||
Members: members,
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
// POST /api/cluster/refresh
|
||||
func (ms *MockHTTPServer) refreshCluster(w http.ResponseWriter, r *http.Request) {
|
||||
var requestBody struct {
|
||||
Reason string `json:"reason"`
|
||||
}
|
||||
|
||||
if err := json.NewDecoder(r.Body).Decode(&requestBody); err != nil && err.Error() != "EOF" {
|
||||
requestBody.Reason = "manual_refresh"
|
||||
}
|
||||
|
||||
response := struct {
|
||||
Success bool `json:"success"`
|
||||
Message string `json:"message"`
|
||||
Reason string `json:"reason"`
|
||||
WSclients int `json:"wsClients"`
|
||||
}{
|
||||
Success: true,
|
||||
Message: "Mock cluster refresh triggered",
|
||||
Reason: requestBody.Reason,
|
||||
}
|
||||
|
||||
if ms.enableWS && ms.wsServer != nil {
|
||||
response.WSclients = ms.wsServer.GetClientCount()
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
// GET /api/cluster/node/versions
|
||||
func (ms *MockHTTPServer) getClusterNodeVersions(w http.ResponseWriter, r *http.Request) {
|
||||
nodes := ms.discovery.GetNodes()
|
||||
|
||||
type NodeVersionInfo struct {
|
||||
IP string `json:"ip"`
|
||||
Version string `json:"version"`
|
||||
Labels map[string]string `json:"labels"`
|
||||
}
|
||||
|
||||
var nodeVersions []NodeVersionInfo
|
||||
for _, node := range nodes {
|
||||
version := "unknown"
|
||||
if v, exists := node.Labels["version"]; exists {
|
||||
version = v
|
||||
}
|
||||
|
||||
nodeVersions = append(nodeVersions, NodeVersionInfo{
|
||||
IP: node.IP,
|
||||
Version: version,
|
||||
Labels: node.Labels,
|
||||
})
|
||||
}
|
||||
|
||||
response := struct {
|
||||
Nodes []NodeVersionInfo `json:"nodes"`
|
||||
}{
|
||||
Nodes: nodeVersions,
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
// RolloutNode represents a node in a rollout request
|
||||
type RolloutNode struct {
|
||||
IP string `json:"ip"`
|
||||
Version string `json:"version"`
|
||||
Labels map[string]string `json:"labels"`
|
||||
}
|
||||
|
||||
// POST /api/rollout
|
||||
func (ms *MockHTTPServer) startRollout(w http.ResponseWriter, r *http.Request) {
|
||||
var request struct {
|
||||
Firmware struct {
|
||||
Name string `json:"name"`
|
||||
Version string `json:"version"`
|
||||
Labels map[string]string `json:"labels"`
|
||||
} `json:"firmware"`
|
||||
Nodes []RolloutNode `json:"nodes"`
|
||||
}
|
||||
|
||||
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
|
||||
http.Error(w, `{"error": "Invalid JSON"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
rolloutID := fmt.Sprintf("rollout_%d", time.Now().Unix())
|
||||
|
||||
response := struct {
|
||||
Success bool `json:"success"`
|
||||
Message string `json:"message"`
|
||||
RolloutID string `json:"rolloutId"`
|
||||
TotalNodes int `json:"totalNodes"`
|
||||
FirmwareURL string `json:"firmwareUrl"`
|
||||
}{
|
||||
Success: true,
|
||||
Message: fmt.Sprintf("Mock rollout started for %d nodes", len(request.Nodes)),
|
||||
RolloutID: rolloutID,
|
||||
TotalNodes: len(request.Nodes),
|
||||
FirmwareURL: fmt.Sprintf("http://localhost:3002/firmware/%s/%s", request.Firmware.Name, request.Firmware.Version),
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(response)
|
||||
|
||||
// Simulate rollout progress in background
|
||||
if ms.enableWS && ms.wsServer != nil {
|
||||
go ms.simulateRollout(rolloutID, request.Nodes, request.Firmware.Version)
|
||||
}
|
||||
}
|
||||
|
||||
// simulateRollout simulates a rollout process with progress updates
|
||||
func (ms *MockHTTPServer) simulateRollout(rolloutID string, nodes []RolloutNode, newVersion string) {
|
||||
for i, node := range nodes {
|
||||
// Simulate updating labels
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
if ms.wsServer != nil {
|
||||
ms.wsServer.BroadcastRolloutProgress(rolloutID, node.IP, "updating_labels", i+1, len(nodes))
|
||||
}
|
||||
|
||||
// Simulate uploading
|
||||
time.Sleep(1 * time.Second)
|
||||
if ms.wsServer != nil {
|
||||
ms.wsServer.BroadcastRolloutProgress(rolloutID, node.IP, "uploading", i+1, len(nodes))
|
||||
}
|
||||
|
||||
// Update node version in discovery
|
||||
ms.discovery.UpdateNodeVersion(node.IP, strings.TrimPrefix(newVersion, "v"))
|
||||
|
||||
// Simulate completion
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
if ms.wsServer != nil {
|
||||
ms.wsServer.BroadcastRolloutProgress(rolloutID, node.IP, "completed", i+1, len(nodes))
|
||||
}
|
||||
}
|
||||
|
||||
log.WithField("rollout_id", rolloutID).Info("Mock rollout completed")
|
||||
}
|
||||
|
||||
// GET /api/tasks/status
|
||||
func (ms *MockHTTPServer) getTaskStatus(w http.ResponseWriter, r *http.Request) {
|
||||
response := GenerateMockTaskStatus()
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
// GET /api/node/status
|
||||
func (ms *MockHTTPServer) getNodeStatus(w http.ResponseWriter, r *http.Request) {
|
||||
primaryNode := ms.discovery.GetPrimaryNode()
|
||||
nodes := ms.discovery.GetNodes()
|
||||
|
||||
var labels map[string]string
|
||||
if primaryNode != "" && nodes[primaryNode] != nil {
|
||||
labels = nodes[primaryNode].Labels
|
||||
} else {
|
||||
labels = make(map[string]string)
|
||||
}
|
||||
|
||||
response := GenerateMockSystemStatus(labels)
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
// GET /api/node/status/{ip}
|
||||
func (ms *MockHTTPServer) getNodeStatusByIP(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
nodeIP := vars["ip"]
|
||||
|
||||
nodes := ms.discovery.GetNodes()
|
||||
node, exists := nodes[nodeIP]
|
||||
if !exists {
|
||||
http.Error(w, fmt.Sprintf(`{"error": "Node not found", "message": "Node with IP %s not found"}`, nodeIP), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
response := GenerateMockSystemStatus(node.Labels)
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
// GET /api/node/endpoints
|
||||
func (ms *MockHTTPServer) getNodeEndpoints(w http.ResponseWriter, r *http.Request) {
|
||||
response := GenerateMockCapabilities()
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
// POST /api/node/update
|
||||
func (ms *MockHTTPServer) updateNodeFirmware(w http.ResponseWriter, r *http.Request) {
|
||||
nodeIP := r.URL.Query().Get("ip")
|
||||
if nodeIP == "" {
|
||||
nodeIP = r.Header.Get("X-Node-IP")
|
||||
}
|
||||
|
||||
if nodeIP == "" {
|
||||
http.Error(w, `{"error": "Node IP required"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Parse multipart form
|
||||
err := r.ParseMultipartForm(50 << 20) // 50MB limit
|
||||
if err != nil {
|
||||
http.Error(w, `{"error": "Failed to parse form"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
file, fileHeader, err := r.FormFile("file")
|
||||
if err != nil {
|
||||
http.Error(w, `{"error": "No file received"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
filename := fileHeader.Filename
|
||||
if filename == "" {
|
||||
filename = "firmware.bin"
|
||||
}
|
||||
|
||||
// Read file data
|
||||
fileData, err := io.ReadAll(file)
|
||||
if err != nil {
|
||||
http.Error(w, `{"error": "Failed to read file"}`, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"node_ip": nodeIP,
|
||||
"file_size": len(fileData),
|
||||
"filename": filename,
|
||||
}).Info("Mock firmware upload received")
|
||||
|
||||
// Store firmware in memory
|
||||
ms.firmwareStore[nodeIP] = fileData
|
||||
|
||||
// Broadcast status if WebSocket enabled
|
||||
if ms.enableWS && ms.wsServer != nil {
|
||||
ms.wsServer.BroadcastFirmwareUploadStatus(nodeIP, "uploading", filename, len(fileData))
|
||||
}
|
||||
|
||||
// Send immediate response
|
||||
response := struct {
|
||||
Success bool `json:"success"`
|
||||
Message string `json:"message"`
|
||||
NodeIP string `json:"nodeIp"`
|
||||
FileSize int `json:"fileSize"`
|
||||
Filename string `json:"filename"`
|
||||
Status string `json:"status"`
|
||||
}{
|
||||
Success: true,
|
||||
Message: "Mock firmware upload received",
|
||||
NodeIP: nodeIP,
|
||||
FileSize: len(fileData),
|
||||
Filename: filename,
|
||||
Status: "processing",
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(response)
|
||||
|
||||
// Simulate firmware update in background
|
||||
go func() {
|
||||
time.Sleep(2 * time.Second)
|
||||
if ms.enableWS && ms.wsServer != nil {
|
||||
// Randomly succeed or fail (90% success rate)
|
||||
if rand.Float32() < 0.9 {
|
||||
ms.wsServer.BroadcastFirmwareUploadStatus(nodeIP, "completed", filename, len(fileData))
|
||||
} else {
|
||||
ms.wsServer.BroadcastFirmwareUploadStatus(nodeIP, "failed", filename, len(fileData))
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// POST /api/proxy-call
|
||||
func (ms *MockHTTPServer) proxyCall(w http.ResponseWriter, r *http.Request) {
|
||||
var requestBody struct {
|
||||
IP string `json:"ip"`
|
||||
Method string `json:"method"`
|
||||
URI string `json:"uri"`
|
||||
Params []map[string]interface{} `json:"params"`
|
||||
}
|
||||
|
||||
if err := json.NewDecoder(r.Body).Decode(&requestBody); err != nil {
|
||||
http.Error(w, `{"error": "Invalid JSON"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Generate mock response based on URI
|
||||
mockResponse := GenerateMockProxyResponse(requestBody.Method, requestBody.URI)
|
||||
|
||||
// Wrap in data field for consistency
|
||||
response := map[string]interface{}{
|
||||
"data": mockResponse,
|
||||
"status": http.StatusOK,
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
// GET /api/registry/health
|
||||
func (ms *MockHTTPServer) getRegistryHealth(w http.ResponseWriter, r *http.Request) {
|
||||
response := map[string]interface{}{
|
||||
"status": "healthy",
|
||||
"timestamp": time.Now().Format(time.RFC3339),
|
||||
"service": "mock-registry",
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
// GET /api/registry/firmware
|
||||
func (ms *MockHTTPServer) listRegistryFirmware(w http.ResponseWriter, r *http.Request) {
|
||||
firmwareList := GenerateMockFirmwareList()
|
||||
json.NewEncoder(w).Encode(firmwareList)
|
||||
}
|
||||
|
||||
// POST /api/registry/firmware
|
||||
func (ms *MockHTTPServer) uploadRegistryFirmware(w http.ResponseWriter, r *http.Request) {
|
||||
response := map[string]interface{}{
|
||||
"success": true,
|
||||
"message": "Mock firmware uploaded successfully",
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
// GET /api/registry/firmware/{name}/{version}
|
||||
func (ms *MockHTTPServer) downloadRegistryFirmware(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
name := vars["name"]
|
||||
version := vars["version"]
|
||||
|
||||
// Generate mock firmware binary
|
||||
firmwareData := GenerateMockFirmwareBinary(524288) // 512KB
|
||||
|
||||
w.Header().Set("Content-Type", "application/octet-stream")
|
||||
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s-%s.bin\"", name, version))
|
||||
w.Header().Set("Content-Length", fmt.Sprintf("%d", len(firmwareData)))
|
||||
|
||||
w.Write(firmwareData)
|
||||
}
|
||||
|
||||
// PUT /api/registry/firmware/{name}/{version}
|
||||
func (ms *MockHTTPServer) updateRegistryFirmware(w http.ResponseWriter, r *http.Request) {
|
||||
response := map[string]interface{}{
|
||||
"success": true,
|
||||
"message": "Mock firmware metadata updated",
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
// DELETE /api/registry/firmware/{name}/{version}
|
||||
func (ms *MockHTTPServer) deleteRegistryFirmware(w http.ResponseWriter, r *http.Request) {
|
||||
response := map[string]interface{}{
|
||||
"success": true,
|
||||
"message": "Mock firmware deleted",
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
// POST /api/test/websocket
|
||||
func (ms *MockHTTPServer) testWebSocket(w http.ResponseWriter, r *http.Request) {
|
||||
response := struct {
|
||||
Success bool `json:"success"`
|
||||
Message string `json:"message"`
|
||||
WSclients int `json:"websocketClients"`
|
||||
TotalNodes int `json:"totalNodes"`
|
||||
}{
|
||||
Success: true,
|
||||
Message: "Mock WebSocket test broadcast sent",
|
||||
TotalNodes: len(ms.discovery.GetNodes()),
|
||||
}
|
||||
|
||||
if ms.enableWS && ms.wsServer != nil {
|
||||
response.WSclients = ms.wsServer.GetClientCount()
|
||||
// Trigger a test broadcast
|
||||
ms.wsServer.BroadcastClusterUpdate()
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
// GET /api/monitoring/resources
|
||||
func (ms *MockHTTPServer) getMonitoringResources(w http.ResponseWriter, r *http.Request) {
|
||||
nodes := ms.discovery.GetNodes()
|
||||
|
||||
// Convert to mock format
|
||||
mockNodes := make(map[string]*NodeInfo)
|
||||
for ip, node := range nodes {
|
||||
mockNodes[ip] = &NodeInfo{
|
||||
IP: node.IP,
|
||||
Hostname: node.Hostname,
|
||||
Status: string(node.Status),
|
||||
Latency: node.Latency,
|
||||
LastSeen: node.LastSeen,
|
||||
Labels: node.Labels,
|
||||
}
|
||||
}
|
||||
|
||||
response := GenerateMockMonitoringResources(mockNodes)
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
// GET /api/health
|
||||
func (ms *MockHTTPServer) healthCheck(w http.ResponseWriter, r *http.Request) {
|
||||
nodes := ms.discovery.GetNodes()
|
||||
primaryNode := ms.discovery.GetPrimaryNode()
|
||||
|
||||
health := struct {
|
||||
Status string `json:"status"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
Services map[string]bool `json:"services"`
|
||||
Cluster map[string]interface{} `json:"cluster"`
|
||||
Mock bool `json:"mock"`
|
||||
}{
|
||||
Status: "healthy",
|
||||
Timestamp: time.Now().Format(time.RFC3339),
|
||||
Services: map[string]bool{
|
||||
"http": true,
|
||||
"websocket": ms.enableWS,
|
||||
"discovery": true,
|
||||
"mockClient": primaryNode != "",
|
||||
},
|
||||
Cluster: map[string]interface{}{
|
||||
"totalNodes": len(nodes),
|
||||
"primaryNode": primaryNode,
|
||||
},
|
||||
Mock: true,
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(health)
|
||||
}
|
||||
@@ -1,475 +0,0 @@
|
||||
package mock
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var upgrader = websocket.Upgrader{
|
||||
CheckOrigin: func(r *http.Request) bool {
|
||||
return true // Allow connections from any origin
|
||||
},
|
||||
}
|
||||
|
||||
// MockWebSocketServer manages WebSocket connections and mock broadcasts
|
||||
type MockWebSocketServer struct {
|
||||
discovery *MockNodeDiscovery
|
||||
clients map[*websocket.Conn]bool
|
||||
mutex sync.RWMutex
|
||||
writeMutex sync.Mutex
|
||||
shutdownChan chan struct{}
|
||||
shutdownOnce sync.Once
|
||||
logger *log.Logger
|
||||
}
|
||||
|
||||
// NewMockWebSocketServer creates a new mock WebSocket server
|
||||
func NewMockWebSocketServer(discovery *MockNodeDiscovery) *MockWebSocketServer {
|
||||
mws := &MockWebSocketServer{
|
||||
discovery: discovery,
|
||||
clients: make(map[*websocket.Conn]bool),
|
||||
shutdownChan: make(chan struct{}),
|
||||
logger: log.New(),
|
||||
}
|
||||
|
||||
// Register callback for node updates
|
||||
discovery.AddCallback(mws.handleNodeUpdate)
|
||||
|
||||
// Start periodic broadcasts
|
||||
go mws.startPeriodicBroadcasts()
|
||||
|
||||
return mws
|
||||
}
|
||||
|
||||
// HandleWebSocket handles WebSocket upgrade and connection
|
||||
func (mws *MockWebSocketServer) HandleWebSocket(w http.ResponseWriter, r *http.Request) error {
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
mws.logger.WithError(err).Error("Failed to upgrade WebSocket connection")
|
||||
return err
|
||||
}
|
||||
|
||||
mws.mutex.Lock()
|
||||
mws.clients[conn] = true
|
||||
mws.mutex.Unlock()
|
||||
|
||||
mws.logger.Debug("Mock WebSocket client connected")
|
||||
|
||||
// Send current cluster state to newly connected client
|
||||
go mws.sendCurrentClusterState(conn)
|
||||
|
||||
// Handle client messages and disconnection
|
||||
go mws.handleClient(conn)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleClient handles messages from a WebSocket client
|
||||
func (mws *MockWebSocketServer) handleClient(conn *websocket.Conn) {
|
||||
defer func() {
|
||||
mws.mutex.Lock()
|
||||
delete(mws.clients, conn)
|
||||
mws.mutex.Unlock()
|
||||
conn.Close()
|
||||
mws.logger.Debug("Mock WebSocket client disconnected")
|
||||
}()
|
||||
|
||||
// Set read deadline and pong handler
|
||||
conn.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
conn.SetPongHandler(func(string) error {
|
||||
conn.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
return nil
|
||||
})
|
||||
|
||||
// Start ping routine
|
||||
go func() {
|
||||
ticker := time.NewTicker(54 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-mws.shutdownChan:
|
||||
return
|
||||
case <-ticker.C:
|
||||
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Read messages
|
||||
for {
|
||||
_, _, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
||||
mws.logger.WithError(err).Error("WebSocket error")
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sendCurrentClusterState sends the current cluster state to a newly connected client
|
||||
func (mws *MockWebSocketServer) sendCurrentClusterState(conn *websocket.Conn) {
|
||||
nodes := mws.discovery.GetNodes()
|
||||
if len(nodes) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Convert to mock format
|
||||
mockNodes := make(map[string]*NodeInfo)
|
||||
for ip, node := range nodes {
|
||||
mockNodes[ip] = &NodeInfo{
|
||||
IP: node.IP,
|
||||
Hostname: node.Hostname,
|
||||
Status: string(node.Status),
|
||||
Latency: node.Latency,
|
||||
LastSeen: node.LastSeen,
|
||||
Labels: node.Labels,
|
||||
}
|
||||
}
|
||||
|
||||
members := GenerateMockClusterMembers(mockNodes)
|
||||
|
||||
message := struct {
|
||||
Type string `json:"type"`
|
||||
Members interface{} `json:"members"`
|
||||
PrimaryNode string `json:"primaryNode"`
|
||||
TotalNodes int `json:"totalNodes"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
}{
|
||||
Type: "cluster_update",
|
||||
Members: members,
|
||||
PrimaryNode: mws.discovery.GetPrimaryNode(),
|
||||
TotalNodes: len(nodes),
|
||||
Timestamp: time.Now().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
data, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
mws.logger.WithError(err).Error("Failed to marshal cluster data")
|
||||
return
|
||||
}
|
||||
|
||||
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
|
||||
mws.logger.WithError(err).Error("Failed to send initial cluster state")
|
||||
}
|
||||
}
|
||||
|
||||
// handleNodeUpdate is called when node information changes
|
||||
func (mws *MockWebSocketServer) handleNodeUpdate(nodeIP, action string) {
|
||||
mws.logger.WithFields(log.Fields{
|
||||
"node_ip": nodeIP,
|
||||
"action": action,
|
||||
}).Debug("Mock node update received, broadcasting to WebSocket clients")
|
||||
|
||||
// Broadcast cluster update
|
||||
mws.BroadcastClusterUpdate()
|
||||
|
||||
// Also broadcast node discovery event
|
||||
mws.broadcastNodeDiscovery(nodeIP, action)
|
||||
}
|
||||
|
||||
// startPeriodicBroadcasts sends periodic updates to keep clients informed
|
||||
func (mws *MockWebSocketServer) startPeriodicBroadcasts() {
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-mws.shutdownChan:
|
||||
return
|
||||
case <-ticker.C:
|
||||
mws.BroadcastClusterUpdate()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// BroadcastClusterUpdate sends cluster updates to all connected clients
|
||||
func (mws *MockWebSocketServer) BroadcastClusterUpdate() {
|
||||
mws.mutex.RLock()
|
||||
clients := make([]*websocket.Conn, 0, len(mws.clients))
|
||||
for client := range mws.clients {
|
||||
clients = append(clients, client)
|
||||
}
|
||||
mws.mutex.RUnlock()
|
||||
|
||||
if len(clients) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
nodes := mws.discovery.GetNodes()
|
||||
|
||||
// Convert to mock format
|
||||
mockNodes := make(map[string]*NodeInfo)
|
||||
for ip, node := range nodes {
|
||||
mockNodes[ip] = &NodeInfo{
|
||||
IP: node.IP,
|
||||
Hostname: node.Hostname,
|
||||
Status: string(node.Status),
|
||||
Latency: node.Latency,
|
||||
LastSeen: node.LastSeen,
|
||||
Labels: node.Labels,
|
||||
}
|
||||
}
|
||||
|
||||
members := GenerateMockClusterMembers(mockNodes)
|
||||
|
||||
message := struct {
|
||||
Type string `json:"type"`
|
||||
Members interface{} `json:"members"`
|
||||
PrimaryNode string `json:"primaryNode"`
|
||||
TotalNodes int `json:"totalNodes"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
}{
|
||||
Type: "cluster_update",
|
||||
Members: members,
|
||||
PrimaryNode: mws.discovery.GetPrimaryNode(),
|
||||
TotalNodes: len(nodes),
|
||||
Timestamp: time.Now().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
data, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
mws.logger.WithError(err).Error("Failed to marshal cluster update")
|
||||
return
|
||||
}
|
||||
|
||||
mws.logger.WithField("clients", len(clients)).Debug("Broadcasting mock cluster update")
|
||||
|
||||
// Send to all clients with write synchronization
|
||||
mws.writeMutex.Lock()
|
||||
defer mws.writeMutex.Unlock()
|
||||
|
||||
for _, client := range clients {
|
||||
client.SetWriteDeadline(time.Now().Add(5 * time.Second))
|
||||
if err := client.WriteMessage(websocket.TextMessage, data); err != nil {
|
||||
mws.logger.WithError(err).Error("Failed to send cluster update to client")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// broadcastNodeDiscovery sends node discovery events to all clients
|
||||
func (mws *MockWebSocketServer) broadcastNodeDiscovery(nodeIP, action string) {
|
||||
mws.mutex.RLock()
|
||||
clients := make([]*websocket.Conn, 0, len(mws.clients))
|
||||
for client := range mws.clients {
|
||||
clients = append(clients, client)
|
||||
}
|
||||
mws.mutex.RUnlock()
|
||||
|
||||
if len(clients) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
message := struct {
|
||||
Type string `json:"type"`
|
||||
Action string `json:"action"`
|
||||
NodeIP string `json:"nodeIp"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
}{
|
||||
Type: "node_discovery",
|
||||
Action: action,
|
||||
NodeIP: nodeIP,
|
||||
Timestamp: time.Now().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
data, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
mws.logger.WithError(err).Error("Failed to marshal node discovery event")
|
||||
return
|
||||
}
|
||||
|
||||
// Send to all clients with write synchronization
|
||||
mws.writeMutex.Lock()
|
||||
defer mws.writeMutex.Unlock()
|
||||
|
||||
for _, client := range clients {
|
||||
client.SetWriteDeadline(time.Now().Add(5 * time.Second))
|
||||
if err := client.WriteMessage(websocket.TextMessage, data); err != nil {
|
||||
mws.logger.WithError(err).Error("Failed to send node discovery event to client")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// BroadcastFirmwareUploadStatus sends firmware upload status updates to all clients
|
||||
func (mws *MockWebSocketServer) BroadcastFirmwareUploadStatus(nodeIP, status, filename string, fileSize int) {
|
||||
mws.mutex.RLock()
|
||||
clients := make([]*websocket.Conn, 0, len(mws.clients))
|
||||
for client := range mws.clients {
|
||||
clients = append(clients, client)
|
||||
}
|
||||
mws.mutex.RUnlock()
|
||||
|
||||
if len(clients) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
message := struct {
|
||||
Type string `json:"type"`
|
||||
NodeIP string `json:"nodeIp"`
|
||||
Status string `json:"status"`
|
||||
Filename string `json:"filename"`
|
||||
FileSize int `json:"fileSize"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
}{
|
||||
Type: "firmware_upload_status",
|
||||
NodeIP: nodeIP,
|
||||
Status: status,
|
||||
Filename: filename,
|
||||
FileSize: fileSize,
|
||||
Timestamp: time.Now().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
data, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
mws.logger.WithError(err).Error("Failed to marshal firmware upload status")
|
||||
return
|
||||
}
|
||||
|
||||
mws.logger.WithFields(log.Fields{
|
||||
"node_ip": nodeIP,
|
||||
"status": status,
|
||||
"clients": len(clients),
|
||||
}).Debug("Broadcasting mock firmware upload status")
|
||||
|
||||
// Send to all clients with write synchronization
|
||||
mws.writeMutex.Lock()
|
||||
defer mws.writeMutex.Unlock()
|
||||
|
||||
for _, client := range clients {
|
||||
client.SetWriteDeadline(time.Now().Add(5 * time.Second))
|
||||
if err := client.WriteMessage(websocket.TextMessage, data); err != nil {
|
||||
mws.logger.WithError(err).Error("Failed to send firmware upload status to client")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// BroadcastRolloutProgress sends rollout progress updates to all clients
|
||||
func (mws *MockWebSocketServer) BroadcastRolloutProgress(rolloutID, nodeIP, status string, current, total int) {
|
||||
mws.mutex.RLock()
|
||||
clients := make([]*websocket.Conn, 0, len(mws.clients))
|
||||
for client := range mws.clients {
|
||||
clients = append(clients, client)
|
||||
}
|
||||
mws.mutex.RUnlock()
|
||||
|
||||
if len(clients) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
message := struct {
|
||||
Type string `json:"type"`
|
||||
RolloutID string `json:"rolloutId"`
|
||||
NodeIP string `json:"nodeIp"`
|
||||
Status string `json:"status"`
|
||||
Current int `json:"current"`
|
||||
Total int `json:"total"`
|
||||
Progress int `json:"progress"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
}{
|
||||
Type: "rollout_progress",
|
||||
RolloutID: rolloutID,
|
||||
NodeIP: nodeIP,
|
||||
Status: status,
|
||||
Current: current,
|
||||
Total: total,
|
||||
Progress: calculateProgress(current, total, status),
|
||||
Timestamp: time.Now().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
data, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
mws.logger.WithError(err).Error("Failed to marshal rollout progress")
|
||||
return
|
||||
}
|
||||
|
||||
mws.logger.WithFields(log.Fields{
|
||||
"rollout_id": rolloutID,
|
||||
"node_ip": nodeIP,
|
||||
"status": status,
|
||||
"progress": fmt.Sprintf("%d/%d", current, total),
|
||||
}).Debug("Broadcasting mock rollout progress")
|
||||
|
||||
// Send to all clients with write synchronization
|
||||
mws.writeMutex.Lock()
|
||||
defer mws.writeMutex.Unlock()
|
||||
|
||||
for _, client := range clients {
|
||||
client.SetWriteDeadline(time.Now().Add(5 * time.Second))
|
||||
if err := client.WriteMessage(websocket.TextMessage, data); err != nil {
|
||||
mws.logger.WithError(err).Error("Failed to send rollout progress to client")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// calculateProgress calculates the correct progress percentage based on current status
|
||||
func calculateProgress(current, total int, status string) int {
|
||||
if total == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
// Base progress is based on completed nodes
|
||||
completedNodes := current - 1
|
||||
if status == "completed" {
|
||||
completedNodes = current
|
||||
}
|
||||
|
||||
// Calculate base progress (completed nodes / total nodes)
|
||||
baseProgress := float64(completedNodes) / float64(total) * 100
|
||||
|
||||
// If currently updating labels or uploading, add partial progress for the current node
|
||||
if status == "updating_labels" {
|
||||
nodeProgress := 100.0 / float64(total) * 0.25
|
||||
baseProgress += nodeProgress
|
||||
} else if status == "uploading" {
|
||||
nodeProgress := 100.0 / float64(total) * 0.5
|
||||
baseProgress += nodeProgress
|
||||
}
|
||||
|
||||
// Ensure we don't exceed 100%
|
||||
if baseProgress > 100 {
|
||||
baseProgress = 100
|
||||
}
|
||||
|
||||
return int(baseProgress)
|
||||
}
|
||||
|
||||
// GetClientCount returns the number of connected WebSocket clients
|
||||
func (mws *MockWebSocketServer) GetClientCount() int {
|
||||
mws.mutex.RLock()
|
||||
defer mws.mutex.RUnlock()
|
||||
return len(mws.clients)
|
||||
}
|
||||
|
||||
// Shutdown gracefully shuts down the WebSocket server
|
||||
func (mws *MockWebSocketServer) Shutdown(ctx context.Context) error {
|
||||
mws.shutdownOnce.Do(func() {
|
||||
mws.logger.Info("Shutting down mock WebSocket server")
|
||||
close(mws.shutdownChan)
|
||||
|
||||
mws.mutex.Lock()
|
||||
clients := make([]*websocket.Conn, 0, len(mws.clients))
|
||||
for client := range mws.clients {
|
||||
clients = append(clients, client)
|
||||
}
|
||||
mws.mutex.Unlock()
|
||||
|
||||
// Close all client connections
|
||||
for _, client := range clients {
|
||||
client.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseGoingAway, "Server shutting down"))
|
||||
client.Close()
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -366,16 +366,19 @@ func (hs *HTTPServer) setPrimaryNode(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// GET /api/cluster/members
|
||||
func (hs *HTTPServer) getClusterMembers(w http.ResponseWriter, r *http.Request) {
|
||||
log.Debug("Fetching cluster members via API")
|
||||
|
||||
result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) {
|
||||
return client.GetClusterStatus()
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Error fetching cluster members")
|
||||
log.WithError(err).Debug("Failed to fetch cluster members")
|
||||
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch cluster members", "message": "%s"}`, err.Error()), http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug("Successfully fetched cluster members via API")
|
||||
json.NewEncoder(w).Encode(result)
|
||||
}
|
||||
|
||||
@@ -417,42 +420,52 @@ func (hs *HTTPServer) getTaskStatus(w http.ResponseWriter, r *http.Request) {
|
||||
ip := r.URL.Query().Get("ip")
|
||||
|
||||
if ip != "" {
|
||||
log.WithField("node_ip", ip).Debug("Fetching task status from specific node")
|
||||
client := hs.getSporeClient(ip)
|
||||
result, err := client.GetTaskStatus()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Error fetching task status from specific node")
|
||||
log.WithFields(log.Fields{
|
||||
"node_ip": ip,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to fetch task status from specific node")
|
||||
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch task status from node", "message": "%s"}`, err.Error()), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
log.WithField("node_ip", ip).Debug("Successfully fetched task status from specific node")
|
||||
json.NewEncoder(w).Encode(result)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug("Fetching task status via failover")
|
||||
result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) {
|
||||
return client.GetTaskStatus()
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Error fetching task status")
|
||||
log.WithError(err).Debug("Failed to fetch task status via failover")
|
||||
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch task status", "message": "%s"}`, err.Error()), http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug("Successfully fetched task status via failover")
|
||||
json.NewEncoder(w).Encode(result)
|
||||
}
|
||||
|
||||
// GET /api/node/status
|
||||
func (hs *HTTPServer) getNodeStatus(w http.ResponseWriter, r *http.Request) {
|
||||
log.Debug("Fetching node system status via failover")
|
||||
|
||||
result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) {
|
||||
return client.GetSystemStatus()
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Error fetching system status")
|
||||
log.WithError(err).Debug("Failed to fetch system status via failover")
|
||||
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch system status", "message": "%s"}`, err.Error()), http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug("Successfully fetched system status via failover")
|
||||
json.NewEncoder(w).Encode(result)
|
||||
}
|
||||
|
||||
@@ -461,14 +474,20 @@ func (hs *HTTPServer) getNodeStatusByIP(w http.ResponseWriter, r *http.Request)
|
||||
vars := mux.Vars(r)
|
||||
nodeIP := vars["ip"]
|
||||
|
||||
log.WithField("node_ip", nodeIP).Debug("Fetching system status from specific node")
|
||||
|
||||
client := hs.getSporeClient(nodeIP)
|
||||
result, err := client.GetSystemStatus()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Error fetching status from specific node")
|
||||
log.WithFields(log.Fields{
|
||||
"node_ip": nodeIP,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to fetch status from specific node")
|
||||
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch status from node %s", "message": "%s"}`, nodeIP, err.Error()), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
log.WithField("node_ip", nodeIP).Debug("Successfully fetched status from specific node")
|
||||
json.NewEncoder(w).Encode(result)
|
||||
}
|
||||
|
||||
@@ -477,27 +496,34 @@ func (hs *HTTPServer) getNodeEndpoints(w http.ResponseWriter, r *http.Request) {
|
||||
ip := r.URL.Query().Get("ip")
|
||||
|
||||
if ip != "" {
|
||||
log.WithField("node_ip", ip).Debug("Fetching endpoints from specific node")
|
||||
client := hs.getSporeClient(ip)
|
||||
result, err := client.GetCapabilities()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Error fetching endpoints from specific node")
|
||||
log.WithFields(log.Fields{
|
||||
"node_ip": ip,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to fetch endpoints from specific node")
|
||||
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch endpoints from node", "message": "%s"}`, err.Error()), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
log.WithField("node_ip", ip).Debug("Successfully fetched endpoints from specific node")
|
||||
json.NewEncoder(w).Encode(result)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug("Fetching capabilities via failover")
|
||||
result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) {
|
||||
return client.GetCapabilities()
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Error fetching capabilities")
|
||||
log.WithError(err).Debug("Failed to fetch capabilities via failover")
|
||||
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch capabilities", "message": "%s"}`, err.Error()), http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug("Successfully fetched capabilities via failover")
|
||||
json.NewEncoder(w).Encode(result)
|
||||
}
|
||||
|
||||
@@ -849,18 +875,21 @@ type ClusterNodeVersionsResponse struct {
|
||||
|
||||
// GET /api/cluster/node/versions
|
||||
func (hs *HTTPServer) getClusterNodeVersions(w http.ResponseWriter, r *http.Request) {
|
||||
log.Debug("Fetching cluster node versions")
|
||||
|
||||
result, err := hs.performWithFailover(func(client *client.SporeClient) (interface{}, error) {
|
||||
return client.GetClusterStatus()
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Error fetching cluster members for versions")
|
||||
log.WithError(err).Debug("Failed to fetch cluster members for versions")
|
||||
http.Error(w, fmt.Sprintf(`{"error": "Failed to fetch cluster members", "message": "%s"}`, err.Error()), http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
|
||||
clusterStatus, ok := result.(*client.ClusterStatusResponse)
|
||||
if !ok {
|
||||
log.Debug("Invalid cluster status response type")
|
||||
http.Error(w, `{"error": "Invalid cluster status response"}`, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
@@ -880,6 +909,8 @@ func (hs *HTTPServer) getClusterNodeVersions(w http.ResponseWriter, r *http.Requ
|
||||
})
|
||||
}
|
||||
|
||||
log.WithField("node_count", len(nodeVersions)).Debug("Successfully fetched cluster node versions")
|
||||
|
||||
response := ClusterNodeVersionsResponse{
|
||||
Nodes: nodeVersions,
|
||||
}
|
||||
@@ -956,12 +987,25 @@ func (hs *HTTPServer) nodeMatchesLabels(nodeLabels, rolloutLabels map[string]str
|
||||
|
||||
// processRollout handles the actual rollout process in the background
|
||||
func (hs *HTTPServer) processRollout(rolloutID string, nodes []NodeInfo, firmwareInfo FirmwareInfo) {
|
||||
log.WithField("rollout_id", rolloutID).Info("Starting background rollout process")
|
||||
log.WithFields(log.Fields{
|
||||
"rollout_id": rolloutID,
|
||||
"firmware": fmt.Sprintf("%s/%s", firmwareInfo.Name, firmwareInfo.Version),
|
||||
"node_count": len(nodes),
|
||||
}).Debug("Starting background rollout process")
|
||||
|
||||
// Download firmware from registry
|
||||
log.WithFields(log.Fields{
|
||||
"rollout_id": rolloutID,
|
||||
"firmware": fmt.Sprintf("%s/%s", firmwareInfo.Name, firmwareInfo.Version),
|
||||
}).Debug("Downloading firmware from registry for rollout")
|
||||
|
||||
firmwareData, err := hs.registryClient.DownloadFirmware(firmwareInfo.Name, firmwareInfo.Version)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to download firmware for rollout")
|
||||
log.WithFields(log.Fields{
|
||||
"rollout_id": rolloutID,
|
||||
"firmware": fmt.Sprintf("%s/%s", firmwareInfo.Name, firmwareInfo.Version),
|
||||
"error": err.Error(),
|
||||
}).Error("Failed to download firmware for rollout")
|
||||
return
|
||||
}
|
||||
|
||||
@@ -970,7 +1014,7 @@ func (hs *HTTPServer) processRollout(rolloutID string, nodes []NodeInfo, firmwar
|
||||
"firmware": fmt.Sprintf("%s/%s", firmwareInfo.Name, firmwareInfo.Version),
|
||||
"size": len(firmwareData),
|
||||
"total_nodes": len(nodes),
|
||||
}).Info("Downloaded firmware for rollout")
|
||||
}).Debug("Successfully downloaded firmware for rollout")
|
||||
|
||||
// Process nodes in parallel using goroutines
|
||||
var wg sync.WaitGroup
|
||||
@@ -984,9 +1028,14 @@ func (hs *HTTPServer) processRollout(rolloutID string, nodes []NodeInfo, firmwar
|
||||
"rollout_id": rolloutID,
|
||||
"node_ip": node.IP,
|
||||
"progress": fmt.Sprintf("%d/%d", nodeIndex+1, len(nodes)),
|
||||
}).Info("Processing node in rollout")
|
||||
}).Debug("Processing node in rollout")
|
||||
|
||||
// Update version label on the node before upload
|
||||
log.WithFields(log.Fields{
|
||||
"rollout_id": rolloutID,
|
||||
"node_ip": node.IP,
|
||||
}).Debug("Getting SPORE client for node")
|
||||
|
||||
client := hs.getSporeClient(node.IP)
|
||||
|
||||
// Create updated labels with the new version
|
||||
|
||||
@@ -29,6 +29,9 @@ type WebSocketServer struct {
|
||||
mutex sync.RWMutex
|
||||
writeMutex sync.Mutex // Mutex to serialize writes to WebSocket connections
|
||||
logger *log.Logger
|
||||
clusterInfoTicker *time.Ticker
|
||||
clusterInfoStopCh chan bool
|
||||
clusterInfoInterval time.Duration
|
||||
}
|
||||
|
||||
// NewWebSocketServer creates a new WebSocket server
|
||||
@@ -38,11 +41,16 @@ func NewWebSocketServer(nodeDiscovery *discovery.NodeDiscovery) *WebSocketServer
|
||||
sporeClients: make(map[string]*client.SporeClient),
|
||||
clients: make(map[*websocket.Conn]bool),
|
||||
logger: log.New(),
|
||||
clusterInfoStopCh: make(chan bool),
|
||||
clusterInfoInterval: 5 * time.Second, // Fetch cluster info every 5 seconds
|
||||
}
|
||||
|
||||
// Register callback for node updates
|
||||
nodeDiscovery.AddCallback(wss.handleNodeUpdate)
|
||||
|
||||
// Start periodic cluster info fetching
|
||||
go wss.startPeriodicClusterInfoFetching()
|
||||
|
||||
return wss
|
||||
}
|
||||
|
||||
@@ -151,17 +159,48 @@ func (wss *WebSocketServer) sendCurrentClusterState(conn *websocket.Conn) {
|
||||
}
|
||||
}
|
||||
|
||||
// startPeriodicClusterInfoFetching starts a goroutine that periodically fetches cluster info
|
||||
func (wss *WebSocketServer) startPeriodicClusterInfoFetching() {
|
||||
wss.clusterInfoTicker = time.NewTicker(wss.clusterInfoInterval)
|
||||
defer wss.clusterInfoTicker.Stop()
|
||||
|
||||
wss.logger.WithField("interval", wss.clusterInfoInterval).Info("Starting periodic cluster info fetching")
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-wss.clusterInfoTicker.C:
|
||||
wss.fetchAndBroadcastClusterInfo()
|
||||
case <-wss.clusterInfoStopCh:
|
||||
wss.logger.Info("Stopping periodic cluster info fetching")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// fetchAndBroadcastClusterInfo fetches cluster info and broadcasts it to clients
|
||||
func (wss *WebSocketServer) fetchAndBroadcastClusterInfo() {
|
||||
// Only fetch if we have clients connected
|
||||
wss.mutex.RLock()
|
||||
clientCount := len(wss.clients)
|
||||
wss.mutex.RUnlock()
|
||||
|
||||
if clientCount == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
wss.logger.Debug("Periodically fetching cluster info")
|
||||
wss.broadcastClusterUpdate()
|
||||
}
|
||||
|
||||
// handleNodeUpdate is called when node information changes
|
||||
func (wss *WebSocketServer) handleNodeUpdate(nodeIP, action string) {
|
||||
wss.logger.WithFields(log.Fields{
|
||||
"node_ip": nodeIP,
|
||||
"action": action,
|
||||
}).Debug("Node update received, broadcasting to WebSocket clients")
|
||||
}).Debug("Node update received, broadcasting node discovery event")
|
||||
|
||||
// Broadcast cluster update to all clients
|
||||
wss.broadcastClusterUpdate()
|
||||
|
||||
// Also broadcast node discovery event
|
||||
// Only broadcast node discovery event, not cluster update
|
||||
// Cluster updates are now handled by periodic fetching
|
||||
wss.broadcastNodeDiscovery(nodeIP, action)
|
||||
}
|
||||
|
||||
@@ -429,20 +468,38 @@ func (wss *WebSocketServer) calculateProgress(current, total int, status string)
|
||||
func (wss *WebSocketServer) getCurrentClusterMembers() ([]client.ClusterMember, error) {
|
||||
nodes := wss.nodeDiscovery.GetNodes()
|
||||
if len(nodes) == 0 {
|
||||
wss.logger.Debug("No nodes available for cluster member retrieval")
|
||||
return []client.ClusterMember{}, nil
|
||||
}
|
||||
|
||||
// Try to get real cluster data from primary node
|
||||
primaryNode := wss.nodeDiscovery.GetPrimaryNode()
|
||||
if primaryNode != "" {
|
||||
wss.logger.WithFields(log.Fields{
|
||||
"primary_node": primaryNode,
|
||||
"total_nodes": len(nodes),
|
||||
}).Debug("Fetching cluster members from primary node")
|
||||
|
||||
client := wss.getSporeClient(primaryNode)
|
||||
clusterStatus, err := client.GetClusterStatus()
|
||||
if err == nil {
|
||||
// Update local node data with API information
|
||||
wss.logger.WithFields(log.Fields{
|
||||
"primary_node": primaryNode,
|
||||
"member_count": len(clusterStatus.Members),
|
||||
}).Debug("Successfully fetched cluster members from primary node")
|
||||
|
||||
// Update local node data with API information but preserve heartbeat status
|
||||
wss.updateLocalNodesWithAPI(clusterStatus.Members)
|
||||
return clusterStatus.Members, nil
|
||||
|
||||
// Return merged data with heartbeat-based status override
|
||||
return wss.mergeAPIWithHeartbeatStatus(clusterStatus.Members), nil
|
||||
}
|
||||
wss.logger.WithError(err).Error("Failed to get cluster status from primary node")
|
||||
wss.logger.WithFields(log.Fields{
|
||||
"primary_node": primaryNode,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to get cluster status from primary node, using fallback")
|
||||
} else {
|
||||
wss.logger.Debug("No primary node available, using fallback cluster members")
|
||||
}
|
||||
|
||||
// Fallback to local data if API fails
|
||||
@@ -451,18 +508,60 @@ func (wss *WebSocketServer) getCurrentClusterMembers() ([]client.ClusterMember,
|
||||
|
||||
// updateLocalNodesWithAPI updates local node data with information from API
|
||||
func (wss *WebSocketServer) updateLocalNodesWithAPI(apiMembers []client.ClusterMember) {
|
||||
// This would update the local node discovery with fresh API data
|
||||
// For now, we'll just log that we received the data
|
||||
wss.logger.WithField("members", len(apiMembers)).Debug("Updating local nodes with API data")
|
||||
|
||||
for _, member := range apiMembers {
|
||||
if len(member.Labels) > 0 {
|
||||
// Update local node with API data, but preserve heartbeat-based status
|
||||
wss.updateNodeWithAPIData(member)
|
||||
}
|
||||
}
|
||||
|
||||
// updateNodeWithAPIData updates a single node with API data while preserving heartbeat status
|
||||
func (wss *WebSocketServer) updateNodeWithAPIData(apiMember client.ClusterMember) {
|
||||
nodes := wss.nodeDiscovery.GetNodes()
|
||||
if localNode, exists := nodes[apiMember.IP]; exists {
|
||||
// Update additional data from API but preserve heartbeat-based status
|
||||
localNode.Labels = apiMember.Labels
|
||||
localNode.Resources = apiMember.Resources
|
||||
localNode.Latency = apiMember.Latency
|
||||
|
||||
// Only update hostname if it's different and not empty
|
||||
if apiMember.Hostname != "" && apiMember.Hostname != localNode.Hostname {
|
||||
localNode.Hostname = apiMember.Hostname
|
||||
}
|
||||
|
||||
wss.logger.WithFields(log.Fields{
|
||||
"ip": member.IP,
|
||||
"labels": member.Labels,
|
||||
}).Debug("API member labels")
|
||||
"ip": apiMember.IP,
|
||||
"labels": apiMember.Labels,
|
||||
"status": localNode.Status, // Keep heartbeat-based status
|
||||
}).Debug("Updated node with API data, preserved heartbeat status")
|
||||
}
|
||||
}
|
||||
|
||||
// mergeAPIWithHeartbeatStatus merges API member data with heartbeat-based status
|
||||
func (wss *WebSocketServer) mergeAPIWithHeartbeatStatus(apiMembers []client.ClusterMember) []client.ClusterMember {
|
||||
localNodes := wss.nodeDiscovery.GetNodes()
|
||||
mergedMembers := make([]client.ClusterMember, 0, len(apiMembers))
|
||||
|
||||
for _, apiMember := range apiMembers {
|
||||
mergedMember := apiMember
|
||||
|
||||
// Override status with heartbeat-based status if we have local data
|
||||
if localNode, exists := localNodes[apiMember.IP]; exists {
|
||||
mergedMember.Status = string(localNode.Status)
|
||||
mergedMember.LastSeen = localNode.LastSeen.Unix()
|
||||
|
||||
wss.logger.WithFields(log.Fields{
|
||||
"ip": apiMember.IP,
|
||||
"api_status": apiMember.Status,
|
||||
"heartbeat_status": localNode.Status,
|
||||
}).Debug("Overriding API status with heartbeat status")
|
||||
}
|
||||
|
||||
mergedMembers = append(mergedMembers, mergedMember)
|
||||
}
|
||||
|
||||
return mergedMembers
|
||||
}
|
||||
|
||||
// getFallbackClusterMembers returns local node data as fallback
|
||||
@@ -507,6 +606,9 @@ func (wss *WebSocketServer) GetClientCount() int {
|
||||
func (wss *WebSocketServer) Shutdown(ctx context.Context) error {
|
||||
wss.logger.Info("Shutting down WebSocket server")
|
||||
|
||||
// Stop periodic cluster info fetching
|
||||
close(wss.clusterInfoStopCh)
|
||||
|
||||
wss.mutex.Lock()
|
||||
clients := make([]*websocket.Conn, 0, len(wss.clients))
|
||||
for client := range wss.clients {
|
||||
|
||||
@@ -117,21 +117,43 @@ type FirmwareUpdateResponse struct {
|
||||
func (c *SporeClient) GetClusterStatus() (*ClusterStatusResponse, error) {
|
||||
url := fmt.Sprintf("%s/api/cluster/members", c.BaseURL)
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"endpoint": "/api/cluster/members",
|
||||
}).Debug("Fetching cluster status from SPORE node")
|
||||
|
||||
resp, err := c.HTTPClient.Get(url)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to fetch cluster status from SPORE node")
|
||||
return nil, fmt.Errorf("failed to get cluster status: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"status_code": resp.StatusCode,
|
||||
}).Debug("Cluster status request returned non-OK status")
|
||||
return nil, fmt.Errorf("cluster status request failed with status %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
var clusterStatus ClusterStatusResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&clusterStatus); err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to decode cluster status response")
|
||||
return nil, fmt.Errorf("failed to decode cluster status response: %w", err)
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"member_count": len(clusterStatus.Members),
|
||||
}).Debug("Successfully fetched cluster status from SPORE node")
|
||||
|
||||
return &clusterStatus, nil
|
||||
}
|
||||
|
||||
@@ -139,21 +161,44 @@ func (c *SporeClient) GetClusterStatus() (*ClusterStatusResponse, error) {
|
||||
func (c *SporeClient) GetTaskStatus() (*TaskStatusResponse, error) {
|
||||
url := fmt.Sprintf("%s/api/tasks/status", c.BaseURL)
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"endpoint": "/api/tasks/status",
|
||||
}).Debug("Fetching task status from SPORE node")
|
||||
|
||||
resp, err := c.HTTPClient.Get(url)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to fetch task status from SPORE node")
|
||||
return nil, fmt.Errorf("failed to get task status: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"status_code": resp.StatusCode,
|
||||
}).Debug("Task status request returned non-OK status")
|
||||
return nil, fmt.Errorf("task status request failed with status %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
var taskStatus TaskStatusResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&taskStatus); err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to decode task status response")
|
||||
return nil, fmt.Errorf("failed to decode task status response: %w", err)
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"total_tasks": taskStatus.Summary.TotalTasks,
|
||||
"active_tasks": taskStatus.Summary.ActiveTasks,
|
||||
}).Debug("Successfully fetched task status from SPORE node")
|
||||
|
||||
return &taskStatus, nil
|
||||
}
|
||||
|
||||
@@ -161,21 +206,44 @@ func (c *SporeClient) GetTaskStatus() (*TaskStatusResponse, error) {
|
||||
func (c *SporeClient) GetSystemStatus() (*SystemStatusResponse, error) {
|
||||
url := fmt.Sprintf("%s/api/node/status", c.BaseURL)
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"endpoint": "/api/node/status",
|
||||
}).Debug("Fetching system status from SPORE node")
|
||||
|
||||
resp, err := c.HTTPClient.Get(url)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to fetch system status from SPORE node")
|
||||
return nil, fmt.Errorf("failed to get system status: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"status_code": resp.StatusCode,
|
||||
}).Debug("System status request returned non-OK status")
|
||||
return nil, fmt.Errorf("system status request failed with status %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
var systemStatus SystemStatusResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&systemStatus); err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to decode system status response")
|
||||
return nil, fmt.Errorf("failed to decode system status response: %w", err)
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"free_heap": systemStatus.FreeHeap,
|
||||
"chip_id": systemStatus.ChipID,
|
||||
}).Debug("Successfully fetched system status from SPORE node")
|
||||
|
||||
return &systemStatus, nil
|
||||
}
|
||||
|
||||
@@ -183,21 +251,43 @@ func (c *SporeClient) GetSystemStatus() (*SystemStatusResponse, error) {
|
||||
func (c *SporeClient) GetCapabilities() (*CapabilitiesResponse, error) {
|
||||
url := fmt.Sprintf("%s/api/node/endpoints", c.BaseURL)
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"endpoint": "/api/node/endpoints",
|
||||
}).Debug("Fetching capabilities from SPORE node")
|
||||
|
||||
resp, err := c.HTTPClient.Get(url)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to fetch capabilities from SPORE node")
|
||||
return nil, fmt.Errorf("failed to get capabilities: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"status_code": resp.StatusCode,
|
||||
}).Debug("Capabilities request returned non-OK status")
|
||||
return nil, fmt.Errorf("capabilities request failed with status %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
var capabilities CapabilitiesResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&capabilities); err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to decode capabilities response")
|
||||
return nil, fmt.Errorf("failed to decode capabilities response: %w", err)
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"endpoint_count": len(capabilities.Endpoints),
|
||||
}).Debug("Successfully fetched capabilities from SPORE node")
|
||||
|
||||
return &capabilities, nil
|
||||
}
|
||||
|
||||
@@ -205,16 +295,30 @@ func (c *SporeClient) GetCapabilities() (*CapabilitiesResponse, error) {
|
||||
func (c *SporeClient) UpdateFirmware(firmwareData []byte, filename string) (*FirmwareUpdateResponse, error) {
|
||||
url := fmt.Sprintf("%s/api/node/update", c.BaseURL)
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"endpoint": "/api/node/update",
|
||||
"filename": filename,
|
||||
"data_size": len(firmwareData),
|
||||
}).Debug("Preparing firmware upload to SPORE node")
|
||||
|
||||
// Create multipart form
|
||||
var requestBody bytes.Buffer
|
||||
contentType := createMultipartForm(&requestBody, firmwareData, filename)
|
||||
|
||||
if contentType == "" {
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
}).Debug("Failed to create multipart form for firmware upload")
|
||||
return nil, fmt.Errorf("failed to create multipart form")
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", url, &requestBody)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to create firmware update request")
|
||||
return nil, fmt.Errorf("failed to create firmware update request: %w", err)
|
||||
}
|
||||
|
||||
@@ -226,9 +330,10 @@ func (c *SporeClient) UpdateFirmware(firmwareData []byte, filename string) (*Fir
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"node_ip": c.BaseURL,
|
||||
"status": "sending_firmware",
|
||||
}).Debug("Sending firmware to SPORE device")
|
||||
"node_url": c.BaseURL,
|
||||
"filename": filename,
|
||||
"data_size": len(firmwareData),
|
||||
}).Debug("Uploading firmware to SPORE node")
|
||||
|
||||
resp, err := firmwareClient.Do(req)
|
||||
if err != nil {
|
||||
@@ -277,9 +382,19 @@ func (c *SporeClient) UpdateFirmware(firmwareData []byte, filename string) (*Fir
|
||||
func (c *SporeClient) UpdateNodeLabels(labels map[string]string) error {
|
||||
targetURL := fmt.Sprintf("%s/api/node/config", c.BaseURL)
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"endpoint": "/api/node/config",
|
||||
"labels": labels,
|
||||
}).Debug("Updating node labels on SPORE node")
|
||||
|
||||
// Convert labels to JSON
|
||||
labelsJSON, err := json.Marshal(labels)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to marshal labels")
|
||||
return fmt.Errorf("failed to marshal labels: %w", err)
|
||||
}
|
||||
|
||||
@@ -289,6 +404,10 @@ func (c *SporeClient) UpdateNodeLabels(labels map[string]string) error {
|
||||
|
||||
req, err := http.NewRequest("POST", targetURL, strings.NewReader(data.Encode()))
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to create labels update request")
|
||||
return fmt.Errorf("failed to create labels update request: %w", err)
|
||||
}
|
||||
|
||||
@@ -296,19 +415,28 @@ func (c *SporeClient) UpdateNodeLabels(labels map[string]string) error {
|
||||
|
||||
resp, err := c.HTTPClient.Do(req)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to update node labels")
|
||||
return fmt.Errorf("failed to update node labels: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"status_code": resp.StatusCode,
|
||||
"error_body": string(body),
|
||||
}).Debug("Node labels update returned non-OK status")
|
||||
return fmt.Errorf("node labels update failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"node_ip": c.BaseURL,
|
||||
"node_url": c.BaseURL,
|
||||
"labels": labels,
|
||||
}).Info("Node labels updated successfully")
|
||||
}).Debug("Successfully updated node labels on SPORE node")
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -318,17 +446,43 @@ func (c *SporeClient) ProxyCall(method, uri string, params map[string]interface{
|
||||
// Build target URL
|
||||
targetURL := fmt.Sprintf("%s%s", c.BaseURL, uri)
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"method": method,
|
||||
"endpoint": uri,
|
||||
"param_count": len(params),
|
||||
}).Debug("Making proxy call to SPORE node")
|
||||
|
||||
// Parse parameters and build request
|
||||
req, err := c.buildProxyRequest(method, targetURL, params)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"method": method,
|
||||
"endpoint": uri,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to build proxy request")
|
||||
return nil, fmt.Errorf("failed to build proxy request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := c.HTTPClient.Do(req)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"method": method,
|
||||
"endpoint": uri,
|
||||
"error": err.Error(),
|
||||
}).Debug("Proxy call failed")
|
||||
return nil, fmt.Errorf("proxy call failed: %w", err)
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"node_url": c.BaseURL,
|
||||
"method": method,
|
||||
"endpoint": uri,
|
||||
"status_code": resp.StatusCode,
|
||||
}).Debug("Proxy call completed successfully")
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -69,21 +69,42 @@ func (c *RegistryClient) FindFirmwareByNameAndVersion(name, version string) (*Fi
|
||||
func (c *RegistryClient) GetHealth() (map[string]interface{}, error) {
|
||||
url := fmt.Sprintf("%s/health", c.BaseURL)
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"endpoint": "/health",
|
||||
}).Debug("Checking registry health")
|
||||
|
||||
resp, err := c.HTTPClient.Get(url)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to check registry health")
|
||||
return nil, fmt.Errorf("failed to get registry health: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"status_code": resp.StatusCode,
|
||||
}).Debug("Registry health check returned non-OK status")
|
||||
return nil, fmt.Errorf("registry health check failed with status %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
var health map[string]interface{}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&health); err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to decode health response")
|
||||
return nil, fmt.Errorf("failed to decode health response: %w", err)
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
}).Debug("Successfully checked registry health")
|
||||
|
||||
return health, nil
|
||||
}
|
||||
|
||||
@@ -91,6 +112,13 @@ func (c *RegistryClient) GetHealth() (map[string]interface{}, error) {
|
||||
func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile io.Reader) (map[string]interface{}, error) {
|
||||
url := fmt.Sprintf("%s/firmware", c.BaseURL)
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"endpoint": "/firmware",
|
||||
"name": metadata.Name,
|
||||
"version": metadata.Version,
|
||||
}).Debug("Uploading firmware to registry")
|
||||
|
||||
// Create multipart form data
|
||||
body := &bytes.Buffer{}
|
||||
writer := multipart.NewWriter(body)
|
||||
@@ -98,11 +126,19 @@ func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile
|
||||
// Add metadata
|
||||
metadataJSON, err := json.Marshal(metadata)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to marshal firmware metadata")
|
||||
return nil, fmt.Errorf("failed to marshal metadata: %w", err)
|
||||
}
|
||||
|
||||
metadataPart, err := writer.CreateFormField("metadata")
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to create metadata field")
|
||||
return nil, fmt.Errorf("failed to create metadata field: %w", err)
|
||||
}
|
||||
metadataPart.Write(metadataJSON)
|
||||
@@ -110,10 +146,18 @@ func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile
|
||||
// Add firmware file
|
||||
firmwarePart, err := writer.CreateFormFile("firmware", fmt.Sprintf("%s-%s.bin", metadata.Name, metadata.Version))
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to create firmware field")
|
||||
return nil, fmt.Errorf("failed to create firmware field: %w", err)
|
||||
}
|
||||
|
||||
if _, err := io.Copy(firmwarePart, firmwareFile); err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to copy firmware data")
|
||||
return nil, fmt.Errorf("failed to copy firmware data: %w", err)
|
||||
}
|
||||
|
||||
@@ -121,6 +165,10 @@ func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile
|
||||
|
||||
req, err := http.NewRequest("POST", url, body)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to create upload request")
|
||||
return nil, fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
|
||||
@@ -128,20 +176,43 @@ func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile
|
||||
|
||||
resp, err := c.HTTPClient.Do(req)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"name": metadata.Name,
|
||||
"version": metadata.Version,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to upload firmware to registry")
|
||||
return nil, fmt.Errorf("failed to upload firmware: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"name": metadata.Name,
|
||||
"version": metadata.Version,
|
||||
"status_code": resp.StatusCode,
|
||||
"error_body": string(body),
|
||||
}).Debug("Firmware upload returned non-OK status")
|
||||
return nil, fmt.Errorf("firmware upload failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var result map[string]interface{}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to decode upload response")
|
||||
return nil, fmt.Errorf("failed to decode upload response: %w", err)
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"name": metadata.Name,
|
||||
"version": metadata.Version,
|
||||
}).Debug("Successfully uploaded firmware to registry")
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@@ -149,13 +220,32 @@ func (c *RegistryClient) UploadFirmware(metadata FirmwareMetadata, firmwareFile
|
||||
func (c *RegistryClient) UpdateFirmwareMetadata(name, version string, metadata FirmwareMetadata) (map[string]interface{}, error) {
|
||||
url := fmt.Sprintf("%s/firmware/%s/%s", c.BaseURL, name, version)
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"endpoint": fmt.Sprintf("/firmware/%s/%s", name, version),
|
||||
"name": name,
|
||||
"version": version,
|
||||
}).Debug("Updating firmware metadata in registry")
|
||||
|
||||
metadataJSON, err := json.Marshal(metadata)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"name": name,
|
||||
"version": version,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to marshal metadata")
|
||||
return nil, fmt.Errorf("failed to marshal metadata: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("PUT", url, bytes.NewBuffer(metadataJSON))
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"name": name,
|
||||
"version": version,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to create update request")
|
||||
return nil, fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
|
||||
@@ -163,20 +253,45 @@ func (c *RegistryClient) UpdateFirmwareMetadata(name, version string, metadata F
|
||||
|
||||
resp, err := c.HTTPClient.Do(req)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"name": name,
|
||||
"version": version,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to update firmware metadata in registry")
|
||||
return nil, fmt.Errorf("failed to update firmware metadata: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"name": name,
|
||||
"version": version,
|
||||
"status_code": resp.StatusCode,
|
||||
"error_body": string(body),
|
||||
}).Debug("Firmware metadata update returned non-OK status")
|
||||
return nil, fmt.Errorf("firmware metadata update failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var result map[string]interface{}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"name": name,
|
||||
"version": version,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to decode update response")
|
||||
return nil, fmt.Errorf("failed to decode update response: %w", err)
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"name": name,
|
||||
"version": version,
|
||||
}).Debug("Successfully updated firmware metadata in registry")
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@@ -221,21 +336,43 @@ func (c *RegistryClient) firmwareMatchesLabels(firmwareLabels, rolloutLabels map
|
||||
func (c *RegistryClient) ListFirmware() ([]GroupedFirmware, error) {
|
||||
url := fmt.Sprintf("%s/firmware", c.BaseURL)
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"endpoint": "/firmware",
|
||||
}).Debug("Fetching firmware list from registry")
|
||||
|
||||
resp, err := c.HTTPClient.Get(url)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to fetch firmware list from registry")
|
||||
return nil, fmt.Errorf("failed to get firmware list: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"status_code": resp.StatusCode,
|
||||
}).Debug("Firmware list request returned non-OK status")
|
||||
return nil, fmt.Errorf("firmware list request failed with status %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
var firmwareList []GroupedFirmware
|
||||
if err := json.NewDecoder(resp.Body).Decode(&firmwareList); err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to decode firmware list response")
|
||||
return nil, fmt.Errorf("failed to decode firmware list response: %w", err)
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"firmware_count": len(firmwareList),
|
||||
}).Debug("Successfully fetched firmware list from registry")
|
||||
|
||||
return firmwareList, nil
|
||||
}
|
||||
|
||||
@@ -243,26 +380,52 @@ func (c *RegistryClient) ListFirmware() ([]GroupedFirmware, error) {
|
||||
func (c *RegistryClient) DownloadFirmware(name, version string) ([]byte, error) {
|
||||
url := fmt.Sprintf("%s/firmware/%s/%s", c.BaseURL, name, version)
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"endpoint": fmt.Sprintf("/firmware/%s/%s", name, version),
|
||||
"name": name,
|
||||
"version": version,
|
||||
}).Debug("Downloading firmware from registry")
|
||||
|
||||
resp, err := c.HTTPClient.Get(url)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"name": name,
|
||||
"version": version,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to download firmware from registry")
|
||||
return nil, fmt.Errorf("failed to download firmware: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"name": name,
|
||||
"version": version,
|
||||
"status_code": resp.StatusCode,
|
||||
}).Debug("Firmware download request returned non-OK status")
|
||||
return nil, fmt.Errorf("firmware download request failed with status %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
data, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"name": name,
|
||||
"version": version,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to read firmware data from registry")
|
||||
return nil, fmt.Errorf("failed to read firmware data: %w", err)
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"name": name,
|
||||
"version": version,
|
||||
"size": len(data),
|
||||
}).Info("Downloaded firmware from registry")
|
||||
}).Debug("Successfully downloaded firmware from registry")
|
||||
|
||||
return data, nil
|
||||
}
|
||||
@@ -271,31 +434,64 @@ func (c *RegistryClient) DownloadFirmware(name, version string) ([]byte, error)
|
||||
func (c *RegistryClient) DeleteFirmware(name, version string) (map[string]interface{}, error) {
|
||||
url := fmt.Sprintf("%s/firmware/%s/%s", c.BaseURL, name, version)
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"endpoint": fmt.Sprintf("/firmware/%s/%s", name, version),
|
||||
"name": name,
|
||||
"version": version,
|
||||
}).Debug("Deleting firmware from registry")
|
||||
|
||||
req, err := http.NewRequest(http.MethodDelete, url, nil)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"name": name,
|
||||
"version": version,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to create delete request")
|
||||
return nil, fmt.Errorf("failed to create delete request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := c.HTTPClient.Do(req)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"name": name,
|
||||
"version": version,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to delete firmware from registry")
|
||||
return nil, fmt.Errorf("failed to delete firmware: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"name": name,
|
||||
"version": version,
|
||||
"status_code": resp.StatusCode,
|
||||
"error_body": string(body),
|
||||
}).Debug("Firmware delete returned non-OK status")
|
||||
return nil, fmt.Errorf("firmware delete request failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var result map[string]interface{}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"name": name,
|
||||
"version": version,
|
||||
"error": err.Error(),
|
||||
}).Debug("Failed to decode delete response")
|
||||
return nil, fmt.Errorf("failed to decode delete response: %w", err)
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"registry_url": c.BaseURL,
|
||||
"name": name,
|
||||
"version": version,
|
||||
}).Info("Deleted firmware from registry")
|
||||
}).Debug("Successfully deleted firmware from registry")
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@@ -1,103 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
# SPORE Mock Gateway Runner
|
||||
# Builds and runs the mock gateway with common configurations
|
||||
|
||||
set -e
|
||||
|
||||
# Colors for output
|
||||
GREEN='\033[0;32m'
|
||||
BLUE='\033[0;34m'
|
||||
YELLOW='\033[1;33m'
|
||||
NC='\033[0m' # No Color
|
||||
|
||||
# Default values
|
||||
PORT="${PORT:-3001}"
|
||||
MOCK_NODES="${MOCK_NODES:-5}"
|
||||
HEARTBEAT_RATE="${HEARTBEAT_RATE:-5}"
|
||||
LOG_LEVEL="${LOG_LEVEL:-info}"
|
||||
ENABLE_WS="${ENABLE_WS:-true}"
|
||||
|
||||
# Parse command line arguments
|
||||
while [[ $# -gt 0 ]]; do
|
||||
case $1 in
|
||||
-p|--port)
|
||||
PORT="$2"
|
||||
shift 2
|
||||
;;
|
||||
-n|--nodes)
|
||||
MOCK_NODES="$2"
|
||||
shift 2
|
||||
;;
|
||||
-h|--heartbeat)
|
||||
HEARTBEAT_RATE="$2"
|
||||
shift 2
|
||||
;;
|
||||
-l|--log-level)
|
||||
LOG_LEVEL="$2"
|
||||
shift 2
|
||||
;;
|
||||
--no-ws)
|
||||
ENABLE_WS="false"
|
||||
shift
|
||||
;;
|
||||
--help)
|
||||
echo "SPORE Mock Gateway Runner"
|
||||
echo ""
|
||||
echo "Usage: $0 [options]"
|
||||
echo ""
|
||||
echo "Options:"
|
||||
echo " -p, --port PORT HTTP server port (default: 3001)"
|
||||
echo " -n, --nodes NUM Number of mock nodes (default: 5)"
|
||||
echo " -h, --heartbeat SECONDS Heartbeat interval (default: 5)"
|
||||
echo " -l, --log-level LEVEL Log level: debug, info, warn, error (default: info)"
|
||||
echo " --no-ws Disable WebSocket support"
|
||||
echo " --help Show this help message"
|
||||
echo ""
|
||||
echo "Examples:"
|
||||
echo " $0 -p 8080 -n 10 # Run on port 8080 with 10 nodes"
|
||||
echo " $0 -l debug -h 2 # Debug logging with 2s heartbeats"
|
||||
echo " $0 --no-ws # Run without WebSocket"
|
||||
echo ""
|
||||
exit 0
|
||||
;;
|
||||
*)
|
||||
echo "Unknown option: $1"
|
||||
echo "Use --help for usage information"
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
done
|
||||
|
||||
echo -e "${BLUE}================================================${NC}"
|
||||
echo -e "${BLUE} SPORE Mock Gateway${NC}"
|
||||
echo -e "${BLUE}================================================${NC}"
|
||||
echo ""
|
||||
echo -e "${GREEN}Configuration:${NC}"
|
||||
echo -e " Port: ${YELLOW}$PORT${NC}"
|
||||
echo -e " Mock Nodes: ${YELLOW}$MOCK_NODES${NC}"
|
||||
echo -e " Heartbeat Rate: ${YELLOW}${HEARTBEAT_RATE}s${NC}"
|
||||
echo -e " Log Level: ${YELLOW}$LOG_LEVEL${NC}"
|
||||
echo -e " WebSocket: ${YELLOW}$ENABLE_WS${NC}"
|
||||
echo ""
|
||||
|
||||
# Check if Go is installed
|
||||
if ! command -v go &> /dev/null; then
|
||||
echo -e "${YELLOW}Error: Go is not installed${NC}"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Build the mock gateway
|
||||
echo -e "${GREEN}Building mock gateway...${NC}"
|
||||
go build -o mock-gateway cmd/mock-gateway/main.go
|
||||
|
||||
echo -e "${GREEN}Starting mock gateway...${NC}"
|
||||
echo ""
|
||||
|
||||
# Run the mock gateway
|
||||
./mock-gateway \
|
||||
-port "$PORT" \
|
||||
-mock-nodes "$MOCK_NODES" \
|
||||
-heartbeat-rate "$HEARTBEAT_RATE" \
|
||||
-log-level "$LOG_LEVEL" \
|
||||
-enable-ws="$ENABLE_WS"
|
||||
Reference in New Issue
Block a user