Merge pull request 'feat: MQTT integration' (#4) from feature/mqtt-relay into main

Reviewed-on: #4
This commit is contained in:
2025-10-26 18:59:51 +01:00
13 changed files with 998 additions and 3 deletions

View File

@@ -28,10 +28,42 @@ Options:
HTTP server port (default "3001")
-udp-port string
UDP discovery port (default "4210")
-mqtt string
Enable MQTT integration with server URL (e.g., tcp://localhost:1883)
-log-level string
Log level (debug, info, warn, error) (default "info")
```
### MQTT Integration
The gateway can integrate with an MQTT broker to subscribe to all MQTT topics and forward messages to connected WebSocket clients.
To enable MQTT integration:
```bash
# Basic usage
./spore-gateway -mqtt tcp://localhost:1883
# With authentication (using environment variables)
MQTT_USER=username MQTT_PASSWORD=password ./spore-gateway -mqtt tcp://broker.example.com:1883
```
When enabled, the gateway will:
- Connect to the specified MQTT broker
- Subscribe to all topics (`#`)
- Forward all received messages to connected WebSocket clients with the format:
```json
{
"topic": "sensor/temperature",
"data": "{\"value\": 23.5}",
"timestamp": "2024-01-15T10:30:00Z"
}
```
Environment variables:
- `MQTT_USER`: Username for MQTT broker authentication (optional)
- `MQTT_PASSWORD`: Password for MQTT broker authentication (optional)
## Integration
The spore-gateway works together with the SPORE UI frontend:
@@ -94,11 +126,19 @@ The application follows the same patterns as the original Node.js spore-ui serve
- HTTP middleware for CORS and logging
- WebSocket support for real-time updates
## Documentation
See the `docs/` directory for detailed documentation:
- [MQTT Integration](./docs/MQTT.md) - MQTT message forwarding and integration
- [Rollout Process](./docs/Rollout.md) - Firmware rollout orchestration
- [Testing Tools](./hack/README.md) - Local MQTT broker and testing scripts
## Architecture
- `main.go` - Application entry point
- `internal/discovery/` - UDP-based node discovery
- `internal/server/` - HTTP API server
- `internal/websocket/` - WebSocket server for real-time updates
- `internal/mqtt/` - MQTT client and message forwarding
- `pkg/client/` - SPORE API client
- `pkg/config/` - Configuration management

370
docs/MQTT.md Normal file
View File

@@ -0,0 +1,370 @@
# MQTT Integration
The SPORE Gateway includes optional MQTT integration that allows subscribing to MQTT brokers and forwarding messages to connected WebSocket clients. This enables integration with IoT devices, sensor networks, and other MQTT-based systems.
## Overview
When enabled, the gateway acts as an MQTT subscriber that:
- Connects to an MQTT broker
- Subscribes to all topics (`#`)
- Forwards received messages to WebSocket clients in real-time
This allows the SPORE UI to display MQTT events alongside SPORE cluster events.
## Features
- **Universal Topic Subscription**: Subscribes to `#` (all topics) to capture all messages
- **WebSocket Forwarding**: All MQTT messages are forwarded to connected WebSocket clients
- **Authentication Support**: Optional username/password authentication
- **Automatic Reconnection**: Handles connection failures and automatically reconnects
- **Structured Message Format**: Messages are formatted with topic, data, and timestamp
## Usage
### Basic Usage
Start the gateway with MQTT integration enabled:
```bash
./spore-gateway -mqtt tcp://localhost:1883
```
### With Authentication
If your MQTT broker requires authentication, use environment variables:
```bash
MQTT_USER=username MQTT_PASSWORD=password ./spore-gateway -mqtt tcp://broker.example.com:1883
```
### Complete Example
```bash
# Terminal 1: Start MQTT broker (optional, for testing)
cd hack
./mosquitto.sh
# Terminal 2: Start SPORE gateway with MQTT integration
cd ..
MQTT_USER=admin MQTT_PASSWORD=secret ./spore-gateway -mqtt tcp://localhost:1883
# Terminal 3: Publish test messages
cd hack
./mqtt-test.sh
```
## Message Format
MQTT messages received by the gateway are forwarded to WebSocket clients with the following JSON structure:
```json
{
"topic": "sensor/temperature/living-room",
"data": "{\"temperature\": 23.5, \"unit\": \"celsius\", \"timestamp\": \"2024-01-15T10:30:00Z\"}",
"timestamp": "2024-01-15T10:30:00Z"
}
```
### Fields
- **topic** (string): The MQTT topic the message was published to
- **data** (string): The raw message payload as a string (can be JSON, text, binary data encoded as string, etc.)
- **timestamp** (string): RFC3339 timestamp when the gateway received the message
### Message Payload Handling
The gateway treats all MQTT message payloads as raw data (byte arrays). When forwarding to WebSocket:
- Binary data is converted to string representation
- Text data is forwarded as-is
- JSON data remains as JSON string (not parsed)
This preserves the original message format while allowing the WebSocket client to parse or display it as needed.
## Configuration
### Command Line Flags
| Flag | Description | Example |
|------|-------------|---------|
| `-mqtt` | MQTT broker URL | `tcp://localhost:1883` |
### Environment Variables
| Variable | Description | Required |
|----------|-------------|----------|
| `MQTT_USER` | Username for MQTT authentication | No |
| `MQTT_PASSWORD` | Password for MQTT authentication | No |
### Broker URLs
Supported URL formats:
- `tcp://hostname:port` - Standard MQTT (e.g., `tcp://localhost:1883`)
- `tcp://hostname` - Uses default port 1883
- `tls://hostname:8883` - Secure MQTT with TLS
Note: TLS support may require additional configuration in the MQTT client.
## Architecture
### Components
1. **MQTT Client** (`internal/mqtt/mqtt.go`)
- Manages connection to MQTT broker
- Handles subscriptions and message reception
- Implements reconnection logic
2. **WebSocket Server** (`internal/websocket/websocket.go`)
- Broadcasting MQTT messages to connected clients
- Serialization and message formatting
3. **Main Application** (`main.go`)
- Coordinates MQTT client initialization
- Sets up message callback for WebSocket forwarding
### Data Flow
```
MQTT Broker → MQTT Client → Callback → HTTP Server → WebSocket Server → Client
```
1. MQTT broker publishes message to any topic
2. Gateway's MQTT client receives message
3. Message callback triggers
4. HTTP server broadcasts to WebSocket
5. WebSocket server forwards to all connected clients
## Testing
### Local Testing Setup
The `hack/` directory contains scripts for testing MQTT integration:
```bash
# Start a local MQTT broker
./hack/mosquitto.sh
# Run comprehensive test suite
./hack/mqtt-test.sh
```
### Test Messages
The test suite includes 16 different message types:
- Simple text messages
- JSON sensor data (temperature, humidity)
- Device status updates
- System events and alerts
- Configuration updates
- Metrics and telemetry
- Node discovery events
- Firmware updates
- Task status
- Error logs
- Light control (SPORE-specific)
- Binary data
- Edge cases (empty messages, large payloads)
### Manual Testing
You can also publish messages manually using the Mosquitto client:
```bash
# Install mosquitto clients
# Ubuntu/Debian: apt-get install mosquitto-clients
# Or use Docker: docker run --rm -it --network host eclipse-mosquitto:latest mosquitto_pub
# Publish a test message
docker run --rm --network host eclipse-mosquitto:latest \
mosquitto_pub -h localhost -p 1883 -t "test/topic" -m "Hello World"
# Publish JSON message
docker run --rm --network host eclipse-mosquitto:latest \
mosquitto_pub -h localhost -p 1883 -t "sensor/data" \
-m '{"sensor": "temperature", "value": 25.5, "unit": "celsius"}'
```
## Integration with SPORE UI
When the SPORE UI connects to the gateway's WebSocket endpoint, it will automatically receive MQTT messages. The UI can handle these messages similarly to SPORE cluster events.
### WebSocket Event Types
The WebSocket receives different event types:
- **Cluster Events**: `cluster/update`, `node/discovery`, etc. (from SPORE nodes)
- **MQTT Events**: Any topic from MQTT (identified by the topic field)
Example WebSocket message from MQTT:
```json
{
"topic": "sensor/temperature",
"data": "23.5",
"timestamp": "2024-01-15T10:30:00Z"
}
```
Example WebSocket message from SPORE cluster:
```json
{
"topic": "cluster/update",
"members": [...],
"primaryNode": "192.168.1.100",
"totalNodes": 3,
"timestamp": "2024-01-15T10:30:00Z"
}
```
## Troubleshooting
### Connection Issues
**Problem**: Gateway fails to connect to MQTT broker
**Solutions**:
```bash
# Check if broker is running
docker ps | grep mqtt-broker
# Check broker logs
docker logs mqtt-broker
# Test connection manually
docker run --rm -it --network host eclipse-mosquitto:latest \
mosquitto_pub -h localhost -p 1883 -t "test" -m "test"
```
### Messages Not Forwarding
**Problem**: MQTT messages not appearing in WebSocket
**Solutions**:
1. Verify gateway is running with MQTT enabled
2. Check gateway logs for MQTT connection status
3. Verify WebSocket client is connected
4. Check MQTT broker logs for subscription confirmation
### Authentication Errors
**Problem**: "Connection refused" or authentication errors
**Solutions**:
```bash
# Ensure environment variables are set
export MQTT_USER=username
export MQTT_PASSWORD=password
# Verify broker allows connections
# Check mosquitto.conf for allow_anonymous or authentication settings
```
### High Message Volume
If receiving many MQTT messages:
- Gateway handles messages efficiently using Go concurrency
- WebSocket broadcasts are serialized to prevent race conditions
- Consider QoS levels if message delivery is critical
## Best Practices
### Topic Naming
Use hierarchical topic names for better organization:
```
sensor/temperature/living-room
sensor/humidity/bedroom
device/status/esp32-001
cluster/node/discovered
```
### Message Size
- Keep individual messages reasonably sized (< 10KB recommended)
- For large data, consider splitting into multiple messages
- Use compression if transmitting large JSON payloads
### Security
- Use authentication for production deployments
- Consider TLS for encrypted connections
- Use topic filtering if subscribing to specific topics only (modify subscription)
- Implement rate limiting on message processing if needed
### Error Handling
The gateway includes automatic reconnection logic:
- Initial connection failures are logged
- Reconnection attempts every 10 seconds
- Connection state is tracked and logged
- WebSocket clients are notified via disconnect events
## Limitations
- **QoS Levels**: Currently uses QoS 0 (at most once delivery)
- **Topic Filtering**: Subscribes to all topics (`#`); no selective subscription
- **Message Retention**: Does not store messages; forwards only real-time events
- **Duplicate Handling**: Does not deduplicate messages
- **Ordering**: Maintains message order within individual WebSocket broadcasts
## Future Enhancements
Potential improvements:
- Configurable QoS levels per topic
- Selective topic subscription via configuration
- Message persistence and replay
- Metrics and monitoring for MQTT integration
- Support for MQTT 5.0 features
## Related Documentation
- [Main README](../README.md) - Overview of SPORE Gateway
- [Rollout Documentation](./Rollout.md) - Firmware rollout process
- [Hack Directory](../hack/README.md) - Testing tools and scripts
## Examples
### Example: IoT Sensor Integration
Connect temperature sensors to the gateway:
```bash
# Start gateway with MQTT
./spore-gateway -mqtt tcp://iot-broker.example.com:1883
# Sensors publish to topics like:
# - sensor/temperature/room1
# - sensor/humidity/room1
# - sensor/light/room1
```
### Example: Device Control
Control SPORE nodes via MQTT:
```bash
# Publish control commands
mosquitto_pub -h broker.example.com -t "spore/control" \
-m '{"node": "esp32-001", "action": "pattern", "pattern": "rainbow"}'
```
### Example: Monitoring Dashboard
Combine SPORE cluster events with external system events:
```bash
# Gateway receives both:
# 1. SPORE cluster events (from UDP discovery)
# 2. External system events (via MQTT)
# UI displays unified event stream
```
## Support
For issues or questions about MQTT integration:
- Check gateway logs for MQTT connection status
- Review MQTT broker configuration
- Use `hack/mqtt-test.sh` for testing
- See [troubleshooting section](#troubleshooting) above

73
docs/README.md Normal file
View File

@@ -0,0 +1,73 @@
# SPORE Gateway Documentation
Welcome to the SPORE Gateway documentation. This directory contains detailed documentation for various features and capabilities of the gateway.
## Available Documentation
### [MQTT Integration](./MQTT.md)
Comprehensive guide to the MQTT integration feature, including:
- Setting up MQTT integration
- Message format and handling
- Testing with local MQTT brokers
- Architecture and data flow
- Troubleshooting and best practices
### [Rollout Process](./Rollout.md)
Detailed documentation for the firmware rollout system:
- Parallel firmware updates across multiple nodes
- WebSocket progress updates
- Integration with spore-registry
- API endpoints and message formats
## Quick Links
- **Main README**: [../README.md](../README.md)
- **Hack Directory**: [../hack/README.md](../hack/README.md)
- **Testing Scripts**: [../hack/](../hack/)
## Feature Overview
### Core Features
- UDP-based node discovery
- Cluster management and primary node selection
- HTTP API server for cluster operations
- WebSocket real-time updates
- Failover logic for automatic primary switching
- Generic proxy calls to SPORE nodes
### Integration Features
- **MQTT Integration**: Subscribe to MQTT topics and forward messages to WebSocket clients
- **Firmware Rollout**: Orchestrated firmware updates across the cluster
- **Registry Proxy**: Proxy for spore-registry firmware management
## Getting Started
1. **Basic Setup**: See [Main README](../README.md) for installation and basic usage
2. **MQTT Integration**: See [MQTT.md](./MQTT.md) for MQTT setup and testing
3. **Testing**: See [Hack README](../hack/README.md) for local testing tools
## Development
The gateway is written in Go and follows modern Go best practices:
- Structured logging using logrus
- Graceful shutdown handling
- Concurrent-safe operations
- HTTP middleware for CORS and logging
- WebSocket support for real-time updates
## Contributing
When adding new features:
1. Update relevant documentation in this directory
2. Add examples to the `hack/` directory
3. Update the main README with feature highlights
4. Follow the existing documentation style and structure
## Support
For questions or issues:
- Check the relevant documentation in this directory
- Review gateway logs for error messages
- Use testing tools in the `hack/` directory
- Check the main README for troubleshooting tips

11
go.mod
View File

@@ -1,11 +1,18 @@
module spore-gateway
go 1.21
go 1.24.0
toolchain go1.24.3
require (
github.com/eclipse/paho.mqtt.golang v1.5.1
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.3
github.com/sirupsen/logrus v1.9.3
)
require golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
require (
golang.org/x/net v0.44.0 // indirect
golang.org/x/sync v0.17.0 // indirect
golang.org/x/sys v0.36.0 // indirect
)

9
go.sum
View File

@@ -1,6 +1,8 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE=
github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU=
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
@@ -12,8 +14,13 @@ github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVs
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I=
golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

130
hack/README.md Normal file
View File

@@ -0,0 +1,130 @@
# Hack Directory
This directory contains utility scripts for testing and development of the SPORE Gateway.
## Scripts
### mosquitto.sh
Starts a local Mosquitto MQTT broker using Docker.
**Usage:**
```bash
./mosquitto.sh
```
This will:
- Start a Mosquitto broker on port 1883
- Use the configuration from `mosquitto.conf`
- Allow anonymous connections (no authentication required)
### mqtt-test.sh
Sends various test events to the local MQTT broker to test the gateway's MQTT integration.
**Usage:**
```bash
# Make sure the broker is running first
./mosquitto.sh # In terminal 1
# In another terminal, run the tests
./mqtt-test.sh
```
This script will send 16 different test messages covering:
## Test Message Coverage
The full `mqtt-test.sh` script will send 16 different test messages covering:
- Simple text messages
- JSON sensor data (temperature, humidity)
- Device status updates
- System events and alerts
- Configuration updates
- Metrics
- Cluster/node discovery events
- Firmware updates
- Task status
- Error logs
- Light control (SPORE nodes)
- Binary data
- Edge cases (empty messages, large payloads)
## Testing MQTT Integration
### Complete Test Workflow
1. **Start the MQTT broker:**
```bash
cd hack
./mosquitto.sh
```
2. **In a new terminal, start the SPORE gateway with MQTT enabled:**
```bash
cd /path/to/spore-gateway
./spore-gateway -mqtt tcp://localhost:1883
```
3. **In another terminal, run the test script:**
```bash
cd hack
./mqtt-test.sh
```
4. **Monitor the WebSocket connection** to see the events being forwarded.
You can use a WebSocket client or the SPORE UI to connect to `ws://localhost:3001/ws`.
### Expected Output
All MQTT messages will be forwarded through the WebSocket with this format:
```json
{
"topic": "sensor/temperature/living-room",
"data": "{\"temperature\": 23.5, \"unit\": \"celsius\", \"timestamp\": \"2024-01-15T10:30:00Z\"}",
"timestamp": "2024-01-15T10:30:00Z"
}
```
## Customization
### Using a Different MQTT Broker
You can change the broker URL using the `MQTT_BROKER` environment variable:
```bash
MQTT_BROKER=tcp://broker.example.com:1883 ./mqtt-test.sh
```
### Adding Your Own Test Messages
Edit `mqtt-test.sh` and add your custom test case:
```bash
# Test N: Your custom test
echo -e "${YELLOW}=== Test N: Your Description ===${NC}"
publish_json "your/topic" '{"your": "data"}'
```
## Troubleshooting
### Broker Not Starting
- Make sure Docker is running
- Check if port 1883 is already in use
- Verify the Mosquitto image is available: `docker pull eclipse-mosquitto:latest`
### Messages Not Being Received
- Verify the gateway is running with `-mqtt tcp://localhost:1883`
- Check the gateway logs for connection errors
- Ensure the WebSocket client is connected to `ws://localhost:3001/ws`
### Port Conflicts
If port 1883 is in use, modify `mosquitto.sh` to use a different port:
```bash
-p 1884:1883 # Maps host port 1884 to container port 1883
```
Then update your gateway command:
```bash
./spore-gateway -mqtt tcp://localhost:1884
```

10
hack/mosquitto.conf Normal file
View File

@@ -0,0 +1,10 @@
# -----------------------------
# Basic Mosquitto configuration
# -----------------------------
listener 1883
allow_anonymous true
# (Optional) WebSocket listener if you exposed port 9001 above
# listener 9001
# protocol websockets
# allow_anonymous true

7
hack/mosquitto.sh Executable file
View File

@@ -0,0 +1,7 @@
#!/usr/bin/env bash
docker run --rm -it \
--name mqtt-broker \
-p 1883:1883 \
-v $(pwd)/mosquitto.conf:/mosquitto/config/mosquitto.conf:ro \
eclipse-mosquitto:latest

136
hack/mqtt-test.sh Executable file
View File

@@ -0,0 +1,136 @@
#!/usr/bin/env bash
# MQTT Test Script for SPORE Gateway
# This script sends various test events to the local MQTT broker
set -e
# Configuration
MQTT_BROKER="${MQTT_BROKER:-tcp://localhost:1883}"
DOCKER_IMAGE="eclipse-mosquitto:latest"
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# Function to publish an MQTT message
publish_message() {
local topic="$1"
local payload="$2"
local qos="${3:-0}"
echo -e "${YELLOW}Publishing to topic: ${GREEN}${topic}${NC}"
docker run --rm --network host \
"${DOCKER_IMAGE}" \
mosquitto_pub \
-h localhost \
-p 1883 \
-t "${topic}" \
-m "${payload}" \
-q "${qos}"
if [ $? -eq 0 ]; then
echo -e "${GREEN}✓ Message sent successfully${NC}"
else
echo -e "${RED}✗ Failed to send message${NC}"
exit 1
fi
echo ""
}
# Function to publish a JSON message
publish_json() {
local topic="$1"
local json="$2"
local qos="${3:-0}"
publish_message "${topic}" "${json}" "${qos}"
}
# Main test execution
echo "============================================"
echo " SPORE Gateway MQTT Test Suite"
echo "============================================"
echo ""
echo "Using MQTT broker: ${MQTT_BROKER}"
echo ""
# Test 1: Simple text message
echo -e "${YELLOW}=== Test 1: Simple Text Message ===${NC}"
publish_message "test/hello" "Hello from MQTT test script!"
# Test 2: Temperature sensor reading
echo -e "${YELLOW}=== Test 2: Temperature Sensor Reading ===${NC}"
publish_json "sensor/temperature/living-room" '{"temperature": 23.5, "unit": "celsius", "timestamp": "2024-01-15T10:30:00Z"}'
# Test 3: Humidity sensor reading
echo -e "${YELLOW}=== Test 3: Humidity Sensor Reading ===${NC}"
publish_json "sensor/humidity/bedroom" '{"humidity": 45.2, "unit": "percent", "timestamp": "2024-01-15T10:30:05Z"}'
# Test 4: Device status
echo -e "${YELLOW}=== Test 4: Device Status Update ===${NC}"
publish_json "device/status/esp32-001" '{"id": "esp32-001", "status": "online", "uptime": 3600, "firmware": "v1.2.3"}'
# Test 5: System event
echo -e "${YELLOW}=== Test 5: System Event ===${NC}"
publish_json "system/event" '{"type": "startup", "message": "Gateway started successfully", "timestamp": "2024-01-15T10:30:10Z"}'
# Test 6: Alert message
echo -e "${YELLOW}=== Test 6: Alert Message ===${NC}"
publish_json "alert/high-temperature" '{"level": "warning", "message": "Temperature exceeded threshold", "value": 35.5, "threshold": 30.0}'
# Test 7: Configuration update
echo -e "${YELLOW}=== Test 7: Configuration Update ===${NC}"
publish_json "config/update" '{"section": "network", "key": "retry_count", "value": 3, "updated": "2024-01-15T10:30:15Z"}'
# Test 8: Metric data
echo -e "${YELLOW}=== Test 8: Metric Data ===${NC}"
publish_json "metrics/system" '{"cpu": 45.2, "memory": 62.5, "disk": 38.7, "timestamp": "2024-01-15T10:30:20Z"}'
# Test 9: Node discovery event
echo -e "${YELLOW}=== Test 9: Node Discovery Event ===${NC}"
publish_json "cluster/node/discovered" '{"ip": "192.168.1.100", "hostname": "node-001", "status": "online", "version": "1.0.0"}'
# Test 10: Firmware update event
echo -e "${YELLOW}=== Test 10: Firmware Update Event ===${NC}"
publish_json "firmware/update/esp32-001" '{"node": "esp32-001", "status": "completed", "version": "v1.3.0", "size": 1234567}'
# Test 11: Task status
echo -e "${YELLOW}=== Test 11: Task Status ===${NC}"
publish_json "task/sync/status" '{"id": "sync-001", "status": "running", "progress": 75, "estimated_completion": "2024-01-15T10:35:00Z"}'
# Test 12: Error log
echo -e "${YELLOW}=== Test 12: Error Log ===${NC}"
publish_json "log/error" '{"severity": "error", "component": "mqtt-client", "message": "Connection timeout", "code": 1001}'
# Test 13: Light control (for SPORE nodes)
echo -e "${YELLOW}=== Test 13: Light Control ===${NC}"
publish_json "light/control" '{"id": "neopixel-001", "brightness": 128, "color": {"r": 255, "g": 0, "b": 0}, "pattern": "solid"}'
# Test 14: Binary data (as hex string)
echo -e "${YELLOW}=== Test 14: Binary Data ===${NC}"
publish_message "data/binary" "48656c6c6f20576f726c64" # "Hello World" in hex
# Test 15: Empty message
echo -e "${YELLOW}=== Test 15: Empty Message ===${NC}"
publish_message "test/empty" ""
# Test 16: Large payload
echo -e "${YELLOW}=== Test 16: Large Payload ===${NC}"
LARGE_PAYLOAD='{"data": "'$(head -c 1000 < /dev/zero | tr '\0' 'A')'"}'
publish_message "test/large" "${LARGE_PAYLOAD}"
echo "============================================"
echo -e "${GREEN}All tests completed successfully!${NC}"
echo "============================================"
echo ""
echo "To monitor these messages, connect to the WebSocket at:"
echo " ws://localhost:3001/ws"
echo ""
echo "You should see all these events forwarded with the format:"
echo ' {"topic": "...", "data": "...", "timestamp": "..."}'
echo ""

138
internal/mqtt/mqtt.go Normal file
View File

@@ -0,0 +1,138 @@
package mqtt
import (
"context"
"fmt"
"os"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
log "github.com/sirupsen/logrus"
)
// MQTTClient represents an MQTT client for the gateway
type MQTTClient struct {
client mqtt.Client
serverURL string
username string
password string
connected bool
logger *log.Logger
messageCallback func(topic string, data []byte)
}
// NewMQTTClient creates a new MQTT client instance
func NewMQTTClient(serverURL, username, password string) *MQTTClient {
return &MQTTClient{
serverURL: serverURL,
username: username,
password: password,
logger: log.New(),
}
}
// SetMessageCallback sets the callback function to be called when messages are received
func (mc *MQTTClient) SetMessageCallback(callback func(topic string, data []byte)) {
mc.messageCallback = callback
}
// Connect connects to the MQTT broker
func (mc *MQTTClient) Connect() error {
opts := mqtt.NewClientOptions()
opts.AddBroker(mc.serverURL)
opts.SetClientID(fmt.Sprintf("spore-gateway-%d", time.Now().Unix()))
opts.SetCleanSession(true)
opts.SetAutoReconnect(true)
opts.SetConnectRetry(true)
opts.SetConnectRetryInterval(10 * time.Second)
opts.SetKeepAlive(30 * time.Second)
opts.SetPingTimeout(10 * time.Second)
// Set credentials if provided
if mc.username != "" {
opts.SetUsername(mc.username)
}
if mc.password != "" {
opts.SetPassword(mc.password)
}
// Set connection callbacks
opts.SetOnConnectHandler(mc.onConnected)
opts.SetConnectionLostHandler(mc.onConnectionLost)
mc.client = mqtt.NewClient(opts)
mc.logger.WithFields(log.Fields{
"server": mc.serverURL,
"username": mc.username,
}).Info("Connecting to MQTT broker")
if token := mc.client.Connect(); token.Wait() && token.Error() != nil {
return fmt.Errorf("failed to connect to MQTT broker: %w", token.Error())
}
return nil
}
// onConnected is called when the client successfully connects to the broker
func (mc *MQTTClient) onConnected(client mqtt.Client) {
mc.logger.Info("Successfully connected to MQTT broker")
mc.connected = true
// Subscribe to all topics
if token := mc.client.Subscribe("#", 0, mc.handleMessage); token.Wait() && token.Error() != nil {
mc.logger.WithError(token.Error()).Error("Failed to subscribe to MQTT topics")
} else {
mc.logger.Info("Subscribed to all MQTT topics (#)")
}
}
// onConnectionLost is called when the connection to the broker is lost
func (mc *MQTTClient) onConnectionLost(client mqtt.Client, err error) {
mc.logger.WithError(err).Error("MQTT connection lost")
mc.connected = false
}
// handleMessage handles incoming MQTT messages
func (mc *MQTTClient) handleMessage(client mqtt.Client, msg mqtt.Message) {
topic := msg.Topic()
payload := msg.Payload()
mc.logger.WithFields(log.Fields{
"topic": topic,
"length": len(payload),
}).Debug("Received MQTT message")
// Call the callback if set
if mc.messageCallback != nil {
mc.messageCallback(topic, payload)
}
}
// Disconnect disconnects from the MQTT broker
func (mc *MQTTClient) Disconnect() {
if mc.client != nil && mc.connected {
mc.logger.Info("Disconnecting from MQTT broker")
mc.client.Disconnect(250)
mc.connected = false
}
}
// Shutdown gracefully shuts down the MQTT client
func (mc *MQTTClient) Shutdown(ctx context.Context) error {
mc.logger.Info("Shutting down MQTT client")
mc.Disconnect()
return nil
}
// IsConnected returns whether the client is currently connected
func (mc *MQTTClient) IsConnected() bool {
return mc.connected
}
// NewMQTTClientFromEnv creates a new MQTT client from environment variables
func NewMQTTClientFromEnv(serverURL string) *MQTTClient {
username := os.Getenv("MQTT_USER")
password := os.Getenv("MQTT_PASSWORD")
return NewMQTTClient(serverURL, username, password)
}

View File

@@ -177,6 +177,11 @@ func (hs *HTTPServer) Start() error {
return hs.server.ListenAndServe()
}
// BroadcastMQTTMessage broadcasts an MQTT message through the WebSocket server
func (hs *HTTPServer) BroadcastMQTTMessage(topic string, data []byte) {
hs.webSocketServer.BroadcastMQTTMessage(topic, data)
}
// Shutdown gracefully shuts down the HTTP server
func (hs *HTTPServer) Shutdown(ctx context.Context) error {
log.Info("Shutting down HTTP server")

View File

@@ -648,6 +648,53 @@ func (wss *WebSocketServer) BroadcastClusterEvent(topic string, data interface{}
}
}
// BroadcastMQTTMessage broadcasts an MQTT message to all connected WebSocket clients
func (wss *WebSocketServer) BroadcastMQTTMessage(topic string, data []byte) {
wss.mutex.RLock()
clients := make([]*websocket.Conn, 0, len(wss.clients))
for client := range wss.clients {
clients = append(clients, client)
}
wss.mutex.RUnlock()
if len(clients) == 0 {
return
}
message := struct {
Topic string `json:"topic"`
Data string `json:"data"`
Timestamp string `json:"timestamp"`
}{
Topic: topic,
Data: string(data),
Timestamp: time.Now().Format(time.RFC3339),
}
messageData, err := json.Marshal(message)
if err != nil {
wss.logger.WithError(err).Error("Failed to marshal MQTT message")
return
}
wss.logger.WithFields(log.Fields{
"topic": topic,
"clients": len(clients),
"length": len(data),
}).Debug("Broadcasting MQTT message to WebSocket clients")
// Send to all clients with write synchronization
wss.writeMutex.Lock()
defer wss.writeMutex.Unlock()
for _, client := range clients {
client.SetWriteDeadline(time.Now().Add(5 * time.Second))
if err := client.WriteMessage(websocket.TextMessage, messageData); err != nil {
wss.logger.WithError(err).Error("Failed to send MQTT message to client")
}
}
}
// Shutdown gracefully shuts down the WebSocket server
func (wss *WebSocketServer) Shutdown(ctx context.Context) error {
wss.logger.Info("Shutting down WebSocket server")

25
main.go
View File

@@ -10,6 +10,7 @@ import (
"time"
"spore-gateway/internal/discovery"
"spore-gateway/internal/mqtt"
"spore-gateway/internal/server"
"spore-gateway/pkg/config"
@@ -21,6 +22,7 @@ func main() {
configFile := flag.String("config", "", "Path to configuration file")
port := flag.String("port", "3001", "HTTP server port")
udpPort := flag.String("udp-port", "4210", "UDP discovery port")
mqttServer := flag.String("mqtt", "", "Enable MQTT integration with server URL (e.g., tcp://localhost:1883)")
logLevel := flag.String("log-level", "info", "Log level (debug, info, warn, error)")
flag.Parse()
@@ -61,6 +63,22 @@ func main() {
// Initialize HTTP server
httpServer := server.NewHTTPServer(cfg.HTTPPort, nodeDiscovery)
// Initialize MQTT client if enabled
var mqttClient *mqtt.MQTTClient
if *mqttServer != "" {
log.WithField("server", *mqttServer).Info("Initializing MQTT client")
mqttClient = mqtt.NewMQTTClientFromEnv(*mqttServer)
// Set callback to forward MQTT messages to WebSocket
mqttClient.SetMessageCallback(func(topic string, data []byte) {
httpServer.BroadcastMQTTMessage(topic, data)
})
if err := mqttClient.Connect(); err != nil {
log.WithError(err).Fatal("Failed to connect to MQTT broker")
}
}
// Setup graceful shutdown
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
@@ -90,6 +108,13 @@ func main() {
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Shutdown MQTT client
if mqttClient != nil {
if err := mqttClient.Shutdown(shutdownCtx); err != nil {
log.WithError(err).Error("MQTT client shutdown error")
}
}
// Shutdown HTTP server
if err := httpServer.Shutdown(shutdownCtx); err != nil {
log.WithError(err).Error("HTTP server shutdown error")