feat: use gateway

This commit is contained in:
2025-10-27 09:37:20 +01:00
parent 7b2f600f8c
commit 858be416eb
8 changed files with 279 additions and 255 deletions

160
server/gateway-client.js Normal file
View File

@@ -0,0 +1,160 @@
// Gateway Client - Communicates with spore-gateway for node discovery
const http = require('http');
class GatewayClient {
constructor(options = {}) {
this.gatewayUrl = options.gatewayUrl || 'http://localhost:3001';
this.pollInterval = options.pollInterval || 2000; // Poll every 2 seconds
this.filterAppLabel = options.filterAppLabel || 'pixelstream'; // Filter nodes by app label, set to null to disable
this.nodes = new Map(); // ip -> { lastSeen, status, hostname, port }
this.isRunning = false;
this.pollTimer = null;
}
start() {
if (this.isRunning) {
return;
}
this.isRunning = true;
console.log(`Starting Gateway client, connecting to ${this.gatewayUrl}`);
// Initial fetch
this.fetchNodes();
// Start polling
this.pollTimer = setInterval(() => {
this.fetchNodes();
}, this.pollInterval);
}
stop() {
if (!this.isRunning) {
return;
}
this.isRunning = false;
if (this.pollTimer) {
clearInterval(this.pollTimer);
this.pollTimer = null;
}
this.nodes.clear();
console.log('Gateway client stopped');
}
async fetchNodes() {
try {
const response = await this.httpGet(`${this.gatewayUrl}/api/discovery/nodes`);
const data = JSON.parse(response);
// Update nodes from gateway response
const newNodes = new Map();
let totalNodes = 0;
let filteredNodes = 0;
if (data.nodes && Array.isArray(data.nodes)) {
totalNodes = data.nodes.length;
data.nodes.forEach(node => {
// Filter for nodes with specified app label (if filtering is enabled)
if (this.filterAppLabel && !this.hasAppLabel(node, this.filterAppLabel)) {
filteredNodes++;
return;
}
const nodeIp = node.ip;
newNodes.set(nodeIp, {
lastSeen: Date.now(),
status: node.status || 'active',
hostname: node.hostname || nodeIp,
port: node.port || 4210,
isPrimary: node.isPrimary || false
});
});
//if (totalNodes > 0 && filteredNodes > 0 && this.filterAppLabel) {
// console.loh(`Filtered ${filteredNodes} nodes without app: ${this.filterAppLabel} label (${newNodes.size} ${this.filterAppLabel} nodes active)`);
//}
}
// Check for newly discovered nodes
for (const [ip, nodeInfo] of newNodes.entries()) {
const existingNode = this.nodes.get(ip);
if (!existingNode) {
console.log(`Node discovered via gateway: ${ip} (${nodeInfo.hostname})`);
this.nodes.set(ip, nodeInfo);
// Could emit an event here if needed: this.emit('nodeDiscovered', nodeInfo);
} else if (existingNode.hostname !== nodeInfo.hostname) {
console.log(`Node hostname updated: ${ip} -> ${nodeInfo.hostname}`);
this.nodes.set(ip, nodeInfo);
}
}
// Check for lost nodes
for (const ip of this.nodes.keys()) {
if (!newNodes.has(ip)) {
console.log(`Node lost via gateway: ${ip}`);
this.nodes.delete(ip);
// Could emit an event here if needed: this.emit('nodeLost', { ip });
}
}
} catch (error) {
console.error('Error fetching nodes from gateway:', error.message);
}
}
httpGet(url) {
return new Promise((resolve, reject) => {
http.get(url, (res) => {
let data = '';
res.on('data', (chunk) => {
data += chunk;
});
res.on('end', () => {
if (res.statusCode === 200) {
resolve(data);
} else {
reject(new Error(`HTTP ${res.statusCode}: ${data}`));
}
});
res.on('error', (err) => {
reject(err);
});
}).on('error', (err) => {
reject(err);
});
});
}
getNodes() {
return Array.from(this.nodes.entries()).map(([ip, node]) => ({
ip,
hostname: node.hostname || ip,
port: node.port,
status: node.status,
...node
}));
}
getNodeCount() {
return this.nodes.size;
}
hasAppLabel(node, appLabel) {
// Check if node has the app: <appLabel> label
if (!node.labels || typeof node.labels !== 'object') {
return false;
}
return node.labels.app === appLabel;
}
}
module.exports = GatewayClient;

View File

@@ -6,13 +6,16 @@ const WebSocket = require('ws');
const path = require('path');
// Import services
const UdpDiscovery = require('./udp-discovery');
const UdpSender = require('./udp-discovery');
const GatewayClient = require('./gateway-client');
const PresetRegistry = require('../presets/preset-registry');
class LEDLabServer {
constructor(options = {}) {
this.port = options.port || 8080;
this.udpPort = options.udpPort || 4210;
this.gatewayUrl = options.gatewayUrl || 'http://localhost:3001';
this.filterAppLabel = options.filterAppLabel || 'pixelstream';
this.matrixWidth = options.matrixWidth || 16;
this.matrixHeight = options.matrixHeight || 16;
this.fps = options.fps || 20;
@@ -21,7 +24,11 @@ class LEDLabServer {
this.server = http.createServer(this.app);
this.wss = new WebSocket.Server({ server: this.server });
this.udpDiscovery = new UdpDiscovery(this.udpPort);
this.udpSender = new UdpSender(this.udpPort);
this.gatewayClient = new GatewayClient({
gatewayUrl: this.gatewayUrl,
filterAppLabel: this.filterAppLabel || 'pixelstream'
});
this.presetRegistry = new PresetRegistry();
// Legacy single-stream support (kept for backwards compatibility)
@@ -37,7 +44,7 @@ class LEDLabServer {
this.setupExpress();
this.setupWebSocket();
this.setupUdpDiscovery();
this.setupGatewayClient();
this.setupPresetManager();
}
@@ -47,7 +54,7 @@ class LEDLabServer {
// API routes
this.app.get('/api/nodes', (req, res) => {
const nodes = this.udpDiscovery.getNodes();
const nodes = this.gatewayClient.getNodes();
res.json({ nodes });
});
@@ -61,7 +68,7 @@ class LEDLabServer {
streaming: this.currentPreset !== null,
currentPreset: this.currentPresetName || null,
matrixSize: { width: this.matrixWidth, height: this.matrixHeight },
nodeCount: this.udpDiscovery.getNodeCount(),
nodeCount: this.gatewayClient.getNodeCount(),
currentTarget: this.currentTarget,
fps: this.fps,
});
@@ -122,7 +129,7 @@ class LEDLabServer {
streaming: this.currentPreset !== null,
currentPreset: this.currentPresetName || null,
matrixSize: { width: this.matrixWidth, height: this.matrixHeight },
nodes: this.udpDiscovery.getNodes(),
nodes: this.gatewayClient.getNodes(),
presetParameters: this.currentPreset ? this.currentPreset.getParameters() : null,
currentTarget: this.currentTarget,
fps: this.fps,
@@ -174,26 +181,14 @@ class LEDLabServer {
}
}
setupUdpDiscovery() {
this.udpDiscovery.on('nodeDiscovered', (node) => {
console.log('Node discovered:', node.ip);
this.broadcastToClients({
type: 'nodeDiscovered',
node
});
});
this.udpDiscovery.on('nodeLost', (node) => {
console.log('Node lost:', node.ip);
this.broadcastToClients({
type: 'nodeLost',
node
});
});
this.udpDiscovery.start();
setupGatewayClient() {
// Start gateway client for node discovery
this.gatewayClient.start();
// Start UDP sender for sending frames
this.udpSender.start();
console.log('Using gateway for node discovery and UDP sender for frame streaming');
}
setupPresetManager() {
@@ -441,7 +436,9 @@ class LEDLabServer {
if (frameData) {
// Send to specific target
if (this.currentTarget) {
this.udpDiscovery.sendToNode(this.currentTarget, frameData);
this.udpSender.sendToNode(this.currentTarget, frameData).catch(err => {
console.error(`Error sending frame to ${this.currentTarget}:`, err);
});
}
// Send frame data to WebSocket clients for preview
@@ -463,7 +460,9 @@ class LEDLabServer {
if (frameData) {
// Send to specific node
// frameData format: "RAW:FF0000FF0000..." (RAW prefix + hex pixel data)
this.udpDiscovery.sendToNode(nodeIp, frameData);
this.udpSender.sendToNode(nodeIp, frameData).catch(err => {
console.error(`Error sending frame to ${nodeIp}:`, err);
});
// Send frame data to WebSocket clients for preview
this.broadcastToClients({
@@ -476,7 +475,7 @@ class LEDLabServer {
}
sendToSpecificNode(nodeIp, message) {
return this.udpDiscovery.sendToNode(nodeIp, message);
return this.udpSender.sendToNode(nodeIp, message);
}
broadcastCurrentState() {
@@ -484,7 +483,7 @@ class LEDLabServer {
streaming: this.currentPreset !== null,
currentPreset: this.currentPresetName || null,
matrixSize: { width: this.matrixWidth, height: this.matrixHeight },
nodes: this.udpDiscovery.getNodes(),
nodes: this.gatewayClient.getNodes(),
presetParameters: this.currentPreset ? this.currentPreset.getParameters() : null,
currentTarget: this.currentTarget,
fps: this.fps,
@@ -604,7 +603,8 @@ class LEDLabServer {
startServer() {
this.server.listen(this.port, () => {
console.log(`LEDLab server running on port ${this.port}`);
console.log(`UDP discovery on port ${this.udpPort}`);
console.log(`Gateway client connecting to ${this.gatewayUrl}`);
console.log(`UDP sender configured for port ${this.udpPort}`);
console.log(`Matrix size: ${this.matrixWidth}x${this.matrixHeight}`);
});
}
@@ -615,8 +615,9 @@ class LEDLabServer {
// Stop streaming first
this.stopStreaming();
// Stop UDP discovery
this.udpDiscovery.stop();
// Stop gateway client and UDP sender
this.gatewayClient.stop();
this.udpSender.stop();
// Close all WebSocket connections immediately
this.wss.close();
@@ -646,6 +647,8 @@ if (require.main === module) {
const server = new LEDLabServer({
port: process.env.PORT || 8080,
udpPort: process.env.UDP_PORT || 4210,
gatewayUrl: process.env.GATEWAY_URL || 'http://localhost:3001',
filterAppLabel: process.env.FILTER_APP_LABEL || 'pixelstream',
matrixWidth: parseInt(process.env.MATRIX_WIDTH) || 16,
matrixHeight: parseInt(process.env.MATRIX_HEIGHT) || 16,
});

View File

@@ -1,35 +1,12 @@
// UDP Discovery service for SPORE nodes
// UDP Sender service for sending frames to SPORE nodes
const dgram = require('dgram');
const EventEmitter = require('events');
const os = require('os');
class UdpDiscovery extends EventEmitter {
class UdpSender {
constructor(port = 4210) {
super();
this.port = port;
this.socket = null;
this.nodes = new Map(); // ip -> { lastSeen, status }
this.discoveryInterval = null;
this.isRunning = false;
// Get local network interfaces to filter out local server
this.localInterfaces = this.getLocalInterfaces();
}
getLocalInterfaces() {
const interfaces = os.networkInterfaces();
const localIPs = new Set();
Object.values(interfaces).forEach(iface => {
iface.forEach(addr => {
if (addr.family === 'IPv4' && !addr.internal) {
localIPs.add(addr.address);
}
});
});
return localIPs;
}
start() {
@@ -40,24 +17,15 @@ class UdpDiscovery extends EventEmitter {
this.socket = dgram.createSocket('udp4');
this.isRunning = true;
this.socket.on('message', (msg, rinfo) => {
this.handleMessage(msg, rinfo);
});
this.socket.on('error', (err) => {
console.error('UDP Discovery socket error:', err);
this.emit('error', err);
console.error('UDP Sender socket error:', err);
});
this.socket.bind(this.port, () => {
console.log(`UDP Discovery listening on port ${this.port}`);
// Enable broadcast after binding
this.socket.bind(0, () => {
// Bind to any available port
this.socket.setBroadcast(true);
this.emit('started');
console.log(`UDP Sender ready on port ${this.socket.address().port}`);
});
// Start periodic discovery broadcast
this.startDiscoveryBroadcast();
}
stop() {
@@ -67,162 +35,12 @@ class UdpDiscovery extends EventEmitter {
this.isRunning = false;
if (this.discoveryInterval) {
clearInterval(this.discoveryInterval);
this.discoveryInterval = null;
}
if (this.socket) {
this.socket.close();
this.socket = null;
}
this.nodes.clear();
console.log('UDP Discovery stopped');
this.emit('stopped');
}
handleMessage(msg, rinfo) {
const message = msg.toString('utf8');
const nodeIp = rinfo.address;
// Skip local server IPs
if (this.localInterfaces.has(nodeIp)) {
return;
}
// Handle different message types
if (message.startsWith('cluster/heartbeat:')) {
// Extract hostname from heartbeat: "cluster/heartbeat:hostname"
const hostname = message.substring('cluster/heartbeat:'.length);
this.handleHeartbeat(hostname, nodeIp, rinfo.port);
} else if (message.startsWith('node/update:')) {
// Extract hostname and JSON from update: "node/update:hostname:{json}"
const parts = message.substring('node/update:'.length).split(':');
if (parts.length >= 2) {
const hostname = parts[0];
const jsonStr = parts.slice(1).join(':'); // Rejoin in case JSON contains colons
this.handleNodeUpdate(hostname, jsonStr, nodeIp, rinfo.port);
}
}
}
handleHeartbeat(hostname, nodeIp, port) {
console.log(`Heartbeat from ${hostname} @ ${nodeIp}`);
// Update or add node
const existingNode = this.nodes.get(nodeIp);
this.nodes.set(nodeIp, {
lastSeen: Date.now(),
status: 'connected',
address: nodeIp,
port: port,
hostname: hostname
});
// Only emit if this is a new node or if we need to update
if (!existingNode || existingNode.hostname !== hostname) {
this.emit('nodeDiscovered', {
ip: nodeIp,
hostname: hostname,
port: port,
status: 'connected'
});
}
// Clean up stale nodes periodically
this.cleanupStaleNodes();
}
handleNodeUpdate(hostname, jsonStr, nodeIp, port) {
console.log(`Node update from ${hostname} @ ${nodeIp}`);
// Try to parse JSON to extract additional info
let nodeInfo = {};
try {
nodeInfo = JSON.parse(jsonStr);
} catch (e) {
console.warn(`Failed to parse node update JSON: ${e.message}`);
}
// Update node with hostname and any additional info
const existingNode = this.nodes.get(nodeIp);
this.nodes.set(nodeIp, {
lastSeen: Date.now(),
status: 'connected',
address: nodeIp,
port: port,
hostname: hostname || nodeInfo.hostname || existingNode?.hostname,
...nodeInfo
});
// Emit update event
this.emit('nodeDiscovered', {
ip: nodeIp,
hostname: hostname || nodeInfo.hostname || existingNode?.hostname,
port: port,
status: 'connected'
});
}
startDiscoveryBroadcast() {
// With the new protocol, SPORE nodes automatically broadcast heartbeats
// LEDLab passively listens for these heartbeats, so we don't need to broadcast.
// However, we can optionally send a heartbeat to prompt nodes to respond faster.
// For now, we just listen for incoming heartbeats from nodes.
// Optional: send initial heartbeat to prompt nodes to announce themselves
this.broadcastHeartbeat();
// Send periodic heartbeats to prompt node announcements (every 10 seconds)
this.discoveryInterval = setInterval(() => {
this.broadcastHeartbeat();
}, 10000);
}
broadcastHeartbeat() {
if (!this.socket) {
return;
}
// Send heartbeat using the new protocol format: "cluster/heartbeat:hostname"
const hostname = 'ledlab-client';
const discoveryMessage = `cluster/heartbeat:${hostname}`;
const message = Buffer.from(discoveryMessage, 'utf8');
this.socket.send(message, 0, message.length, this.port, '255.255.255.255', (err) => {
if (err) {
console.error('Error broadcasting heartbeat:', err);
} else {
console.log('Discovery heartbeat broadcasted');
}
});
}
cleanupStaleNodes() {
const now = Date.now();
const staleThreshold = 10000; // 10 seconds
for (const [ip, node] of this.nodes.entries()) {
if (now - node.lastSeen > staleThreshold) {
this.nodes.delete(ip);
this.emit('nodeLost', { ip, status: 'disconnected' });
}
}
}
getNodes() {
const nodes = Array.from(this.nodes.entries()).map(([ip, node]) => ({
ip,
hostname: node.hostname || ip,
...node
}));
return nodes;
}
getNodeCount() {
return this.nodes.size;
console.log('UDP Sender stopped');
}
sendToNode(nodeIp, message) {
@@ -231,15 +49,17 @@ class UdpDiscovery extends EventEmitter {
}
const buffer = Buffer.from(message, 'utf8');
this.socket.send(buffer, 0, buffer.length, this.port, nodeIp, (err) => {
if (err) {
console.error(`Error sending to node ${nodeIp}:`, err);
return false;
}
return true;
return new Promise((resolve, reject) => {
this.socket.send(buffer, 0, buffer.length, this.port, nodeIp, (err) => {
if (err) {
console.error(`Error sending to node ${nodeIp}:`, err);
reject(err);
return;
}
resolve(true);
});
});
return true;
}
broadcastToAll(message) {
@@ -250,16 +70,17 @@ class UdpDiscovery extends EventEmitter {
const buffer = Buffer.from(message, 'utf8');
this.socket.setBroadcast(true);
this.socket.send(buffer, 0, buffer.length, this.port, '255.255.255.255', (err) => {
if (err) {
console.error('Error broadcasting message:', err);
return false;
}
return true;
return new Promise((resolve, reject) => {
this.socket.send(buffer, 0, buffer.length, this.port, '255.255.255.255', (err) => {
if (err) {
console.error('Error broadcasting message:', err);
reject(err);
return;
}
resolve(true);
});
});
return true;
}
}
module.exports = UdpDiscovery;
module.exports = UdpSender;