From f3d99b174f99310604f1c0a16595e2f1475303d5 Mon Sep 17 00:00:00 2001 From: Patrick Balsiger Date: Wed, 1 Oct 2025 21:47:27 +0200 Subject: [PATCH] feat: udp stream --- .../pixelstream/PixelStreamController.cpp | 127 ++++++++++++++++++ examples/pixelstream/PixelStreamController.h | 42 ++++++ examples/pixelstream/README.md | 33 +++++ examples/pixelstream/main.cpp | 60 +++++++++ include/spore/core/ClusterManager.h | 2 + include/spore/internal/Globals.h | 1 + platformio.ini | 21 +++ src/spore/core/ApiServer.cpp | 5 + src/spore/core/ClusterManager.cpp | 27 ++++ test/package.json | 5 + test/pixelstream/bouncing-ball.js | 80 +++++++++++ test/pixelstream/fade-green-blue.js | 55 ++++++++ test/pixelstream/rainbow.js | 59 ++++++++ 13 files changed, 517 insertions(+) create mode 100644 examples/pixelstream/PixelStreamController.cpp create mode 100644 examples/pixelstream/PixelStreamController.h create mode 100644 examples/pixelstream/README.md create mode 100644 examples/pixelstream/main.cpp create mode 100644 test/pixelstream/bouncing-ball.js create mode 100644 test/pixelstream/fade-green-blue.js create mode 100644 test/pixelstream/rainbow.js diff --git a/examples/pixelstream/PixelStreamController.cpp b/examples/pixelstream/PixelStreamController.cpp new file mode 100644 index 0000000..46fdb7f --- /dev/null +++ b/examples/pixelstream/PixelStreamController.cpp @@ -0,0 +1,127 @@ +#include "PixelStreamController.h" + +namespace { + constexpr int COMPONENTS_PER_PIXEL = 3; +} + +PixelStreamController::PixelStreamController(NodeContext& ctxRef, const PixelStreamConfig& cfg) + : ctx(ctxRef), config(cfg), pixels(cfg.pixelCount, cfg.pin, cfg.pixelType) { +} + +void PixelStreamController::begin() { + pixels.begin(); + pixels.setBrightness(config.brightness); + // Default all pixels to green so we can verify hardware before streaming frames + for (uint16_t i = 0; i < config.pixelCount; ++i) { + pixels.setPixelColor(i, pixels.Color(0, 255, 0)); + } + pixels.show(); + + ctx.on("udp/raw", [this](void* data) { + this->handleEvent(data); + }); + + LOG_INFO("PixelStream", String("PixelStreamController ready on pin ") + String(config.pin) + " with " + String(config.pixelCount) + " pixels"); +} + +void PixelStreamController::handleEvent(void* data) { + if (data == nullptr) { + return; + } + + String* payload = static_cast(data); + if (!payload) { + return; + } + + if (!applyFrame(*payload)) { + LOG_WARN("PixelStream", String("Ignoring RAW payload with invalid length (") + String(payload->length()) + ")"); + } +} + +bool PixelStreamController::applyFrame(const String& payload) { + static constexpr std::size_t frameWidth = COMPONENTS_PER_PIXEL * 2; + const std::size_t payloadLength = static_cast(payload.length()); + + if (payloadLength == 0 || (payloadLength % frameWidth) != 0) { + LOG_WARN("PixelStream", String("Payload size ") + String(payloadLength) + " is not a multiple of " + String(frameWidth)); + return false; + } + + const uint16_t framesProvided = static_cast(payloadLength / frameWidth); + const uint16_t pixelsToUpdate = std::min(config.pixelCount, framesProvided); + + for (uint16_t index = 0; index < pixelsToUpdate; ++index) { + const std::size_t base = static_cast(index) * frameWidth; + FrameComponents components{}; + if (!tryParsePixel(payload, base, components)) { + LOG_WARN("PixelStream", String("Invalid hex data at pixel index ") + String(index)); + return false; + } + const uint16_t hardwareIndex = mapPixelIndex(index); + pixels.setPixelColor(hardwareIndex, pixels.Color(components.red, components.green, components.blue)); + } + + // Clear any remaining pixels so stale data is removed when fewer frames are provided + for (uint16_t index = pixelsToUpdate; index < config.pixelCount; ++index) { + const uint16_t hardwareIndex = mapPixelIndex(index); + pixels.setPixelColor(hardwareIndex, 0); + } + + pixels.show(); + return true; +} + +uint16_t PixelStreamController::mapPixelIndex(uint16_t logicalIndex) const { + if (config.matrixWidth == 0) { + return logicalIndex; + } + + const uint16_t row = logicalIndex / config.matrixWidth; + const uint16_t col = logicalIndex % config.matrixWidth; + + if (!config.matrixSerpentine || (row % 2 == 0)) { + return row * config.matrixWidth + col; + } + + const uint16_t reversedCol = (config.matrixWidth - 1) - col; + return row * config.matrixWidth + reversedCol; +} + +int PixelStreamController::hexToNibble(char c) { + if (c >= '0' && c <= '9') { + return c - '0'; + } + if (c >= 'a' && c <= 'f') { + return 10 + c - 'a'; + } + if (c >= 'A' && c <= 'F') { + return 10 + c - 'A'; + } + return -1; +} + +bool PixelStreamController::tryParsePixel(const String& payload, std::size_t startIndex, FrameComponents& components) const { + static constexpr std::size_t frameWidth = COMPONENTS_PER_PIXEL * 2; + if (startIndex + frameWidth > static_cast(payload.length())) { + return false; + } + + const int rHi = hexToNibble(payload[startIndex]); + const int rLo = hexToNibble(payload[startIndex + 1]); + const int gHi = hexToNibble(payload[startIndex + 2]); + const int gLo = hexToNibble(payload[startIndex + 3]); + const int bHi = hexToNibble(payload[startIndex + 4]); + const int bLo = hexToNibble(payload[startIndex + 5]); + + if (rHi < 0 || rLo < 0 || gHi < 0 || gLo < 0 || bHi < 0 || bLo < 0) { + return false; + } + + components.red = static_cast((rHi << 4) | rLo); + components.green = static_cast((gHi << 4) | gLo); + components.blue = static_cast((bHi << 4) | bLo); + return true; +} + + diff --git a/examples/pixelstream/PixelStreamController.h b/examples/pixelstream/PixelStreamController.h new file mode 100644 index 0000000..cf1fb5c --- /dev/null +++ b/examples/pixelstream/PixelStreamController.h @@ -0,0 +1,42 @@ +#pragma once + +#include +#include +#include +#include +#include "spore/core/NodeContext.h" +#include "spore/util/Logging.h" + +struct PixelStreamConfig { + uint8_t pin; + uint16_t pixelCount; + uint8_t brightness; + uint16_t matrixWidth; + bool matrixSerpentine; + neoPixelType pixelType; +}; + +class PixelStreamController { +public: + PixelStreamController(NodeContext& ctx, const PixelStreamConfig& config); + void begin(); + +private: + struct FrameComponents { + uint8_t red; + uint8_t green; + uint8_t blue; + }; + + bool tryParsePixel(const String& payload, std::size_t startIndex, FrameComponents& components) const; + void handleEvent(void* data); + bool applyFrame(const String& payload); + uint16_t mapPixelIndex(uint16_t logicalIndex) const; + static int hexToNibble(char c); + + NodeContext& ctx; + PixelStreamConfig config; + Adafruit_NeoPixel pixels; +}; + + diff --git a/examples/pixelstream/README.md b/examples/pixelstream/README.md new file mode 100644 index 0000000..6a482c4 --- /dev/null +++ b/examples/pixelstream/README.md @@ -0,0 +1,33 @@ +# PixelStream Example + +This example demonstrates how to consume the `udp/raw` cluster event and drive a NeoPixel strip or matrix directly from streamed RGB data. Frames are provided as hex encoded byte triplets (`RRGGBB` per pixel). + +## Features + +- Subscribes to `udp/raw` via `NodeContext::on`. +- Converts incoming frames into pixel colors for strips or matrices. +- Supports serpentine (zig-zag) matrix wiring. + +## Payload Format + +Each packet is expected to be `RAW:` followed by `pixelCount * 3 * 2` hexadecimal characters. For example, for 8 pixels: + +``` +RAW:FF0000FF0000FF0000FF0000FF0000FF0000FF0000FF0000FF0000 +``` + +## Usage + +### Strip Mode + +Upload the example with `PIXEL_MATRIX_WIDTH` set to 0 (default). Send frames containing `PIXEL_COUNT * 3` bytes as hex. + +### Matrix Mode + +Set `PIXEL_MATRIX_WIDTH` to the number of columns. The controller remaps even/odd rows to support serpentine wiring. + +## Configuration + +Adjust `PIXEL_PIN`, `PIXEL_COUNT`, `PIXEL_BRIGHTNESS`, `PIXEL_MATRIX_WIDTH`, `PIXEL_MATRIX_SERPENTINE`, and `PIXEL_TYPE` through build defines or editing `main.cpp`. + + diff --git a/examples/pixelstream/main.cpp b/examples/pixelstream/main.cpp new file mode 100644 index 0000000..1685d2e --- /dev/null +++ b/examples/pixelstream/main.cpp @@ -0,0 +1,60 @@ +#include +#include "spore/Spore.h" +#include "spore/util/Logging.h" +#include "PixelStreamController.h" + +#ifndef PIXEL_PIN +#define PIXEL_PIN 2 +#endif + +#ifndef PIXEL_COUNT +#define PIXEL_COUNT 64 +#endif + +#ifndef PIXEL_BRIGHTNESS +#define PIXEL_BRIGHTNESS 80 +#endif + +#ifndef PIXEL_MATRIX_WIDTH +#define PIXEL_MATRIX_WIDTH 0 +#endif + +#ifndef PIXEL_MATRIX_SERPENTINE +#define PIXEL_MATRIX_SERPENTINE 1 +#endif + +#ifndef PIXEL_TYPE +#define PIXEL_TYPE NEO_GRB + NEO_KHZ800 +#endif + +Spore spore({ + {"app", "pixelstream"}, + {"role", "led"}, + {"pixels", String(PIXEL_COUNT)} +}); + +PixelStreamController* controller = nullptr; + +void setup() { + spore.setup(); + + PixelStreamConfig config{ + static_cast(PIXEL_PIN), + static_cast(PIXEL_COUNT), + static_cast(PIXEL_BRIGHTNESS), + static_cast(PIXEL_MATRIX_WIDTH), + static_cast(PIXEL_MATRIX_SERPENTINE), + static_cast(PIXEL_TYPE) + }; + + controller = new PixelStreamController(spore.getContext(), config); + controller->begin(); + + spore.begin(); +} + +void loop() { + spore.loop(); +} + + diff --git a/include/spore/core/ClusterManager.h b/include/spore/core/ClusterManager.h index ca4095c..c742a92 100644 --- a/include/spore/core/ClusterManager.h +++ b/include/spore/core/ClusterManager.h @@ -40,11 +40,13 @@ private: static bool isResponseMsg(const char* msg); static bool isNodeInfoMsg(const char* msg); static bool isClusterEventMsg(const char* msg); + static bool isRawMsg(const char* msg); void onDiscovery(const char* msg); void onHeartbeat(const char* msg); void onResponse(const char* msg); void onNodeInfo(const char* msg); void onClusterEvent(const char* msg); + void onRawMessage(const char* msg); unsigned long lastHeartbeatSentAt = 0; std::vector messageHandlers; }; diff --git a/include/spore/internal/Globals.h b/include/spore/internal/Globals.h index 690f2e8..ebb0fc8 100644 --- a/include/spore/internal/Globals.h +++ b/include/spore/internal/Globals.h @@ -10,6 +10,7 @@ namespace ClusterProtocol { constexpr const char* HEARTBEAT_MSG = "CLUSTER_HEARTBEAT"; constexpr const char* NODE_INFO_MSG = "CLUSTER_NODE_INFO"; constexpr const char* CLUSTER_EVENT_MSG = "CLUSTER_EVENT"; + constexpr const char* RAW_MSG = "RAW"; constexpr uint16_t UDP_PORT = 4210; // Increased buffer to accommodate node info JSON over UDP constexpr size_t UDP_BUF_SIZE = 512; diff --git a/platformio.ini b/platformio.ini index b8f0dc8..ac9cce8 100644 --- a/platformio.ini +++ b/platformio.ini @@ -100,3 +100,24 @@ build_src_filter = + + + + +[env:pixelstream] +platform = platformio/espressif8266@^4.2.1 +board = esp01_1m +framework = arduino +upload_speed = 115200 +monitor_speed = 115200 +board_build.filesystem = littlefs +board_build.flash_mode = dout +board_build.ldscript = eagle.flash.1m64.ld +lib_deps = ${common.lib_deps} + adafruit/Adafruit NeoPixel@^1.15.1 +build_flags = +build_src_filter = + + + + + + + + + + + + + + diff --git a/src/spore/core/ApiServer.cpp b/src/spore/core/ApiServer.cpp index 767ceb6..ae95a4e 100644 --- a/src/spore/core/ApiServer.cpp +++ b/src/spore/core/ApiServer.cpp @@ -152,6 +152,11 @@ void ApiServer::setupWebSocket() { // Subscribe to all local events and forward to websocket clients ctx.onAny([this](const std::string& event, void* dataPtr) { + // Ignore raw UDP frames + if (event == "udp/raw") { + return; + } + String* payloadStrPtr = static_cast(dataPtr); String payloadStr = payloadStrPtr ? *payloadStrPtr : String(""); diff --git a/src/spore/core/ClusterManager.cpp b/src/spore/core/ClusterManager.cpp index 5d7999b..2e2bd59 100644 --- a/src/spore/core/ClusterManager.cpp +++ b/src/spore/core/ClusterManager.cpp @@ -68,6 +68,7 @@ void ClusterManager::listen() { void ClusterManager::initMessageHandlers() { messageHandlers.clear(); + messageHandlers.push_back({ &ClusterManager::isRawMsg, [this](const char* msg){ this->onRawMessage(msg); }, "RAW" }); messageHandlers.push_back({ &ClusterManager::isDiscoveryMsg, [this](const char* msg){ this->onDiscovery(msg); }, "DISCOVERY" }); messageHandlers.push_back({ &ClusterManager::isHeartbeatMsg, [this](const char* msg){ this->onHeartbeat(msg); }, "HEARTBEAT" }); messageHandlers.push_back({ &ClusterManager::isResponseMsg, [this](const char* msg){ this->onResponse(msg); }, "RESPONSE" }); @@ -113,6 +114,15 @@ bool ClusterManager::isClusterEventMsg(const char* msg) { return strncmp(msg, ClusterProtocol::CLUSTER_EVENT_MSG, strlen(ClusterProtocol::CLUSTER_EVENT_MSG)) == 0; } +bool ClusterManager::isRawMsg(const char* msg) { + // RAW frames must be "RAW:"; enforce the delimiter so we skip things like "RAW_HEARTBEAT". + const std::size_t prefixLen = strlen(ClusterProtocol::RAW_MSG); + if (strncmp(msg, ClusterProtocol::RAW_MSG, prefixLen) != 0) { + return false; + } + return msg[prefixLen] == ':'; +} + void ClusterManager::onDiscovery(const char* /*msg*/) { ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port); String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname; @@ -240,6 +250,23 @@ void ClusterManager::onClusterEvent(const char* msg) { ctx.fire(eventKey, &data); } +void ClusterManager::onRawMessage(const char* msg) { + const std::size_t prefixLen = strlen(ClusterProtocol::RAW_MSG); + if (msg[prefixLen] != ':') { + LOG_WARN("Cluster", "RAW message received without payload delimiter"); + return; + } + + const char* payloadStart = msg + prefixLen + 1; + if (*payloadStart == '\0') { + LOG_WARN("Cluster", "RAW message received with empty payload"); + return; + } + + String payload(payloadStart); + ctx.fire("udp/raw", &payload); +} + void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) { auto& memberList = *ctx.memberList; diff --git a/test/package.json b/test/package.json index 31fef03..f22bb4b 100644 --- a/test/package.json +++ b/test/package.json @@ -1,5 +1,10 @@ { "dependencies": { "ws": "^8.18.3" + }, + "scripts": { + "pixelstream:fade-green-blue": "node pixelstream/fade-green-blue.js", + "pixelstream:bouncing-ball": "node pixelstream/bouncing-ball.js", + "pixelstream:rainbow": "node pixelstream/rainbow.js" } } diff --git a/test/pixelstream/bouncing-ball.js b/test/pixelstream/bouncing-ball.js new file mode 100644 index 0000000..f726d26 --- /dev/null +++ b/test/pixelstream/bouncing-ball.js @@ -0,0 +1,80 @@ +const dgram = require('dgram'); + +const host = process.argv[2]; +const port = parseInt(process.argv[3] || '4210', 10); +const pixels = parseInt(process.argv[4] || '64', 10); +const intervalMs = parseInt(process.argv[5] || '30', 10); + +if (!host) { + console.error('Usage: node bouncing-ball.js [port] [pixels] [interval-ms]'); + process.exit(1); +} + +const socket = dgram.createSocket('udp4'); +const isBroadcast = host === '255.255.255.255' || host.endsWith('.255'); + +let position = Math.random() * (pixels - 1); +let velocity = randomVelocity(); + +function randomVelocity() { + const min = 0.15; + const max = 0.4; + const sign = Math.random() < 0.5 ? -1 : 1; + return (min + Math.random() * (max - min)) * sign; +} + +function rebound(sign) { + velocity = randomVelocity() * sign; +} + +function mix(a, b, t) { + return a + (b - a) * t; +} + +function generateFrame() { + const dt = intervalMs / 1000; + position += velocity * dt * 60; // scale velocity to 60 FPS reference + + if (position < 0) { + position = -position; + rebound(1); + } else if (position > pixels - 1) { + position = (pixels - 1) - (position - (pixels - 1)); + rebound(-1); + } + + const activeIndex = Math.max(0, Math.min(pixels - 1, Math.round(position))); + + let payload = 'RAW:'; + for (let i = 0; i < pixels; i++) { + if (i === activeIndex) { + payload += 'ff8000'; + continue; + } + + const distance = Math.abs(i - position); + const intensity = Math.max(0, 1 - distance); + const green = Math.round(mix(20, 200, intensity)).toString(16).padStart(2, '0'); + const blue = Math.round(mix(40, 255, intensity)).toString(16).padStart(2, '0'); + payload += '00' + green + blue; + } + + return payload; +} + +function sendFrame() { + const payload = generateFrame(); + const message = Buffer.from(payload, 'utf8'); + socket.send(message, port, host); +} + +setInterval(sendFrame, intervalMs); + +if (isBroadcast) { + socket.bind(() => { + socket.setBroadcast(true); + }); +} + +console.log(`Streaming bouncing ball pattern to ${host}:${port} with ${pixels} pixels (interval=${intervalMs}ms)`); + diff --git a/test/pixelstream/fade-green-blue.js b/test/pixelstream/fade-green-blue.js new file mode 100644 index 0000000..eef3433 --- /dev/null +++ b/test/pixelstream/fade-green-blue.js @@ -0,0 +1,55 @@ +const dgram = require('dgram'); + +const host = process.argv[2]; +const port = parseInt(process.argv[3] || '4210', 10); +const pixels = parseInt(process.argv[4] || '64', 10); +const speed = parseFloat(process.argv[5] || '0.5'); // cycles per second + +if (!host) { + console.error('Usage: node fade-green-blue.js [port] [pixels] [speed-hz]'); + process.exit(1); +} + +const socket = dgram.createSocket('udp4'); +const intervalMs = 50; +let tick = 0; +const isBroadcast = host === '255.255.255.255' || host.endsWith('.255'); + +function generateFrame() { + const timeSeconds = (tick * intervalMs) / 1000; + const phase = timeSeconds * speed * Math.PI * 2; + const blend = (Math.sin(phase) + 1) * 0.5; // 0..1 + + const green = Math.round(255 * (1 - blend)); + const blue = Math.round(255 * blend); + + let payload = 'RAW:'; + const gHex = green.toString(16).padStart(2, '0'); + const bHex = blue.toString(16).padStart(2, '0'); + + for (let i = 0; i < pixels; i++) { + payload += '00'; + payload += gHex; + payload += bHex; + } + + return payload; +} + +function sendFrame() { + const payload = generateFrame(); + const message = Buffer.from(payload, 'utf8'); + socket.send(message, port, host); + tick += 1; +} + +setInterval(sendFrame, intervalMs); + +if (isBroadcast) { + socket.bind(() => { + socket.setBroadcast(true); + }); +} + +console.log(`Streaming green/blue fade to ${host}:${port} with ${pixels} pixels (speed=${speed}Hz)`); + diff --git a/test/pixelstream/rainbow.js b/test/pixelstream/rainbow.js new file mode 100644 index 0000000..f2d42ff --- /dev/null +++ b/test/pixelstream/rainbow.js @@ -0,0 +1,59 @@ +const dgram = require('dgram'); + +const host = process.argv[2]; +const port = parseInt(process.argv[3] || '4210', 10); +const pixels = parseInt(process.argv[4] || '64', 10); +const intervalMs = parseInt(process.argv[5] || '30', 10); + +if (!host) { + console.error('Usage: node rainbow.js [port] [pixels] [interval-ms]'); + process.exit(1); +} + +const socket = dgram.createSocket('udp4'); +let offset = 0; +const isBroadcast = host === '255.255.255.255' || host.endsWith('.255'); + +function wheel(pos) { + pos = 255 - pos; + if (pos < 85) { + return [255 - pos * 3, 0, pos * 3]; + } + if (pos < 170) { + pos -= 85; + return [0, pos * 3, 255 - pos * 3]; + } + pos -= 170; + return [pos * 3, 255 - pos * 3, 0]; +} + +function generateFrame() { + let payload = 'RAW:'; + for (let i = 0; i < pixels; i++) { + const colorIndex = (i * 256 / pixels + offset) & 255; + const [r, g, b] = wheel(colorIndex); + payload += r.toString(16).padStart(2, '0'); + payload += g.toString(16).padStart(2, '0'); + payload += b.toString(16).padStart(2, '0'); + } + + offset = (offset + 1) & 255; + return payload; +} + +function sendFrame() { + const payload = generateFrame(); + const message = Buffer.from(payload, 'utf8'); + socket.send(message, port, host); +} + +setInterval(sendFrame, intervalMs); + +if (isBroadcast) { + socket.bind(() => { + socket.setBroadcast(true); + }); +} + +console.log(`Streaming rainbow pattern to ${host}:${port} with ${pixels} pixels (interval=${intervalMs}ms)`); +