203 lines
5.6 KiB
JavaScript
203 lines
5.6 KiB
JavaScript
// Gateway Client - Communicates with spore-gateway for node discovery
|
|
|
|
const WebSocket = require('ws');
|
|
|
|
/**
 * WebSocket client for a spore-gateway instance.
 *
 * Subscribes to the gateway's `/ws` endpoint and keeps an in-memory map of
 * cluster nodes (keyed by IP) up to date from `cluster/update` and
 * `node/discovery` messages. Reconnects with exponential backoff while the
 * client is running.
 */
class GatewayClient {
  /**
   * @param {object} [options]
   * @param {string} [options.gatewayUrl='http://localhost:3001'] Base URL of the gateway.
   * @param {?string} [options.filterAppLabel='pixelstream'] Only track nodes whose
   *   `app` label equals this value. Pass `null` to disable filtering.
   */
  constructor(options = {}) {
    this.gatewayUrl = options.gatewayUrl || 'http://localhost:3001';

    // An explicit null must survive so filtering can actually be disabled;
    // every other falsy value keeps falling back to the default label.
    this.filterAppLabel = options.filterAppLabel === null
      ? null
      : (options.filterAppLabel || 'pixelstream');

    this.nodes = new Map(); // ip -> { lastSeen, status, hostname, port, isPrimary, labels }
    this.isRunning = false;
    this.ws = null;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 10;
    this.reconnectDelay = 2000; // base delay in ms, doubled on every attempt
    this.reconnectTimer = null;
  }

  /**
   * Start the client and open the WebSocket connection. No-op when already running.
   */
  start() {
    if (this.isRunning) {
      return;
    }

    this.isRunning = true;
    // Give every start a fresh retry budget; otherwise a client that once hit
    // the retry limit could never reconnect again after stop()/start().
    this.reconnectAttempts = 0;
    console.log(`Starting Gateway client, connecting to ${this.gatewayUrl}`);

    this.connectWebSocket();
  }

  /**
   * Stop the client: cancel any pending reconnect, close the socket and
   * forget all known nodes. No-op when not running.
   */
  stop() {
    if (!this.isRunning) {
      return;
    }

    this.isRunning = false;

    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }

    if (this.ws) {
      // Detach first so the socket's close handler sees it as superseded.
      const socket = this.ws;
      this.ws = null;
      socket.close();
    }

    this.nodes.clear();
    console.log('Gateway client stopped');
  }

  /**
   * Derive the WebSocket endpoint from `gatewayUrl`.
   *
   * Maps a leading `http`/`https` scheme to `ws`/`wss`, drops trailing slashes
   * and appends `/ws`.
   *
   * @returns {string} WebSocket URL of the gateway.
   */
  getWebSocketUrl() {
    const base = this.gatewayUrl
      .replace(/\/+$/, '') // avoid "//ws" when the configured URL ends in "/"
      .replace(/^http/i, 'ws'); // http -> ws, https -> wss
    return `${base}/ws`;
  }

  /**
   * Open a WebSocket to the gateway and wire up its event handlers.
   * Failures to create the socket trigger a scheduled reconnect.
   */
  connectWebSocket() {
    try {
      const wsUrl = this.getWebSocketUrl();
      console.log(`Connecting to WebSocket: ${wsUrl}`);

      const socket = new WebSocket(wsUrl);
      this.ws = socket;

      socket.on('open', () => {
        console.log('WebSocket connected to gateway');
        this.reconnectAttempts = 0;
      });

      socket.on('message', (data) => {
        let message;
        try {
          message = JSON.parse(data.toString());
        } catch (error) {
          console.error('Error parsing WebSocket message:', error);
          return;
        }

        // Keep handler failures distinguishable from malformed JSON.
        try {
          this.handleWebSocketMessage(message);
        } catch (error) {
          console.error('Error handling WebSocket message:', error);
        }
      });

      socket.on('close', (code, reason) => {
        console.log(`WebSocket connection closed: ${code} ${reason}`);
        // Ignore close events from a socket that has since been replaced or
        // detached by stop(); reacting would spawn a duplicate connection.
        if (this.ws !== socket) {
          return;
        }
        if (this.isRunning) {
          this.scheduleReconnect();
        }
      });

      socket.on('error', (error) => {
        console.error('WebSocket error:', error.message);
      });
    } catch (error) {
      console.error('Failed to create WebSocket connection:', error);
      this.scheduleReconnect();
    }
  }

  /**
   * Dispatch a decoded gateway message by its `topic`.
   * Unknown topics and non-object payloads are ignored.
   *
   * @param {object} message Parsed JSON message from the gateway.
   */
  handleWebSocketMessage(message) {
    if (!message || typeof message !== 'object') {
      return;
    }

    if (message.topic === 'cluster/update') {
      this.handleClusterUpdate(message.members, message.primaryNode, message.totalNodes);
    } else if (message.topic === 'node/discovery') {
      this.handleNodeDiscovery(message.action, message.nodeIp);
    }
  }

  /**
   * Reconcile the node map with a full cluster snapshot.
   *
   * Members are filtered by `filterAppLabel` (when set); members without an
   * IP are skipped. Nodes absent from the snapshot are removed.
   *
   * @param {Array<object>} members Cluster members; both lower-case and
   *   capitalised field names (`ip`/`IP`, `hostname`/`Hostname`, ...) are accepted.
   * @param {string} primaryNode IP of the primary node.
   * @param {number} totalNodes Currently unused; kept for interface compatibility.
   */
  handleClusterUpdate(members, primaryNode, totalNodes) {
    const newNodes = new Map();

    if (Array.isArray(members)) {
      for (const node of members) {
        if (!node || typeof node !== 'object') {
          continue;
        }

        // Filter for nodes with specified app label (if filtering is enabled)
        if (this.filterAppLabel && !this.hasAppLabel(node, this.filterAppLabel)) {
          continue;
        }

        const nodeIp = node.ip || node.IP;
        if (!nodeIp) {
          // Without an IP there is no usable key for the node map.
          continue;
        }

        newNodes.set(nodeIp, {
          lastSeen: Date.now(),
          status: node.status || node.Status || 'active',
          hostname: node.hostname || node.Hostname || nodeIp,
          port: node.port || node.Port || 4210,
          isPrimary: (primaryNode === nodeIp),
          labels: node.labels || node.Labels
        });
      }
    }

    // Check for newly discovered or changed nodes
    for (const [ip, nodeInfo] of newNodes) {
      const existingNode = this.nodes.get(ip);
      if (!existingNode) {
        console.log(`Node discovered via gateway: ${ip} (${nodeInfo.hostname})`);
        this.nodes.set(ip, nodeInfo);
      } else if (existingNode.hostname !== nodeInfo.hostname) {
        console.log(`Node hostname updated: ${ip} -> ${nodeInfo.hostname}`);
        this.nodes.set(ip, nodeInfo);
      } else {
        // Refresh every field in place (including port and labels) so the
        // stored entry never goes stale while keeping its object identity.
        Object.assign(existingNode, nodeInfo);
      }
    }

    // Check for lost nodes (deleting from a Map during iteration is safe)
    for (const ip of this.nodes.keys()) {
      if (!newNodes.has(ip)) {
        console.log(`Node lost via gateway: ${ip}`);
        this.nodes.delete(ip);
      }
    }
  }

  /**
   * Handle a single-node discovery event.
   *
   * @param {string} action `'discovered'` or `'removed'`; other values are ignored.
   * @param {string} nodeIp IP of the affected node.
   */
  handleNodeDiscovery(action, nodeIp) {
    if (action === 'discovered') {
      // Node was discovered - actual data will come via cluster/update
      console.log(`Node discovered event: ${nodeIp}`);
    } else if (action === 'removed') {
      console.log(`Node removed event: ${nodeIp}`);
      this.nodes.delete(nodeIp);
    }
  }

  /**
   * Schedule a reconnect with exponential backoff
   * (`reconnectDelay * 2^(attempt - 1)`), up to `maxReconnectAttempts`.
   * Does nothing when the client is stopped.
   */
  scheduleReconnect() {
    if (!this.isRunning) {
      return;
    }

    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('Max WebSocket reconnection attempts reached');
      return;
    }

    // Never keep two reconnect timers alive at once.
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }

    this.reconnectAttempts++;
    const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);

    console.log(`Attempting to reconnect WebSocket in ${delay}ms (attempt ${this.reconnectAttempts})`);

    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connectWebSocket();
    }, delay);
  }

  /**
   * Snapshot of all known nodes.
   *
   * @returns {Array<object>} One plain object per node containing `ip`,
   *   `hostname` (falls back to the IP), `port`, `status`, `isPrimary` and any
   *   remaining stored fields.
   */
  getNodes() {
    return Array.from(this.nodes, ([ip, node]) => {
      const { hostname, port, status, isPrimary, ...rest } = node;
      // Spread the remaining fields last so they cannot clobber the hostname fallback.
      return { ip, hostname: hostname || ip, port, status, isPrimary, ...rest };
    });
  }

  /**
   * @returns {number} Number of nodes currently tracked.
   */
  getNodeCount() {
    return this.nodes.size;
  }

  /**
   * Check whether a node carries the label `app: <appLabel>`.
   * Accepts both `labels` and `Labels`, matching handleClusterUpdate.
   *
   * @param {object} node Cluster member as received from the gateway.
   * @param {string} appLabel Expected value of the `app` label.
   * @returns {boolean} True when the node's `app` label equals `appLabel`.
   */
  hasAppLabel(node, appLabel) {
    const labels = node && (node.labels || node.Labels);
    if (!labels || typeof labels !== 'object') {
      return false;
    }

    return labels.app === appLabel;
  }
}
|
|
|
|
// Expose the GatewayClient class as this module's CommonJS export.
module.exports = GatewayClient;
|
|
|
|
|