From 7b2f600f8c9dbc44324a1d04303a8dd374090c94 Mon Sep 17 00:00:00 2001 From: 0x1d Date: Mon, 27 Oct 2025 07:55:33 +0100 Subject: [PATCH 1/3] feat: update to new cluster protocol --- .cursor/rules/cleancode.mdc | 56 +++++++++++++++++ .cursor/rules/gitflow.mdc | 111 +++++++++++++++++++++++++++++++++ package-lock.json | 119 +++++++++++++++++++++++++++++++++--- server/index.js | 1 + server/udp-discovery.js | 103 ++++++++++++++++++++++++------- 5 files changed, 361 insertions(+), 29 deletions(-) create mode 100644 .cursor/rules/cleancode.mdc create mode 100644 .cursor/rules/gitflow.mdc diff --git a/.cursor/rules/cleancode.mdc b/.cursor/rules/cleancode.mdc new file mode 100644 index 0000000..8b41201 --- /dev/null +++ b/.cursor/rules/cleancode.mdc @@ -0,0 +1,56 @@ +--- +description: Guidelines for writing clean, maintainable, and human-readable code. Apply these rules when writing or reviewing code to ensure consistency and quality. +globs: +alwaysApply: true +--- +# Clean Code Guidelines + +## Constants Over Magic Numbers +- Replace hard-coded values with named constants +- Use descriptive constant names that explain the value's purpose +- Keep constants at the top of the file or in a dedicated constants file + +## Meaningful Names +- Variables, functions, and classes should reveal their purpose +- Names should explain why something exists and how it's used +- Avoid abbreviations unless they're universally understood + +## Smart Comments +- Don't comment on what the code does - make the code self-documenting +- Use comments to explain why something is done a certain way +- Document APIs, complex algorithms, and non-obvious side effects + +## Single Responsibility +- Each function should do exactly one thing +- Functions should be small and focused +- If a function needs a comment to explain what it does, it should be split + +## DRY (Don't Repeat Yourself) +- Extract repeated code into reusable functions +- Share common logic through proper abstraction +- Maintain single sources of truth + +## Clean Structure +- Keep related code together +- Organize code in a logical hierarchy +- Use consistent file and folder naming conventions + +## Encapsulation +- Hide implementation details +- Expose clear interfaces +- Move nested conditionals into well-named functions + +## Code Quality Maintenance +- Refactor continuously +- Fix technical debt early +- Leave code cleaner than you found it + +## Testing +- Write tests before fixing bugs +- Keep tests readable and maintainable +- Test edge cases and error conditions + +## Version Control +- Write clear commit messages +- Make small, focused commits +- Use meaningful branch names \ No newline at end of file diff --git a/.cursor/rules/gitflow.mdc b/.cursor/rules/gitflow.mdc new file mode 100644 index 0000000..d52c71b --- /dev/null +++ b/.cursor/rules/gitflow.mdc @@ -0,0 +1,111 @@ +--- +description: Gitflow Workflow Rules. These rules should be applied when performing git operations. +--- +# Gitflow Workflow Rules + +## Main Branches + +### main (or master) +- Contains production-ready code +- Never commit directly to main +- Only accepts merges from: + - hotfix/* branches + - release/* branches +- Must be tagged with version number after each merge + +### develop +- Main development branch +- Contains latest delivered development changes +- Source branch for feature branches +- Never commit directly to develop + +## Supporting Branches + +### feature/* +- Branch from: develop +- Merge back into: develop +- Naming convention: feature/[issue-id]-descriptive-name +- Example: feature/123-user-authentication +- Must be up-to-date with develop before creating PR +- Delete after merge + +### release/* +- Branch from: develop +- Merge back into: + - main + - develop +- Naming convention: release/vX.Y.Z +- Example: release/v1.2.0 +- Only bug fixes, documentation, and release-oriented tasks +- No new features +- Delete after merge + +### hotfix/* +- Branch from: main +- Merge back into: + - main + - develop +- Naming convention: hotfix/vX.Y.Z +- Example: hotfix/v1.2.1 +- Only for urgent production fixes +- Delete after merge + +## Commit Messages + +- Format: `type(scope): description` +- Types: + - feat: New feature + - fix: Bug fix + - docs: Documentation changes + - style: Formatting, missing semicolons, etc. + - refactor: Code refactoring + - test: Adding tests + - chore: Maintenance tasks + +## Version Control + +### Semantic Versioning +- MAJOR version for incompatible API changes +- MINOR version for backwards-compatible functionality +- PATCH version for backwards-compatible bug fixes + +## Pull Request Rules + +1. All changes must go through Pull Requests +2. Required approvals: minimum 1 +3. CI checks must pass +4. No direct commits to protected branches (main, develop) +5. Branch must be up to date before merging +6. Delete branch after merge + +## Branch Protection Rules + +### main & develop +- Require pull request reviews +- Require status checks to pass +- Require branches to be up to date +- Include administrators in restrictions +- No force pushes +- No deletions + +## Release Process + +1. Create release branch from develop +2. Bump version numbers +3. Fix any release-specific issues +4. Create PR to main +5. After merge to main: + - Tag release + - Merge back to develop + - Delete release branch + +## Hotfix Process + +1. Create hotfix branch from main +2. Fix the issue +3. Bump patch version +4. Create PR to main +5. After merge to main: + - Tag release + - Merge back to develop + - Delete hotfix branch \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index aad9fff..1b8300f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,7 +9,7 @@ "version": "1.0.0", "license": "MIT", "dependencies": { - "dgram": "^1.0.1", + "axios": "^1.6.0", "express": "^4.18.2", "ws": "^8.14.2" } @@ -33,6 +33,23 @@ "integrity": "sha512-PCVAQswWemu6UdxsDFFX/+gVeYqKAod3D3UVm91jHwynguOwAvYPhx8nNlM++NqRcK6CxxpUafjmhIdKiHibqg==", "license": "MIT" }, + "node_modules/asynckit": { + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", + "integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==", + "license": "MIT" + }, + "node_modules/axios": { + "version": "1.12.2", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.12.2.tgz", + "integrity": "sha512-vMJzPewAlRyOgxV2dU0Cuz2O8zzzx9VYtbJOaBgXFeLc4IV/Eg50n4LowmehOOR61S8ZMpc2K5Sa7g6A4jfkUw==", + "license": "MIT", + "dependencies": { + "follow-redirects": "^1.15.6", + "form-data": "^4.0.4", + "proxy-from-env": "^1.1.0" + } + }, "node_modules/body-parser": { "version": "1.20.3", "resolved": "https://registry.npmjs.org/body-parser/-/body-parser-1.20.3.tgz", @@ -95,6 +112,18 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/combined-stream": { + "version": "1.0.8", + "resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-1.0.8.tgz", + "integrity": "sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==", + "license": "MIT", + "dependencies": { + "delayed-stream": "~1.0.0" + }, + "engines": { + "node": ">= 0.8" + } + }, "node_modules/content-disposition": { "version": "0.5.4", "resolved": "https://registry.npmjs.org/content-disposition/-/content-disposition-0.5.4.tgz", @@ -140,6 +169,15 @@ "ms": "2.0.0" } }, + "node_modules/delayed-stream": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz", + "integrity": "sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==", + "license": "MIT", + "engines": { + "node": ">=0.4.0" + } + }, "node_modules/depd": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/depd/-/depd-2.0.0.tgz", @@ -159,13 +197,6 @@ "npm": "1.2.8000 || >= 1.4.16" } }, - "node_modules/dgram": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/dgram/-/dgram-1.0.1.tgz", - "integrity": "sha512-zJVFL1EWfKtE0z2VN6qfpn/a+qG1viEzcwJA0EjtzS76ONSE3sEyWBwEbo32hS4IFw/EWVuWN+8b89aPW6It2A==", - "deprecated": "npm is holding this package for security reasons. As it's a core Node module, we will not transfer it over to other users. You may safely remove the package from your dependencies.", - "license": "ISC" - }, "node_modules/dunder-proto": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/dunder-proto/-/dunder-proto-1.0.1.tgz", @@ -225,6 +256,21 @@ "node": ">= 0.4" } }, + "node_modules/es-set-tostringtag": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/es-set-tostringtag/-/es-set-tostringtag-2.1.0.tgz", + "integrity": "sha512-j6vWzfrGVfyXxge+O0x5sh6cvxAog0a/4Rdd2K36zCMV5eJ+/+tOAngRO8cODMNWbVRdVlmGZQL2YS3yR8bIUA==", + "license": "MIT", + "dependencies": { + "es-errors": "^1.3.0", + "get-intrinsic": "^1.2.6", + "has-tostringtag": "^1.0.2", + "hasown": "^2.0.2" + }, + "engines": { + "node": ">= 0.4" + } + }, "node_modules/escape-html": { "version": "1.0.3", "resolved": "https://registry.npmjs.org/escape-html/-/escape-html-1.0.3.tgz", @@ -304,6 +350,42 @@ "node": ">= 0.8" } }, + "node_modules/follow-redirects": { + "version": "1.15.11", + "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.11.tgz", + "integrity": "sha512-deG2P0JfjrTxl50XGCDyfI97ZGVCxIpfKYmfyrQ54n5FO/0gfIES8C/Psl6kWVDolizcaaxZJnTS0QSMxvnsBQ==", + "funding": [ + { + "type": "individual", + "url": "https://github.com/sponsors/RubenVerborgh" + } + ], + "license": "MIT", + "engines": { + "node": ">=4.0" + }, + "peerDependenciesMeta": { + "debug": { + "optional": true + } + } + }, + "node_modules/form-data": { + "version": "4.0.4", + "resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.4.tgz", + "integrity": "sha512-KrGhL9Q4zjj0kiUt5OO4Mr/A/jlI2jDYs5eHBpYHPcBEVSiipAvn2Ko2HnPe20rmcuuvMHNdZFp+4IlGTMF0Ow==", + "license": "MIT", + "dependencies": { + "asynckit": "^0.4.0", + "combined-stream": "^1.0.8", + "es-set-tostringtag": "^2.1.0", + "hasown": "^2.0.2", + "mime-types": "^2.1.12" + }, + "engines": { + "node": ">= 6" + } + }, "node_modules/forwarded": { "version": "0.2.0", "resolved": "https://registry.npmjs.org/forwarded/-/forwarded-0.2.0.tgz", @@ -392,6 +474,21 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/has-tostringtag": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/has-tostringtag/-/has-tostringtag-1.0.2.tgz", + "integrity": "sha512-NqADB8VjPFLM2V0VvHUewwwsw0ZWBaIdgo+ieHtK3hasLz4qeCRjYcqfB6AQrBggRKppKF8L52/VqdVsO47Dlw==", + "license": "MIT", + "dependencies": { + "has-symbols": "^1.0.3" + }, + "engines": { + "node": ">= 0.4" + }, + "funding": { + "url": "https://github.com/sponsors/ljharb" + } + }, "node_modules/hasown": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/hasown/-/hasown-2.0.2.tgz", @@ -583,6 +680,12 @@ "node": ">= 0.10" } }, + "node_modules/proxy-from-env": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/proxy-from-env/-/proxy-from-env-1.1.0.tgz", + "integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==", + "license": "MIT" + }, "node_modules/qs": { "version": "6.13.0", "resolved": "https://registry.npmjs.org/qs/-/qs-6.13.0.tgz", diff --git a/server/index.js b/server/index.js index ebb49cd..6b2e865 100644 --- a/server/index.js +++ b/server/index.js @@ -462,6 +462,7 @@ class LEDLabServer { const frameData = stream.preset.generateFrame(); if (frameData) { // Send to specific node + // frameData format: "RAW:FF0000FF0000..." (RAW prefix + hex pixel data) this.udpDiscovery.sendToNode(nodeIp, frameData); // Send frame data to WebSocket clients for preview diff --git a/server/udp-discovery.js b/server/udp-discovery.js index 0fd56fc..f930c21 100644 --- a/server/udp-discovery.js +++ b/server/udp-discovery.js @@ -91,50 +91,110 @@ class UdpDiscovery extends EventEmitter { return; } - // Update node last seen time + // Handle different message types + if (message.startsWith('cluster/heartbeat:')) { + // Extract hostname from heartbeat: "cluster/heartbeat:hostname" + const hostname = message.substring('cluster/heartbeat:'.length); + this.handleHeartbeat(hostname, nodeIp, rinfo.port); + } else if (message.startsWith('node/update:')) { + // Extract hostname and JSON from update: "node/update:hostname:{json}" + const parts = message.substring('node/update:'.length).split(':'); + if (parts.length >= 2) { + const hostname = parts[0]; + const jsonStr = parts.slice(1).join(':'); // Rejoin in case JSON contains colons + this.handleNodeUpdate(hostname, jsonStr, nodeIp, rinfo.port); + } + } + } + + handleHeartbeat(hostname, nodeIp, port) { + console.log(`Heartbeat from ${hostname} @ ${nodeIp}`); + + // Update or add node + const existingNode = this.nodes.get(nodeIp); this.nodes.set(nodeIp, { lastSeen: Date.now(), status: 'connected', address: nodeIp, - port: rinfo.port + port: port, + hostname: hostname }); - // Emit node discovered/updated event - this.emit('nodeDiscovered', { - ip: nodeIp, - port: rinfo.port, - status: 'connected' - }); + // Only emit if this is a new node or if we need to update + if (!existingNode || existingNode.hostname !== hostname) { + this.emit('nodeDiscovered', { + ip: nodeIp, + hostname: hostname, + port: port, + status: 'connected' + }); + } // Clean up stale nodes periodically this.cleanupStaleNodes(); } - startDiscoveryBroadcast() { - // Broadcast discovery message every 5 seconds - this.discoveryInterval = setInterval(() => { - this.broadcastDiscovery(); - }, 5000); + handleNodeUpdate(hostname, jsonStr, nodeIp, port) { + console.log(`Node update from ${hostname} @ ${nodeIp}`); - // Send initial broadcast - this.broadcastDiscovery(); + // Try to parse JSON to extract additional info + let nodeInfo = {}; + try { + nodeInfo = JSON.parse(jsonStr); + } catch (e) { + console.warn(`Failed to parse node update JSON: ${e.message}`); + } + + // Update node with hostname and any additional info + const existingNode = this.nodes.get(nodeIp); + this.nodes.set(nodeIp, { + lastSeen: Date.now(), + status: 'connected', + address: nodeIp, + port: port, + hostname: hostname || nodeInfo.hostname || existingNode?.hostname, + ...nodeInfo + }); + + // Emit update event + this.emit('nodeDiscovered', { + ip: nodeIp, + hostname: hostname || nodeInfo.hostname || existingNode?.hostname, + port: port, + status: 'connected' + }); } - broadcastDiscovery() { + startDiscoveryBroadcast() { + // With the new protocol, SPORE nodes automatically broadcast heartbeats + // LEDLab passively listens for these heartbeats, so we don't need to broadcast. + // However, we can optionally send a heartbeat to prompt nodes to respond faster. + // For now, we just listen for incoming heartbeats from nodes. + + // Optional: send initial heartbeat to prompt nodes to announce themselves + this.broadcastHeartbeat(); + + // Send periodic heartbeats to prompt node announcements (every 10 seconds) + this.discoveryInterval = setInterval(() => { + this.broadcastHeartbeat(); + }, 10000); + } + + broadcastHeartbeat() { if (!this.socket) { return; } - const discoveryMessage = 'SPORE_DISCOVERY'; + // Send heartbeat using the new protocol format: "cluster/heartbeat:hostname" + const hostname = 'ledlab-client'; + const discoveryMessage = `cluster/heartbeat:${hostname}`; const message = Buffer.from(discoveryMessage, 'utf8'); - // Broadcast to all nodes on the network (broadcast already enabled in bind callback) - this.socket.send(message, 0, message.length, this.port, '255.255.255.255', (err) => { if (err) { - console.error('Error broadcasting discovery message:', err); + console.error('Error broadcasting heartbeat:', err); } else { - console.log('Discovery message broadcasted'); + console.log('Discovery heartbeat broadcasted'); } }); } @@ -154,6 +214,7 @@ class UdpDiscovery extends EventEmitter { getNodes() { const nodes = Array.from(this.nodes.entries()).map(([ip, node]) => ({ ip, + hostname: node.hostname || ip, ...node })); From 858be416eb5fc5b1cdf9b314fd149b65689b7ff3 Mon Sep 17 00:00:00 2001 From: 0x1d Date: Mon, 27 Oct 2025 09:37:20 +0100 Subject: [PATCH 2/3] feat: use gateway --- README.md | 71 ++++-- EDITOR_UPDATE.md => docs/EDITOR_UPDATE.md | 0 .../MULTI_NODE_UPDATE.md | 0 PRESET_EDITOR.md => docs/PRESET_EDITOR.md | 0 public/styles/main.css | 1 - server/gateway-client.js | 160 ++++++++++++ server/index.js | 69 +++--- server/udp-discovery.js | 233 ++---------------- 8 files changed, 279 insertions(+), 255 deletions(-) rename EDITOR_UPDATE.md => docs/EDITOR_UPDATE.md (100%) rename MULTI_NODE_UPDATE.md => docs/MULTI_NODE_UPDATE.md (100%) rename PRESET_EDITOR.md => docs/PRESET_EDITOR.md (100%) create mode 100644 server/gateway-client.js diff --git a/README.md b/README.md index 7d62b65..9dd508b 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,8 @@ LEDLab is a tool for streaming animations to LED matrices connected to SPORE nod The Node.js server provides the backend for SPORE LEDLab: -- **UDP Discovery**: Listens on port 4210 to automatically discover SPORE nodes +- **Gateway Integration**: Queries spore-gateway for node discovery (requires spore-gateway to be running) +- **UDP Frame Streaming**: Sends animation frames to SPORE nodes via UDP on port 4210 - **WebSocket API**: Real-time bidirectional communication with the web UI - **Preset Management**: Manages animation presets with configurable parameters - **Multi-Node Streaming**: Streams different presets to individual nodes simultaneously @@ -50,6 +51,7 @@ Web UI (Browser) <--WebSocket--> Server <--UDP--> SPORE Nodes Preset Engine | Frame Generation (60fps) + Gateway API (Discovery) ``` ## Build @@ -58,6 +60,7 @@ Web UI (Browser) <--WebSocket--> Server <--UDP--> SPORE Nodes - Node.js (v14 or higher) - npm (v6 or higher) +- spore-gateway must be running on port 3001 ### Installation @@ -83,7 +86,8 @@ This will install: spore-ledlab/ ├── server/ # Backend server │ ├── index.js # Main server & WebSocket handling -│ └── udp-discovery.js # UDP node discovery +│ ├── gateway-client.js # Gateway client for node discovery +│ └── udp-discovery.js # UDP sender for frame streaming ├── presets/ # Animation preset implementations │ ├── preset-registry.js │ ├── base-preset.js @@ -117,17 +121,18 @@ npm run dev ``` The server will: -- Start the HTTP server on port 3000 +- Start the HTTP server on port 8080 (default) - Initialize WebSocket server -- Begin UDP discovery on port 4210 -- Serve the web UI at http://localhost:3000 +- Connect to spore-gateway for node discovery +- Initialize UDP sender for frame streaming +- Serve the web UI at http://localhost:8080 ### Access the Web UI Open your browser and navigate to: ``` -http://localhost:3000 +http://localhost:8080 ``` ### Configure SPORE Nodes @@ -136,8 +141,30 @@ Ensure your SPORE nodes are: 1. Connected to the same network as the LEDLab server 2. Running firmware with PixelStreamController support 3. Listening on UDP port 4210 +4. Discovered by spore-gateway +5. Labeled with `app: pixelstream` (LEDLab only displays nodes with this label) -The nodes will automatically appear in the LEDLab grid view once discovered. +The nodes will automatically appear in the LEDLab grid view once they are discovered by spore-gateway and have the `app: pixelstream` label. + +### Node Labeling + +To display nodes in LEDLab, you must set the `app: pixelstream` label on your SPORE nodes. This can be done via the spore-gateway API or from the SPORE node itself. + +**From SPORE Node:** +Nodes should advertise their labels via the UDP heartbeat. The PixelStreamController firmware automatically sets the `app: pixelstream` label. + +**Manual via Gateway:** +You can also set labels manually using the spore-gateway API or spore-ui interface. + +### Disabling Filtering + +To display all nodes without filtering, set the environment variable: + +```bash +FILTER_APP_LABEL= npm start +``` + +Or leave it empty to show all discovered nodes. ## Usage @@ -190,16 +217,26 @@ In the Settings view: ### Server Configuration -Edit `server/index.js` to modify: +Edit `server/index.js` or use environment variables: ```javascript -const PORT = 3000; // HTTP/WebSocket port -const UDP_PORT = 4210; // UDP discovery port -const DEFAULT_FPS = 20; // Default frame rate -const MATRIX_WIDTH = 16; // Default matrix width -const MATRIX_HEIGHT = 16; // Default matrix height +const PORT = 8080; // HTTP/WebSocket port +const UDP_PORT = 4210; // UDP frame streaming port +const GATEWAY_URL = 'http://localhost:3001'; // spore-gateway URL +const FILTER_APP_LABEL = 'pixelstream'; // Filter nodes by app label +const DEFAULT_FPS = 20; // Default frame rate +const MATRIX_WIDTH = 16; // Default matrix width +const MATRIX_HEIGHT = 16; // Default matrix height ``` +Environment variables: +- `PORT` - HTTP server port (default: 8080) +- `UDP_PORT` - UDP port for frame streaming (default: 4210) +- `GATEWAY_URL` - spore-gateway URL (default: http://localhost:3001) +- `FILTER_APP_LABEL` - Filter nodes by app label (default: pixelstream, set to empty string to disable filtering) +- `MATRIX_WIDTH` - Default matrix width (default: 16) +- `MATRIX_HEIGHT` - Default matrix height (default: 16) + ### Adding Custom Presets 1. Create a new preset file in `presets/`: @@ -323,14 +360,17 @@ See `presets/examples/README.md` for detailed documentation. ### Nodes Not Appearing +- Ensure spore-gateway is running on port 3001 - Verify nodes are on the same network - Check firewall settings allow UDP port 4210 - Ensure nodes are running PixelStreamController firmware +- Verify spore-gateway has discovered the nodes +- **IMPORTANT**: Nodes must have the `app: pixelstream` label set. LEDLab only displays nodes with this label. Check node labels in spore-gateway. ### WebSocket Connection Issues - Check browser console for errors -- Verify server is running on port 3000 +- Verify server is running on port 8080 (default) - Try refreshing the browser ### Animation Not Streaming @@ -351,7 +391,8 @@ See `presets/examples/README.md` for detailed documentation. - Node.js - Express.js (HTTP server) - ws (WebSocket server) -- UDP (Node discovery and frame streaming) +- HTTP client (for querying spore-gateway) +- UDP (Frame streaming only) **Frontend:** - Vanilla JavaScript (ES6+) diff --git a/EDITOR_UPDATE.md b/docs/EDITOR_UPDATE.md similarity index 100% rename from EDITOR_UPDATE.md rename to docs/EDITOR_UPDATE.md diff --git a/MULTI_NODE_UPDATE.md b/docs/MULTI_NODE_UPDATE.md similarity index 100% rename from MULTI_NODE_UPDATE.md rename to docs/MULTI_NODE_UPDATE.md diff --git a/PRESET_EDITOR.md b/docs/PRESET_EDITOR.md similarity index 100% rename from PRESET_EDITOR.md rename to docs/PRESET_EDITOR.md diff --git a/public/styles/main.css b/public/styles/main.css index 8ddda33..f64fb0b 100644 --- a/public/styles/main.css +++ b/public/styles/main.css @@ -916,7 +916,6 @@ body { color: var(--text-primary); border: 1px solid var(--border-secondary); border-radius: 6px; - padding: 0.5rem 1rem; font-size: 0.8rem; font-weight: 600; cursor: pointer; diff --git a/server/gateway-client.js b/server/gateway-client.js new file mode 100644 index 0000000..142e0ee --- /dev/null +++ b/server/gateway-client.js @@ -0,0 +1,160 @@ +// Gateway Client - Communicates with spore-gateway for node discovery + +const http = require('http'); + +class GatewayClient { + constructor(options = {}) { + this.gatewayUrl = options.gatewayUrl || 'http://localhost:3001'; + this.pollInterval = options.pollInterval || 2000; // Poll every 2 seconds + this.filterAppLabel = options.filterAppLabel || 'pixelstream'; // Filter nodes by app label, set to null to disable + this.nodes = new Map(); // ip -> { lastSeen, status, hostname, port } + this.isRunning = false; + this.pollTimer = null; + } + + start() { + if (this.isRunning) { + return; + } + + this.isRunning = true; + console.log(`Starting Gateway client, connecting to ${this.gatewayUrl}`); + + // Initial fetch + this.fetchNodes(); + + // Start polling + this.pollTimer = setInterval(() => { + this.fetchNodes(); + }, this.pollInterval); + } + + stop() { + if (!this.isRunning) { + return; + } + + this.isRunning = false; + + if (this.pollTimer) { + clearInterval(this.pollTimer); + this.pollTimer = null; + } + + this.nodes.clear(); + console.log('Gateway client stopped'); + } + + async fetchNodes() { + try { + const response = await this.httpGet(`${this.gatewayUrl}/api/discovery/nodes`); + const data = JSON.parse(response); + + // Update nodes from gateway response + const newNodes = new Map(); + let totalNodes = 0; + let filteredNodes = 0; + + if (data.nodes && Array.isArray(data.nodes)) { + totalNodes = data.nodes.length; + data.nodes.forEach(node => { + // Filter for nodes with specified app label (if filtering is enabled) + if (this.filterAppLabel && !this.hasAppLabel(node, this.filterAppLabel)) { + filteredNodes++; + return; + } + + const nodeIp = node.ip; + newNodes.set(nodeIp, { + lastSeen: Date.now(), + status: node.status || 'active', + hostname: node.hostname || nodeIp, + port: node.port || 4210, + isPrimary: node.isPrimary || false + }); + }); + + //if (totalNodes > 0 && filteredNodes > 0 && this.filterAppLabel) { + // console.loh(`Filtered ${filteredNodes} nodes without app: ${this.filterAppLabel} label (${newNodes.size} ${this.filterAppLabel} nodes active)`); + //} + } + + // Check for newly discovered nodes + for (const [ip, nodeInfo] of newNodes.entries()) { + const existingNode = this.nodes.get(ip); + if (!existingNode) { + console.log(`Node discovered via gateway: ${ip} (${nodeInfo.hostname})`); + this.nodes.set(ip, nodeInfo); + // Could emit an event here if needed: this.emit('nodeDiscovered', nodeInfo); + } else if (existingNode.hostname !== nodeInfo.hostname) { + console.log(`Node hostname updated: ${ip} -> ${nodeInfo.hostname}`); + this.nodes.set(ip, nodeInfo); + } + } + + // Check for lost nodes + for (const ip of this.nodes.keys()) { + if (!newNodes.has(ip)) { + console.log(`Node lost via gateway: ${ip}`); + this.nodes.delete(ip); + // Could emit an event here if needed: this.emit('nodeLost', { ip }); + } + } + + } catch (error) { + console.error('Error fetching nodes from gateway:', error.message); + } + } + + httpGet(url) { + return new Promise((resolve, reject) => { + http.get(url, (res) => { + let data = ''; + + res.on('data', (chunk) => { + data += chunk; + }); + + res.on('end', () => { + if (res.statusCode === 200) { + resolve(data); + } else { + reject(new Error(`HTTP ${res.statusCode}: ${data}`)); + } + }); + + res.on('error', (err) => { + reject(err); + }); + }).on('error', (err) => { + reject(err); + }); + }); + } + + getNodes() { + return Array.from(this.nodes.entries()).map(([ip, node]) => ({ + ip, + hostname: node.hostname || ip, + port: node.port, + status: node.status, + ...node + })); + } + + getNodeCount() { + return this.nodes.size; + } + + hasAppLabel(node, appLabel) { + // Check if node has the app: label + if (!node.labels || typeof node.labels !== 'object') { + return false; + } + + return node.labels.app === appLabel; + } +} + +module.exports = GatewayClient; + diff --git a/server/index.js b/server/index.js index 6b2e865..3a16a04 100644 --- a/server/index.js +++ b/server/index.js @@ -6,13 +6,16 @@ const WebSocket = require('ws'); const path = require('path'); // Import services -const UdpDiscovery = require('./udp-discovery'); +const UdpSender = require('./udp-discovery'); +const GatewayClient = require('./gateway-client'); const PresetRegistry = require('../presets/preset-registry'); class LEDLabServer { constructor(options = {}) { this.port = options.port || 8080; this.udpPort = options.udpPort || 4210; + this.gatewayUrl = options.gatewayUrl || 'http://localhost:3001'; + this.filterAppLabel = options.filterAppLabel || 'pixelstream'; this.matrixWidth = options.matrixWidth || 16; this.matrixHeight = options.matrixHeight || 16; this.fps = options.fps || 20; @@ -21,7 +24,11 @@ class LEDLabServer { this.server = http.createServer(this.app); this.wss = new WebSocket.Server({ server: this.server }); - this.udpDiscovery = new UdpDiscovery(this.udpPort); + this.udpSender = new UdpSender(this.udpPort); + this.gatewayClient = new GatewayClient({ + gatewayUrl: this.gatewayUrl, + filterAppLabel: this.filterAppLabel || 'pixelstream' + }); this.presetRegistry = new PresetRegistry(); // Legacy single-stream support (kept for backwards compatibility) @@ -37,7 +44,7 @@ class LEDLabServer { this.setupExpress(); this.setupWebSocket(); - this.setupUdpDiscovery(); + this.setupGatewayClient(); this.setupPresetManager(); } @@ -47,7 +54,7 @@ class LEDLabServer { // API routes this.app.get('/api/nodes', (req, res) => { - const nodes = this.udpDiscovery.getNodes(); + const nodes = this.gatewayClient.getNodes(); res.json({ nodes }); }); @@ -61,7 +68,7 @@ class LEDLabServer { streaming: this.currentPreset !== null, currentPreset: this.currentPresetName || null, matrixSize: { width: this.matrixWidth, height: this.matrixHeight }, - nodeCount: this.udpDiscovery.getNodeCount(), + nodeCount: this.gatewayClient.getNodeCount(), currentTarget: this.currentTarget, fps: this.fps, }); @@ -122,7 +129,7 @@ class LEDLabServer { streaming: this.currentPreset !== null, currentPreset: this.currentPresetName || null, matrixSize: { width: this.matrixWidth, height: this.matrixHeight }, - nodes: this.udpDiscovery.getNodes(), + nodes: this.gatewayClient.getNodes(), presetParameters: this.currentPreset ? this.currentPreset.getParameters() : null, currentTarget: this.currentTarget, fps: this.fps, @@ -174,26 +181,14 @@ class LEDLabServer { } } - setupUdpDiscovery() { - this.udpDiscovery.on('nodeDiscovered', (node) => { - console.log('Node discovered:', node.ip); - - this.broadcastToClients({ - type: 'nodeDiscovered', - node - }); - }); - - this.udpDiscovery.on('nodeLost', (node) => { - console.log('Node lost:', node.ip); - - this.broadcastToClients({ - type: 'nodeLost', - node - }); - }); - - this.udpDiscovery.start(); + setupGatewayClient() { + // Start gateway client for node discovery + this.gatewayClient.start(); + + // Start UDP sender for sending frames + this.udpSender.start(); + + console.log('Using gateway for node discovery and UDP sender for frame streaming'); } setupPresetManager() { @@ -441,7 +436,9 @@ class LEDLabServer { if (frameData) { // Send to specific target if (this.currentTarget) { - this.udpDiscovery.sendToNode(this.currentTarget, frameData); + this.udpSender.sendToNode(this.currentTarget, frameData).catch(err => { + console.error(`Error sending frame to ${this.currentTarget}:`, err); + }); } // Send frame data to WebSocket clients for preview @@ -463,7 +460,9 @@ class LEDLabServer { if (frameData) { // Send to specific node // frameData format: "RAW:FF0000FF0000..." (RAW prefix + hex pixel data) - this.udpDiscovery.sendToNode(nodeIp, frameData); + this.udpSender.sendToNode(nodeIp, frameData).catch(err => { + console.error(`Error sending frame to ${nodeIp}:`, err); + }); // Send frame data to WebSocket clients for preview this.broadcastToClients({ @@ -476,7 +475,7 @@ class LEDLabServer { } sendToSpecificNode(nodeIp, message) { - return this.udpDiscovery.sendToNode(nodeIp, message); + return this.udpSender.sendToNode(nodeIp, message); } broadcastCurrentState() { @@ -484,7 +483,7 @@ class LEDLabServer { streaming: this.currentPreset !== null, currentPreset: this.currentPresetName || null, matrixSize: { width: this.matrixWidth, height: this.matrixHeight }, - nodes: this.udpDiscovery.getNodes(), + nodes: this.gatewayClient.getNodes(), presetParameters: this.currentPreset ? this.currentPreset.getParameters() : null, currentTarget: this.currentTarget, fps: this.fps, @@ -604,7 +603,8 @@ class LEDLabServer { startServer() { this.server.listen(this.port, () => { console.log(`LEDLab server running on port ${this.port}`); - console.log(`UDP discovery on port ${this.udpPort}`); + console.log(`Gateway client connecting to ${this.gatewayUrl}`); + console.log(`UDP sender configured for port ${this.udpPort}`); console.log(`Matrix size: ${this.matrixWidth}x${this.matrixHeight}`); }); } @@ -615,8 +615,9 @@ class LEDLabServer { // Stop streaming first this.stopStreaming(); - // Stop UDP discovery - this.udpDiscovery.stop(); + // Stop gateway client and UDP sender + this.gatewayClient.stop(); + this.udpSender.stop(); // Close all WebSocket connections immediately this.wss.close(); @@ -646,6 +647,8 @@ if (require.main === module) { const server = new LEDLabServer({ port: process.env.PORT || 8080, udpPort: process.env.UDP_PORT || 4210, + gatewayUrl: process.env.GATEWAY_URL || 'http://localhost:3001', + filterAppLabel: process.env.FILTER_APP_LABEL || 'pixelstream', matrixWidth: parseInt(process.env.MATRIX_WIDTH) || 16, matrixHeight: parseInt(process.env.MATRIX_HEIGHT) || 16, }); diff --git a/server/udp-discovery.js b/server/udp-discovery.js index f930c21..567c3ed 100644 --- a/server/udp-discovery.js +++ b/server/udp-discovery.js @@ -1,35 +1,12 @@ -// UDP Discovery service for SPORE nodes +// UDP Sender service for sending frames to SPORE nodes const dgram = require('dgram'); -const EventEmitter = require('events'); -const os = require('os'); -class UdpDiscovery extends EventEmitter { +class UdpSender { constructor(port = 4210) { - super(); this.port = port; this.socket = null; - this.nodes = new Map(); // ip -> { lastSeen, status } - this.discoveryInterval = null; this.isRunning = false; - - // Get local network interfaces to filter out local server - this.localInterfaces = this.getLocalInterfaces(); - } - - getLocalInterfaces() { - const interfaces = os.networkInterfaces(); - const localIPs = new Set(); - - Object.values(interfaces).forEach(iface => { - iface.forEach(addr => { - if (addr.family === 'IPv4' && !addr.internal) { - localIPs.add(addr.address); - } - }); - }); - - return localIPs; } start() { @@ -40,24 +17,15 @@ class UdpDiscovery extends EventEmitter { this.socket = dgram.createSocket('udp4'); this.isRunning = true; - this.socket.on('message', (msg, rinfo) => { - this.handleMessage(msg, rinfo); - }); - this.socket.on('error', (err) => { - console.error('UDP Discovery socket error:', err); - this.emit('error', err); + console.error('UDP Sender socket error:', err); }); - this.socket.bind(this.port, () => { - console.log(`UDP Discovery listening on port ${this.port}`); - // Enable broadcast after binding + this.socket.bind(0, () => { + // Bind to any available port this.socket.setBroadcast(true); - this.emit('started'); + console.log(`UDP Sender ready on port ${this.socket.address().port}`); }); - - // Start periodic discovery broadcast - this.startDiscoveryBroadcast(); } stop() { @@ -67,162 +35,12 @@ class UdpDiscovery extends EventEmitter { this.isRunning = false; - if (this.discoveryInterval) { - clearInterval(this.discoveryInterval); - this.discoveryInterval = null; - } - if (this.socket) { this.socket.close(); this.socket = null; } - this.nodes.clear(); - console.log('UDP Discovery stopped'); - this.emit('stopped'); - } - - handleMessage(msg, rinfo) { - const message = msg.toString('utf8'); - const nodeIp = rinfo.address; - - // Skip local server IPs - if (this.localInterfaces.has(nodeIp)) { - return; - } - - // Handle different message types - if (message.startsWith('cluster/heartbeat:')) { - // Extract hostname from heartbeat: "cluster/heartbeat:hostname" - const hostname = message.substring('cluster/heartbeat:'.length); - this.handleHeartbeat(hostname, nodeIp, rinfo.port); - } else if (message.startsWith('node/update:')) { - // Extract hostname and JSON from update: "node/update:hostname:{json}" - const parts = message.substring('node/update:'.length).split(':'); - if (parts.length >= 2) { - const hostname = parts[0]; - const jsonStr = parts.slice(1).join(':'); // Rejoin in case JSON contains colons - this.handleNodeUpdate(hostname, jsonStr, nodeIp, rinfo.port); - } - } - } - - handleHeartbeat(hostname, nodeIp, port) { - console.log(`Heartbeat from ${hostname} @ ${nodeIp}`); - - // Update or add node - const existingNode = this.nodes.get(nodeIp); - this.nodes.set(nodeIp, { - lastSeen: Date.now(), - status: 'connected', - address: nodeIp, - port: port, - hostname: hostname - }); - - // Only emit if this is a new node or if we need to update - if (!existingNode || existingNode.hostname !== hostname) { - this.emit('nodeDiscovered', { - ip: nodeIp, - hostname: hostname, - port: port, - status: 'connected' - }); - } - - // Clean up stale nodes periodically - this.cleanupStaleNodes(); - } - - handleNodeUpdate(hostname, jsonStr, nodeIp, port) { - console.log(`Node update from ${hostname} @ ${nodeIp}`); - - // Try to parse JSON to extract additional info - let nodeInfo = {}; - try { - nodeInfo = JSON.parse(jsonStr); - } catch (e) { - console.warn(`Failed to parse node update JSON: ${e.message}`); - } - - // Update node with hostname and any additional info - const existingNode = this.nodes.get(nodeIp); - this.nodes.set(nodeIp, { - lastSeen: Date.now(), - status: 'connected', - address: nodeIp, - port: port, - hostname: hostname || nodeInfo.hostname || existingNode?.hostname, - ...nodeInfo - }); - - // Emit update event - this.emit('nodeDiscovered', { - ip: nodeIp, - hostname: hostname || nodeInfo.hostname || existingNode?.hostname, - port: port, - status: 'connected' - }); - } - - startDiscoveryBroadcast() { - // With the new protocol, SPORE nodes automatically broadcast heartbeats - // LEDLab passively listens for these heartbeats, so we don't need to broadcast. - // However, we can optionally send a heartbeat to prompt nodes to respond faster. - // For now, we just listen for incoming heartbeats from nodes. - - // Optional: send initial heartbeat to prompt nodes to announce themselves - this.broadcastHeartbeat(); - - // Send periodic heartbeats to prompt node announcements (every 10 seconds) - this.discoveryInterval = setInterval(() => { - this.broadcastHeartbeat(); - }, 10000); - } - - broadcastHeartbeat() { - if (!this.socket) { - return; - } - - // Send heartbeat using the new protocol format: "cluster/heartbeat:hostname" - const hostname = 'ledlab-client'; - const discoveryMessage = `cluster/heartbeat:${hostname}`; - const message = Buffer.from(discoveryMessage, 'utf8'); - - this.socket.send(message, 0, message.length, this.port, '255.255.255.255', (err) => { - if (err) { - console.error('Error broadcasting heartbeat:', err); - } else { - console.log('Discovery heartbeat broadcasted'); - } - }); - } - - cleanupStaleNodes() { - const now = Date.now(); - const staleThreshold = 10000; // 10 seconds - - for (const [ip, node] of this.nodes.entries()) { - if (now - node.lastSeen > staleThreshold) { - this.nodes.delete(ip); - this.emit('nodeLost', { ip, status: 'disconnected' }); - } - } - } - - getNodes() { - const nodes = Array.from(this.nodes.entries()).map(([ip, node]) => ({ - ip, - hostname: node.hostname || ip, - ...node - })); - - return nodes; - } - - getNodeCount() { - return this.nodes.size; + console.log('UDP Sender stopped'); } sendToNode(nodeIp, message) { @@ -231,15 +49,17 @@ class UdpDiscovery extends EventEmitter { } const buffer = Buffer.from(message, 'utf8'); - this.socket.send(buffer, 0, buffer.length, this.port, nodeIp, (err) => { - if (err) { - console.error(`Error sending to node ${nodeIp}:`, err); - return false; - } - return true; + + return new Promise((resolve, reject) => { + this.socket.send(buffer, 0, buffer.length, this.port, nodeIp, (err) => { + if (err) { + console.error(`Error sending to node ${nodeIp}:`, err); + reject(err); + return; + } + resolve(true); + }); }); - - return true; } broadcastToAll(message) { @@ -250,16 +70,17 @@ class UdpDiscovery extends EventEmitter { const buffer = Buffer.from(message, 'utf8'); this.socket.setBroadcast(true); - this.socket.send(buffer, 0, buffer.length, this.port, '255.255.255.255', (err) => { - if (err) { - console.error('Error broadcasting message:', err); - return false; - } - return true; + return new Promise((resolve, reject) => { + this.socket.send(buffer, 0, buffer.length, this.port, '255.255.255.255', (err) => { + if (err) { + console.error('Error broadcasting message:', err); + reject(err); + return; + } + resolve(true); + }); }); - - return true; } } -module.exports = UdpDiscovery; +module.exports = UdpSender; From 778436536141eacfe365e1fc69912d5109414f08 Mon Sep 17 00:00:00 2001 From: 0x1d Date: Mon, 27 Oct 2025 13:05:35 +0100 Subject: [PATCH 3/3] feat: subscribe to websocket for cluster updates --- README.md | 4 +- server/gateway-client.js | 214 +++++++++++++++++++++++---------------- 2 files changed, 130 insertions(+), 88 deletions(-) diff --git a/README.md b/README.md index 9dd508b..cae0d39 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ LEDLab is a tool for streaming animations to LED matrices connected to SPORE nod The Node.js server provides the backend for SPORE LEDLab: -- **Gateway Integration**: Queries spore-gateway for node discovery (requires spore-gateway to be running) +- **Gateway Integration**: Subscribes to spore-gateway WebSocket for real-time cluster updates (requires spore-gateway to be running) - **UDP Frame Streaming**: Sends animation frames to SPORE nodes via UDP on port 4210 - **WebSocket API**: Real-time bidirectional communication with the web UI - **Preset Management**: Manages animation presets with configurable parameters @@ -51,7 +51,7 @@ Web UI (Browser) <--WebSocket--> Server <--UDP--> SPORE Nodes Preset Engine | Frame Generation (60fps) - Gateway API (Discovery) + Gateway WebSocket (Real-time cluster updates) ``` ## Build diff --git a/server/gateway-client.js b/server/gateway-client.js index 142e0ee..733a6eb 100644 --- a/server/gateway-client.js +++ b/server/gateway-client.js @@ -1,15 +1,18 @@ // Gateway Client - Communicates with spore-gateway for node discovery -const http = require('http'); +const WebSocket = require('ws'); class GatewayClient { constructor(options = {}) { this.gatewayUrl = options.gatewayUrl || 'http://localhost:3001'; - this.pollInterval = options.pollInterval || 2000; // Poll every 2 seconds this.filterAppLabel = options.filterAppLabel || 'pixelstream'; // Filter nodes by app label, set to null to disable this.nodes = new Map(); // ip -> { lastSeen, status, hostname, port } this.isRunning = false; - this.pollTimer = null; + this.ws = null; + this.reconnectAttempts = 0; + this.maxReconnectAttempts = 10; + this.reconnectDelay = 2000; + this.reconnectTimer = null; } start() { @@ -20,13 +23,7 @@ class GatewayClient { this.isRunning = true; console.log(`Starting Gateway client, connecting to ${this.gatewayUrl}`); - // Initial fetch - this.fetchNodes(); - - // Start polling - this.pollTimer = setInterval(() => { - this.fetchNodes(); - }, this.pollInterval); + this.connectWebSocket(); } stop() { @@ -36,100 +33,143 @@ class GatewayClient { this.isRunning = false; - if (this.pollTimer) { - clearInterval(this.pollTimer); - this.pollTimer = null; + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + + if (this.ws) { + this.ws.close(); + this.ws = null; } this.nodes.clear(); console.log('Gateway client stopped'); } - async fetchNodes() { + connectWebSocket() { try { - const response = await this.httpGet(`${this.gatewayUrl}/api/discovery/nodes`); - const data = JSON.parse(response); + // Convert http:// to ws:// for WebSocket + const wsUrl = this.gatewayUrl.replace('http://', 'ws://').replace('https://', 'wss://') + '/ws'; + console.log(`Connecting to WebSocket: ${wsUrl}`); - // Update nodes from gateway response - const newNodes = new Map(); - let totalNodes = 0; - let filteredNodes = 0; + this.ws = new WebSocket(wsUrl); - if (data.nodes && Array.isArray(data.nodes)) { - totalNodes = data.nodes.length; - data.nodes.forEach(node => { - // Filter for nodes with specified app label (if filtering is enabled) - if (this.filterAppLabel && !this.hasAppLabel(node, this.filterAppLabel)) { - filteredNodes++; - return; - } - - const nodeIp = node.ip; - newNodes.set(nodeIp, { - lastSeen: Date.now(), - status: node.status || 'active', - hostname: node.hostname || nodeIp, - port: node.port || 4210, - isPrimary: node.isPrimary || false - }); - }); - - //if (totalNodes > 0 && filteredNodes > 0 && this.filterAppLabel) { - // console.loh(`Filtered ${filteredNodes} nodes without app: ${this.filterAppLabel} label (${newNodes.size} ${this.filterAppLabel} nodes active)`); - //} - } - - // Check for newly discovered nodes - for (const [ip, nodeInfo] of newNodes.entries()) { - const existingNode = this.nodes.get(ip); - if (!existingNode) { - console.log(`Node discovered via gateway: ${ip} (${nodeInfo.hostname})`); - this.nodes.set(ip, nodeInfo); - // Could emit an event here if needed: this.emit('nodeDiscovered', nodeInfo); - } else if (existingNode.hostname !== nodeInfo.hostname) { - console.log(`Node hostname updated: ${ip} -> ${nodeInfo.hostname}`); - this.nodes.set(ip, nodeInfo); + this.ws.on('open', () => { + console.log('WebSocket connected to gateway'); + this.reconnectAttempts = 0; + }); + + this.ws.on('message', (data) => { + try { + const message = JSON.parse(data.toString()); + this.handleWebSocketMessage(message); + } catch (error) { + console.error('Error parsing WebSocket message:', error); } - } - - // Check for lost nodes - for (const ip of this.nodes.keys()) { - if (!newNodes.has(ip)) { - console.log(`Node lost via gateway: ${ip}`); - this.nodes.delete(ip); - // Could emit an event here if needed: this.emit('nodeLost', { ip }); + }); + + this.ws.on('close', (code, reason) => { + console.log(`WebSocket connection closed: ${code} ${reason}`); + if (this.isRunning) { + this.scheduleReconnect(); } - } - + }); + + this.ws.on('error', (error) => { + console.error('WebSocket error:', error.message); + }); + } catch (error) { - console.error('Error fetching nodes from gateway:', error.message); + console.error('Failed to create WebSocket connection:', error); + this.scheduleReconnect(); } } - httpGet(url) { - return new Promise((resolve, reject) => { - http.get(url, (res) => { - let data = ''; + handleWebSocketMessage(message) { + if (message.topic === 'cluster/update') { + this.handleClusterUpdate(message.members, message.primaryNode, message.totalNodes); + } else if (message.topic === 'node/discovery') { + this.handleNodeDiscovery(message.action, message.nodeIp); + } + } + + handleClusterUpdate(members, primaryNode, totalNodes) { + const newNodes = new Map(); + + if (members && Array.isArray(members)) { + members.forEach(node => { + // Filter for nodes with specified app label (if filtering is enabled) + if (this.filterAppLabel && !this.hasAppLabel(node, this.filterAppLabel)) { + return; + } - res.on('data', (chunk) => { - data += chunk; + const nodeIp = node.ip || node.IP; + newNodes.set(nodeIp, { + lastSeen: Date.now(), + status: node.status || node.Status || 'active', + hostname: node.hostname || node.Hostname || nodeIp, + port: node.port || node.Port || 4210, + isPrimary: (primaryNode === nodeIp), + labels: node.labels || node.Labels }); - - res.on('end', () => { - if (res.statusCode === 200) { - resolve(data); - } else { - reject(new Error(`HTTP ${res.statusCode}: ${data}`)); - } - }); - - res.on('error', (err) => { - reject(err); - }); - }).on('error', (err) => { - reject(err); }); - }); + } + + // Check for newly discovered nodes + for (const [ip, nodeInfo] of newNodes.entries()) { + const existingNode = this.nodes.get(ip); + if (!existingNode) { + console.log(`Node discovered via gateway: ${ip} (${nodeInfo.hostname})`); + this.nodes.set(ip, nodeInfo); + } else if (existingNode.hostname !== nodeInfo.hostname) { + console.log(`Node hostname updated: ${ip} -> ${nodeInfo.hostname}`); + this.nodes.set(ip, nodeInfo); + } else { + // Update status and last seen + existingNode.status = nodeInfo.status; + existingNode.lastSeen = nodeInfo.lastSeen; + existingNode.isPrimary = nodeInfo.isPrimary; + } + } + + // Check for lost nodes + for (const ip of this.nodes.keys()) { + if (!newNodes.has(ip)) { + console.log(`Node lost via gateway: ${ip}`); + this.nodes.delete(ip); + } + } + } + + handleNodeDiscovery(action, nodeIp) { + if (action === 'discovered') { + // Node was discovered - actual data will come via cluster/update + console.log(`Node discovered event: ${nodeIp}`); + } else if (action === 'removed') { + console.log(`Node removed event: ${nodeIp}`); + this.nodes.delete(nodeIp); + } + } + + scheduleReconnect() { + if (!this.isRunning) { + return; + } + + if (this.reconnectAttempts >= this.maxReconnectAttempts) { + console.error('Max WebSocket reconnection attempts reached'); + return; + } + + this.reconnectAttempts++; + const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1); + + console.log(`Attempting to reconnect WebSocket in ${delay}ms (attempt ${this.reconnectAttempts})`); + + this.reconnectTimer = setTimeout(() => { + this.connectWebSocket(); + }, delay); } getNodes() { @@ -138,6 +178,7 @@ class GatewayClient { hostname: node.hostname || ip, port: node.port, status: node.status, + isPrimary: node.isPrimary, ...node })); } @@ -158,3 +199,4 @@ class GatewayClient { module.exports = GatewayClient; +