feat: update to new cluster protocol
This commit is contained in:
@@ -70,14 +70,34 @@ udpServer.on('message', (msg, rinfo) => {
|
|||||||
|
|
||||||
console.log(`📨 UDP message received from ${sourceIp}:${sourcePort}: "${message}"`);
|
console.log(`📨 UDP message received from ${sourceIp}:${sourcePort}: "${message}"`);
|
||||||
|
|
||||||
if (message.startsWith('CLUSTER_HEARTBEAT:')) {
|
// Extract topic by splitting on first ":"
|
||||||
// Handle heartbeat messages that update member list
|
const parts = message.split(':', 2);
|
||||||
const hostname = message.substring('CLUSTER_HEARTBEAT:'.length);
|
if (parts.length < 2) {
|
||||||
updateNodeFromHeartbeat(sourceIp, sourcePort, hostname);
|
console.log(`Invalid message format from ${sourceIp}:${sourcePort}: "${message}"`);
|
||||||
} else if (message.startsWith('NODE_UPDATE:')) {
|
return;
|
||||||
// Handle node update messages that provide detailed node info
|
}
|
||||||
handleNodeUpdate(sourceIp, message);
|
|
||||||
} else if (!message.startsWith('RAW:')) {
|
const topic = parts[0] + ':';
|
||||||
|
const payload = parts[1];
|
||||||
|
|
||||||
|
// Handler map for different UDP message types
|
||||||
|
const handlers = {
|
||||||
|
'cluster/heartbeat:': (payload, sourceIp, sourcePort) => {
|
||||||
|
updateNodeFromHeartbeat(sourceIp, sourcePort, payload);
|
||||||
|
},
|
||||||
|
'node/update:': (payload, sourceIp) => {
|
||||||
|
handleNodeUpdate(sourceIp, 'node/update:' + payload);
|
||||||
|
},
|
||||||
|
'raw:': () => {
|
||||||
|
// Ignore raw messages
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Look up and execute handler
|
||||||
|
const handler = handlers[topic];
|
||||||
|
if (handler) {
|
||||||
|
handler(payload, sourceIp, sourcePort);
|
||||||
|
} else {
|
||||||
console.log(`Received unknown message from ${sourceIp}:${sourcePort}: "${message}"`);
|
console.log(`Received unknown message from ${sourceIp}:${sourcePort}: "${message}"`);
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
@@ -919,7 +939,7 @@ function broadcastClusterUpdate() {
|
|||||||
// Get cluster members asynchronously
|
// Get cluster members asynchronously
|
||||||
getCurrentClusterMembers().then(members => {
|
getCurrentClusterMembers().then(members => {
|
||||||
const clusterData = {
|
const clusterData = {
|
||||||
type: 'cluster_update',
|
topic: 'cluster/update',
|
||||||
members: members,
|
members: members,
|
||||||
primaryNode: primaryNodeIp,
|
primaryNode: primaryNodeIp,
|
||||||
totalNodes: discoveredNodes.size,
|
totalNodes: discoveredNodes.size,
|
||||||
@@ -946,7 +966,7 @@ function broadcastNodeDiscovery(nodeIp, action) {
|
|||||||
if (wsClients.size === 0 || !wss) return;
|
if (wsClients.size === 0 || !wss) return;
|
||||||
|
|
||||||
const eventData = {
|
const eventData = {
|
||||||
type: 'node_discovery',
|
topic: 'node/discovery',
|
||||||
action: action, // 'discovered' or 'stale'
|
action: action, // 'discovered' or 'stale'
|
||||||
nodeIp: nodeIp,
|
nodeIp: nodeIp,
|
||||||
timestamp: new Date().toISOString()
|
timestamp: new Date().toISOString()
|
||||||
@@ -1122,7 +1142,7 @@ const server = app.listen(PORT, '0.0.0.0', () => {
|
|||||||
initializeWebSocketServer(server);
|
initializeWebSocketServer(server);
|
||||||
console.log('WebSocket server ready for real-time updates');
|
console.log('WebSocket server ready for real-time updates');
|
||||||
|
|
||||||
console.log('Waiting for CLUSTER_HEARTBEAT and NODE_UPDATE messages from SPORE nodes...');
|
console.log('Waiting for cluster/heartbeat and node/update messages from SPORE nodes...');
|
||||||
});
|
});
|
||||||
|
|
||||||
// Graceful shutdown handling
|
// Graceful shutdown handling
|
||||||
|
|||||||
@@ -263,7 +263,8 @@ class WebSocketClient {
|
|||||||
try {
|
try {
|
||||||
const data = JSON.parse(event.data);
|
const data = JSON.parse(event.data);
|
||||||
logger.debug('WebSocket message received:', data);
|
logger.debug('WebSocket message received:', data);
|
||||||
logger.debug('WebSocket message type:', data.type);
|
const messageTopic = data.topic || data.type;
|
||||||
|
logger.debug('WebSocket message topic:', messageTopic);
|
||||||
this.emit('message', data);
|
this.emit('message', data);
|
||||||
this.handleMessage(data);
|
this.handleMessage(data);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
@@ -288,21 +289,21 @@ class WebSocketClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
handleMessage(data) {
|
handleMessage(data) {
|
||||||
switch (data.type) {
|
const messageTopic = data.topic || data.type;
|
||||||
case 'cluster_update':
|
|
||||||
this.emit('clusterUpdate', data);
|
// Handler map for different WebSocket message types
|
||||||
break;
|
const handlers = {
|
||||||
case 'node_discovery':
|
'cluster/update': (data) => this.emit('clusterUpdate', data),
|
||||||
this.emit('nodeDiscovery', data);
|
'node/discovery': (data) => this.emit('nodeDiscovery', data),
|
||||||
break;
|
'firmware/upload/status': (data) => this.emit('firmwareUploadStatus', data),
|
||||||
case 'firmware_upload_status':
|
'rollout/progress': (data) => this.emit('rolloutProgress', data)
|
||||||
this.emit('firmwareUploadStatus', data);
|
};
|
||||||
break;
|
|
||||||
case 'rollout_progress':
|
const handler = handlers[messageTopic];
|
||||||
this.emit('rolloutProgress', data);
|
if (handler) {
|
||||||
break;
|
handler(data);
|
||||||
default:
|
} else {
|
||||||
logger.debug('Unknown WebSocket message type:', data.type);
|
logger.debug('Unknown WebSocket message topic:', messageTopic);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user