feat: udp stream

This commit is contained in:
2025-10-01 21:47:27 +02:00
parent 99e7ed4809
commit f3d99b174f
13 changed files with 517 additions and 0 deletions

View File

@@ -152,6 +152,11 @@ void ApiServer::setupWebSocket() {
// Subscribe to all local events and forward to websocket clients
ctx.onAny([this](const std::string& event, void* dataPtr) {
// Ignore raw UDP frames
if (event == "udp/raw") {
return;
}
String* payloadStrPtr = static_cast<String*>(dataPtr);
String payloadStr = payloadStrPtr ? *payloadStrPtr : String("");

View File

@@ -68,6 +68,7 @@ void ClusterManager::listen() {
void ClusterManager::initMessageHandlers() {
messageHandlers.clear();
messageHandlers.push_back({ &ClusterManager::isRawMsg, [this](const char* msg){ this->onRawMessage(msg); }, "RAW" });
messageHandlers.push_back({ &ClusterManager::isDiscoveryMsg, [this](const char* msg){ this->onDiscovery(msg); }, "DISCOVERY" });
messageHandlers.push_back({ &ClusterManager::isHeartbeatMsg, [this](const char* msg){ this->onHeartbeat(msg); }, "HEARTBEAT" });
messageHandlers.push_back({ &ClusterManager::isResponseMsg, [this](const char* msg){ this->onResponse(msg); }, "RESPONSE" });
@@ -113,6 +114,15 @@ bool ClusterManager::isClusterEventMsg(const char* msg) {
return strncmp(msg, ClusterProtocol::CLUSTER_EVENT_MSG, strlen(ClusterProtocol::CLUSTER_EVENT_MSG)) == 0;
}
bool ClusterManager::isRawMsg(const char* msg) {
// RAW frames must be "RAW:<payload>"; enforce the delimiter so we skip things like "RAW_HEARTBEAT".
const std::size_t prefixLen = strlen(ClusterProtocol::RAW_MSG);
if (strncmp(msg, ClusterProtocol::RAW_MSG, prefixLen) != 0) {
return false;
}
return msg[prefixLen] == ':';
}
void ClusterManager::onDiscovery(const char* /*msg*/) {
ctx.udp->beginPacket(ctx.udp->remoteIP(), ctx.config.udp_port);
String response = String(ClusterProtocol::RESPONSE_MSG) + ":" + ctx.hostname;
@@ -240,6 +250,23 @@ void ClusterManager::onClusterEvent(const char* msg) {
ctx.fire(eventKey, &data);
}
void ClusterManager::onRawMessage(const char* msg) {
const std::size_t prefixLen = strlen(ClusterProtocol::RAW_MSG);
if (msg[prefixLen] != ':') {
LOG_WARN("Cluster", "RAW message received without payload delimiter");
return;
}
const char* payloadStart = msg + prefixLen + 1;
if (*payloadStart == '\0') {
LOG_WARN("Cluster", "RAW message received with empty payload");
return;
}
String payload(payloadStart);
ctx.fire("udp/raw", &payload);
}
void ClusterManager::addOrUpdateNode(const String& nodeHost, IPAddress nodeIP) {
auto& memberList = *ctx.memberList;