fix: primary node failover
This commit is contained in:
71
index.js
71
index.js
@@ -195,6 +195,49 @@ function updateSporeClient() {
|
||||
}
|
||||
}
|
||||
|
||||
// Helper: perform an operation against the current primary, failing over to other discovered nodes if needed
|
||||
async function performWithFailover(operation) {
|
||||
// Build candidate list: current primary first, then others by most recently seen
|
||||
const candidateIps = [];
|
||||
if (primaryNodeIp && discoveredNodes.has(primaryNodeIp)) {
|
||||
candidateIps.push(primaryNodeIp);
|
||||
}
|
||||
const others = Array.from(discoveredNodes.values())
|
||||
.filter(n => n.ip !== primaryNodeIp)
|
||||
.sort((a, b) => b.lastSeen - a.lastSeen)
|
||||
.map(n => n.ip);
|
||||
candidateIps.push(...others);
|
||||
|
||||
if (candidateIps.length === 0) {
|
||||
throw new Error('No SPORE nodes discovered');
|
||||
}
|
||||
|
||||
let lastError = null;
|
||||
for (const ip of candidateIps) {
|
||||
try {
|
||||
const client = (sporeClient && ip === primaryNodeIp)
|
||||
? sporeClient
|
||||
: initializeSporeClient(ip);
|
||||
if (!client) {
|
||||
throw new Error(`Failed to initialize client for ${ip}`);
|
||||
}
|
||||
const result = await operation(client, ip);
|
||||
if (ip !== primaryNodeIp) {
|
||||
primaryNodeIp = ip;
|
||||
sporeClient = client;
|
||||
console.log(`Failover: switched primary node to ${ip}`);
|
||||
}
|
||||
return result;
|
||||
} catch (err) {
|
||||
console.warn(`Primary attempt on ${ip} failed: ${err.message}`);
|
||||
lastError = err;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
throw lastError || new Error('All discovered nodes failed');
|
||||
}
|
||||
|
||||
// Set up periodic tasks
|
||||
setInterval(() => {
|
||||
cleanupStaleNodes();
|
||||
@@ -335,19 +378,19 @@ app.post('/api/discovery/primary/:ip', (req, res) => {
|
||||
// API endpoint to get cluster members
|
||||
app.get('/api/cluster/members', async (req, res) => {
|
||||
try {
|
||||
if (!sporeClient) {
|
||||
if (discoveredNodes.size === 0) {
|
||||
return res.status(503).json({
|
||||
error: 'Service unavailable',
|
||||
message: 'No SPORE nodes discovered yet. Waiting for CLUSTER_DISCOVERY messages...',
|
||||
discoveredNodes: Array.from(discoveredNodes.keys())
|
||||
});
|
||||
}
|
||||
|
||||
const members = await sporeClient.getClusterStatus();
|
||||
|
||||
const members = await performWithFailover((client) => client.getClusterStatus());
|
||||
res.json(members);
|
||||
} catch (error) {
|
||||
console.error('Error fetching cluster members:', error);
|
||||
res.status(500).json({
|
||||
res.status(502).json({
|
||||
error: 'Failed to fetch cluster members',
|
||||
message: error.message
|
||||
});
|
||||
@@ -373,19 +416,19 @@ app.get('/api/tasks/status', async (req, res) => {
|
||||
}
|
||||
}
|
||||
|
||||
if (!sporeClient) {
|
||||
if (discoveredNodes.size === 0) {
|
||||
return res.status(503).json({
|
||||
error: 'Service unavailable',
|
||||
message: 'No SPORE nodes discovered yet. Waiting for CLUSTER_DISCOVERY messages...',
|
||||
discoveredNodes: Array.from(discoveredNodes.keys())
|
||||
});
|
||||
}
|
||||
|
||||
const taskStatus = await sporeClient.getTaskStatus();
|
||||
|
||||
const taskStatus = await performWithFailover((client) => client.getTaskStatus());
|
||||
res.json(taskStatus);
|
||||
} catch (error) {
|
||||
console.error('Error fetching task status:', error);
|
||||
res.status(500).json({
|
||||
res.status(502).json({
|
||||
error: 'Failed to fetch task status',
|
||||
message: error.message
|
||||
});
|
||||
@@ -395,7 +438,7 @@ app.get('/api/tasks/status', async (req, res) => {
|
||||
// API endpoint to get system status
|
||||
app.get('/api/node/status', async (req, res) => {
|
||||
try {
|
||||
if (!sporeClient) {
|
||||
if (discoveredNodes.size === 0) {
|
||||
return res.status(503).json({
|
||||
error: 'Service unavailable',
|
||||
message: 'No SPORE nodes discovered yet. Waiting for CLUSTER_DISCOVERY messages...',
|
||||
@@ -403,11 +446,11 @@ app.get('/api/node/status', async (req, res) => {
|
||||
});
|
||||
}
|
||||
|
||||
const systemStatus = await sporeClient.getSystemStatus();
|
||||
const systemStatus = await performWithFailover((client) => client.getSystemStatus());
|
||||
res.json(systemStatus);
|
||||
} catch (error) {
|
||||
console.error('Error fetching system status:', error);
|
||||
res.status(500).json({
|
||||
res.status(502).json({
|
||||
error: 'Failed to fetch system status',
|
||||
message: error.message
|
||||
});
|
||||
@@ -433,7 +476,7 @@ app.get('/api/capabilities', async (req, res) => {
|
||||
}
|
||||
}
|
||||
|
||||
if (!sporeClient) {
|
||||
if (discoveredNodes.size === 0) {
|
||||
return res.status(503).json({
|
||||
error: 'Service unavailable',
|
||||
message: 'No SPORE nodes discovered yet. Waiting for CLUSTER_DISCOVERY messages...',
|
||||
@@ -441,11 +484,11 @@ app.get('/api/capabilities', async (req, res) => {
|
||||
});
|
||||
}
|
||||
|
||||
const caps = await sporeClient.getCapabilities();
|
||||
const caps = await performWithFailover((client) => client.getCapabilities());
|
||||
return res.json(caps);
|
||||
} catch (error) {
|
||||
console.error('Error fetching capabilities:', error);
|
||||
return res.status(500).json({
|
||||
return res.status(502).json({
|
||||
error: 'Failed to fetch capabilities',
|
||||
message: error.message
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user