feat: MQTT integration
This commit is contained in:
138
internal/mqtt/mqtt.go
Normal file
138
internal/mqtt/mqtt.go
Normal file
@@ -0,0 +1,138 @@
|
||||
package mqtt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// MQTTClient represents an MQTT client for the gateway
|
||||
type MQTTClient struct {
|
||||
client mqtt.Client
|
||||
serverURL string
|
||||
username string
|
||||
password string
|
||||
connected bool
|
||||
logger *log.Logger
|
||||
messageCallback func(topic string, data []byte)
|
||||
}
|
||||
|
||||
// NewMQTTClient creates a new MQTT client instance
|
||||
func NewMQTTClient(serverURL, username, password string) *MQTTClient {
|
||||
return &MQTTClient{
|
||||
serverURL: serverURL,
|
||||
username: username,
|
||||
password: password,
|
||||
logger: log.New(),
|
||||
}
|
||||
}
|
||||
|
||||
// SetMessageCallback sets the callback function to be called when messages are received
|
||||
func (mc *MQTTClient) SetMessageCallback(callback func(topic string, data []byte)) {
|
||||
mc.messageCallback = callback
|
||||
}
|
||||
|
||||
// Connect connects to the MQTT broker
|
||||
func (mc *MQTTClient) Connect() error {
|
||||
opts := mqtt.NewClientOptions()
|
||||
opts.AddBroker(mc.serverURL)
|
||||
opts.SetClientID(fmt.Sprintf("spore-gateway-%d", time.Now().Unix()))
|
||||
opts.SetCleanSession(true)
|
||||
opts.SetAutoReconnect(true)
|
||||
opts.SetConnectRetry(true)
|
||||
opts.SetConnectRetryInterval(10 * time.Second)
|
||||
opts.SetKeepAlive(30 * time.Second)
|
||||
opts.SetPingTimeout(10 * time.Second)
|
||||
|
||||
// Set credentials if provided
|
||||
if mc.username != "" {
|
||||
opts.SetUsername(mc.username)
|
||||
}
|
||||
if mc.password != "" {
|
||||
opts.SetPassword(mc.password)
|
||||
}
|
||||
|
||||
// Set connection callbacks
|
||||
opts.SetOnConnectHandler(mc.onConnected)
|
||||
opts.SetConnectionLostHandler(mc.onConnectionLost)
|
||||
|
||||
mc.client = mqtt.NewClient(opts)
|
||||
|
||||
mc.logger.WithFields(log.Fields{
|
||||
"server": mc.serverURL,
|
||||
"username": mc.username,
|
||||
}).Info("Connecting to MQTT broker")
|
||||
|
||||
if token := mc.client.Connect(); token.Wait() && token.Error() != nil {
|
||||
return fmt.Errorf("failed to connect to MQTT broker: %w", token.Error())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// onConnected is called when the client successfully connects to the broker
|
||||
func (mc *MQTTClient) onConnected(client mqtt.Client) {
|
||||
mc.logger.Info("Successfully connected to MQTT broker")
|
||||
mc.connected = true
|
||||
|
||||
// Subscribe to all topics
|
||||
if token := mc.client.Subscribe("#", 0, mc.handleMessage); token.Wait() && token.Error() != nil {
|
||||
mc.logger.WithError(token.Error()).Error("Failed to subscribe to MQTT topics")
|
||||
} else {
|
||||
mc.logger.Info("Subscribed to all MQTT topics (#)")
|
||||
}
|
||||
}
|
||||
|
||||
// onConnectionLost is called when the connection to the broker is lost
|
||||
func (mc *MQTTClient) onConnectionLost(client mqtt.Client, err error) {
|
||||
mc.logger.WithError(err).Error("MQTT connection lost")
|
||||
mc.connected = false
|
||||
}
|
||||
|
||||
// handleMessage handles incoming MQTT messages
|
||||
func (mc *MQTTClient) handleMessage(client mqtt.Client, msg mqtt.Message) {
|
||||
topic := msg.Topic()
|
||||
payload := msg.Payload()
|
||||
|
||||
mc.logger.WithFields(log.Fields{
|
||||
"topic": topic,
|
||||
"length": len(payload),
|
||||
}).Debug("Received MQTT message")
|
||||
|
||||
// Call the callback if set
|
||||
if mc.messageCallback != nil {
|
||||
mc.messageCallback(topic, payload)
|
||||
}
|
||||
}
|
||||
|
||||
// Disconnect disconnects from the MQTT broker
|
||||
func (mc *MQTTClient) Disconnect() {
|
||||
if mc.client != nil && mc.connected {
|
||||
mc.logger.Info("Disconnecting from MQTT broker")
|
||||
mc.client.Disconnect(250)
|
||||
mc.connected = false
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown gracefully shuts down the MQTT client
|
||||
func (mc *MQTTClient) Shutdown(ctx context.Context) error {
|
||||
mc.logger.Info("Shutting down MQTT client")
|
||||
mc.Disconnect()
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsConnected returns whether the client is currently connected
|
||||
func (mc *MQTTClient) IsConnected() bool {
|
||||
return mc.connected
|
||||
}
|
||||
|
||||
// NewMQTTClientFromEnv creates a new MQTT client from environment variables
|
||||
func NewMQTTClientFromEnv(serverURL string) *MQTTClient {
|
||||
username := os.Getenv("MQTT_USER")
|
||||
password := os.Getenv("MQTT_PASSWORD")
|
||||
return NewMQTTClient(serverURL, username, password)
|
||||
}
|
||||
@@ -177,6 +177,11 @@ func (hs *HTTPServer) Start() error {
|
||||
return hs.server.ListenAndServe()
|
||||
}
|
||||
|
||||
// BroadcastMQTTMessage broadcasts an MQTT message through the WebSocket server
|
||||
func (hs *HTTPServer) BroadcastMQTTMessage(topic string, data []byte) {
|
||||
hs.webSocketServer.BroadcastMQTTMessage(topic, data)
|
||||
}
|
||||
|
||||
// Shutdown gracefully shuts down the HTTP server
|
||||
func (hs *HTTPServer) Shutdown(ctx context.Context) error {
|
||||
log.Info("Shutting down HTTP server")
|
||||
|
||||
@@ -648,6 +648,53 @@ func (wss *WebSocketServer) BroadcastClusterEvent(topic string, data interface{}
|
||||
}
|
||||
}
|
||||
|
||||
// BroadcastMQTTMessage broadcasts an MQTT message to all connected WebSocket clients
|
||||
func (wss *WebSocketServer) BroadcastMQTTMessage(topic string, data []byte) {
|
||||
wss.mutex.RLock()
|
||||
clients := make([]*websocket.Conn, 0, len(wss.clients))
|
||||
for client := range wss.clients {
|
||||
clients = append(clients, client)
|
||||
}
|
||||
wss.mutex.RUnlock()
|
||||
|
||||
if len(clients) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
message := struct {
|
||||
Topic string `json:"topic"`
|
||||
Data string `json:"data"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
}{
|
||||
Topic: topic,
|
||||
Data: string(data),
|
||||
Timestamp: time.Now().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
messageData, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
wss.logger.WithError(err).Error("Failed to marshal MQTT message")
|
||||
return
|
||||
}
|
||||
|
||||
wss.logger.WithFields(log.Fields{
|
||||
"topic": topic,
|
||||
"clients": len(clients),
|
||||
"length": len(data),
|
||||
}).Debug("Broadcasting MQTT message to WebSocket clients")
|
||||
|
||||
// Send to all clients with write synchronization
|
||||
wss.writeMutex.Lock()
|
||||
defer wss.writeMutex.Unlock()
|
||||
|
||||
for _, client := range clients {
|
||||
client.SetWriteDeadline(time.Now().Add(5 * time.Second))
|
||||
if err := client.WriteMessage(websocket.TextMessage, messageData); err != nil {
|
||||
wss.logger.WithError(err).Error("Failed to send MQTT message to client")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown gracefully shuts down the WebSocket server
|
||||
func (wss *WebSocketServer) Shutdown(ctx context.Context) error {
|
||||
wss.logger.Info("Shutting down WebSocket server")
|
||||
|
||||
Reference in New Issue
Block a user