From a195307fc155edbfbf5e7dbe2e9fba08d8c6c1f7 Mon Sep 17 00:00:00 2001 From: Patrick Balsiger Date: Mon, 19 Nov 2018 20:31:34 +0100 Subject: [PATCH] hardcode in/out topics, add chat example --- platformio.ini | 12 +++++++- src/MqttConfig.h | 11 +++---- src/MqttPlugin.cpp | 52 +++++++++++++++++++++----------- src/MqttPlugin.h | 13 ++++---- src/examples/chat/config.h | 38 +++++++++++++++++++++++ src/examples/chat/main.cpp | 62 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 157 insertions(+), 31 deletions(-) create mode 100644 src/examples/chat/config.h create mode 100644 src/examples/chat/main.cpp diff --git a/platformio.ini b/platformio.ini index 85b97a9..9def105 100644 --- a/platformio.ini +++ b/platformio.ini @@ -1,5 +1,5 @@ [platformio] -env_default = basic +env_default = basic, chat [common] framework = arduino @@ -23,5 +23,15 @@ board = ${common.board} upload_speed = ${common.upload_speed} monitor_baud = ${common.monitor_baud} framework = ${common.framework} +lib_deps = ${common.lib_deps} + PubSubClient + +[env:chat] +src_filter = +<*> - + +platform = ${common.platform} +board = ${common.board} +upload_speed = ${common.upload_speed} +monitor_baud = ${common.monitor_baud} +framework = ${common.framework} lib_deps = ${common.lib_deps} PubSubClient \ No newline at end of file diff --git a/src/MqttConfig.h b/src/MqttConfig.h index f079902..37f178a 100644 --- a/src/MqttConfig.h +++ b/src/MqttConfig.h @@ -6,7 +6,7 @@ #define JSON_MQTT_CLIENT_NAME "mqttClientName" #define JSON_MQTT_BROKER_HOST "mqttBrokerHost" #define JSON_MQTT_BROKER_PORT "mqttBrokerPort" -#define JSON_MQTT_IN_TOPIC_ROOT "mqttInTopicRoot" +#define JSON_MQTT_TOPIC_ROOT "mqttTopicRoot" #define JSON_MQTT_OUT_TOPIC_ROOT "mqttOutTopicRoot" struct MqttConfig @@ -14,8 +14,7 @@ struct MqttConfig const char *clientName; const char *brokerHost; int brokerPort; - const char *inTopicRoot; - const char *outTopicRoot; + const char *topicRoot; }; struct MqttConfigJson : public MqttConfig, public JsonStruct @@ -25,8 +24,7 @@ struct MqttConfigJson : public MqttConfig, public JsonStruct root[JSON_MQTT_CLIENT_NAME] = clientName; root[JSON_MQTT_BROKER_HOST] = brokerHost; root[JSON_MQTT_BROKER_PORT] = brokerPort; - root[JSON_MQTT_IN_TOPIC_ROOT] = inTopicRoot; - root[JSON_MQTT_OUT_TOPIC_ROOT] = outTopicRoot; + root[JSON_MQTT_TOPIC_ROOT] = topicRoot; } void fromJsonObject(JsonObject &json) { @@ -39,8 +37,7 @@ struct MqttConfigJson : public MqttConfig, public JsonStruct clientName = getAttr(json, JSON_MQTT_CLIENT_NAME); brokerHost = getAttr(json, JSON_MQTT_BROKER_HOST); brokerPort = getIntAttrFromJson(json, JSON_MQTT_BROKER_PORT); - inTopicRoot = getAttr(json, JSON_MQTT_IN_TOPIC_ROOT); - outTopicRoot = getAttr(json, JSON_MQTT_OUT_TOPIC_ROOT); + topicRoot = getAttr(json, JSON_MQTT_TOPIC_ROOT); valid = 1; }; }; diff --git a/src/MqttPlugin.cpp b/src/MqttPlugin.cpp index 8f91f9a..1aa1a13 100644 --- a/src/MqttPlugin.cpp +++ b/src/MqttPlugin.cpp @@ -1,7 +1,9 @@ #include "MqttPlugin.h" -MqttPlugin::MqttPlugin(MqttConfig cfg) +MqttPlugin::MqttPlugin(MqttConfig cfg, bool bindUp, bool bindDown) { + bindUpstream = bindUp; + bindDownstream = bindDown; applyConfig(cfg); } @@ -9,8 +11,7 @@ void MqttPlugin::applyConfig(MqttConfig cfg) { brokerHost = String(cfg.brokerHost); brokerPort = cfg.brokerPort; clientName = String(cfg.clientName); - inTopicRoot = String(cfg.inTopicRoot); - outTopicRoot = String(cfg.outTopicRoot); + topicRoot = String(cfg.topicRoot); } void MqttPlugin::applyConfigFromFile(const char* fileName) { @@ -57,16 +58,22 @@ void MqttPlugin::connect() { for (auto const &localSub : mediator->subscriptions) { - // bind all local topics to the MQTT upstream once - subscribe(localSub.first.c_str(), bind(&MqttPlugin::upstreamHandler, this, localSub.first.c_str(), _1)); - // subscribe topic on remote queue to dispatch messages to local subscriptions - client->subscribe( - (inTopicRoot + String(localSub.first.c_str())) - .c_str()); + if(bindUpstream){ + // bind all local topics to the MQTT upstream once + subscribe(localSub.first.c_str(), bind(&MqttPlugin::upstreamHandler, this, localSub.first.c_str(), _1)); + } + if(bindDownstream){ + // subscribe topic on remote queue to dispatch messages to local subscriptions + client->subscribe( + (topicRoot + String("/in/") + String(localSub.first.c_str())).c_str()); + } } subscribed = true; } + publish("mqtt/connect", clientName); PRINT_MSG(Serial, "MQTT", "connected"); + } else { + PRINT_MSG(Serial, "MQTT", "connect failed"); } } } @@ -74,10 +81,9 @@ void MqttPlugin::connect() void MqttPlugin::upstreamHandler(String topic, String msg) { // publish message on remote queue - String remoteTopic = outTopicRoot + topic; - Serial.println(remoteTopic); + String remoteTopic = topicRoot + String("/out/") + topic; client->publish(remoteTopic.c_str(), msg.c_str()); - PRINT_MSG(Serial, "MQTT", remoteTopic.c_str()); + PRINT_MSG(Serial, "MQTT", String("pub: "+ remoteTopic).c_str()); } void MqttPlugin::downstreamHandler(char *topic, uint8_t *payload, unsigned int length) @@ -88,10 +94,22 @@ void MqttPlugin::downstreamHandler(char *topic, uint8_t *payload, unsigned int l String msg = String(cleanPayload); free(cleanPayload); - // substract the topic root and publish msg on local topic - int topicRootLength = inTopicRoot.length(); - String localTopic = String(topic).substring(topicRootLength); - publish(localTopic, msg); + // substract the topic root + "/in/" and publish msg on local topic + String topicStr = String(topic); + int topicRootLength = topicRoot.length(); + if(topicStr.length() > topicRootLength){ + String localTopic = topicStr.substring(topicRootLength); + if(localTopic.length() > 4){ + String direction = localTopic.substring(0,4); + if(direction == "/in/" ){ + String localSubTopic = localTopic.substring(direction.length()); + publish(localSubTopic, msg); + PRINT_MSG(Serial, "MQTT", (String("publish /in/: ") + localSubTopic).c_str()); + } else { + publish(localTopic, msg); + PRINT_MSG(Serial, "MQTT", (String("publish mediator: ") + localTopic).c_str()); + } + } + } - PRINT_MSG(Serial, "MQTT", (String("publish local: ") + localTopic).c_str()); } diff --git a/src/MqttPlugin.h b/src/MqttPlugin.h index b5d2ff5..37373d2 100644 --- a/src/MqttPlugin.h +++ b/src/MqttPlugin.h @@ -18,7 +18,7 @@ class MqttPlugin : public Plugin public: PubSubClient *client; - MqttPlugin(MqttConfig cfg); + MqttPlugin(MqttConfig cfg, bool bindUp = true, bool bindDown = true); /** * Connects to queue and triggers connect and process task. @@ -31,11 +31,12 @@ class MqttPlugin : public Plugin Task connectTask; Task processTask; bool subscribed = false; + bool bindUpstream = false; + bool bindDownstream = false; String brokerHost; int brokerPort; String clientName; - String inTopicRoot; - String outTopicRoot; + String topicRoot; void applyConfig(MqttConfig cfg); void applyConfigFromFile(const char* fileName); @@ -46,20 +47,20 @@ class MqttPlugin : public Plugin * to the corresponding topic on the remote queue by prefixing the topic with inRootTopic. * Creates shadow subscriptions of every local topic to propagate messages to the remote queue via upstreamHandler. */ - void connect(); + virtual void connect(); /** * The upstream handler is bound to every local subscription. * It passes the message to the remote queue by prefixing the outRootTopic. */ - void upstreamHandler(String topic, String msg); + virtual void upstreamHandler(String topic, String msg); /** * The downstream handler is bound to the remote counterpart of local subscriptions, * prefixed with inRootTopic. * Everything after the prefix is used as local topic and dispatched to local subscriptions. */ - void downstreamHandler(char *topic, uint8_t *payload, unsigned int length); + virtual void downstreamHandler(char *topic, uint8_t *payload, unsigned int length); }; #endif \ No newline at end of file diff --git a/src/examples/chat/config.h b/src/examples/chat/config.h new file mode 100644 index 0000000..b78a35b --- /dev/null +++ b/src/examples/chat/config.h @@ -0,0 +1,38 @@ +#ifndef __DEVICE_CONFIG__ +#define __DEVICE_CONFIG__ + +// Scheduler config +#define _TASK_SLEEP_ON_IDLE_RUN +#define _TASK_STD_FUNCTION +#define _TASK_PRIORITY + +// Chip config +#define SPROCKET_TYPE "SPROCKET" +#define SERIAL_BAUD_RATE 115200 +#define STARTUP_DELAY 1000 + +// network config +#define SPROCKET_MODE 1 +#define WIFI_CHANNEL 11 +#define MESH_PORT 5555 +#define AP_SSID "sprocket" +#define AP_PASSWORD "th3r31sn0sp00n" +#define MESH_PREFIX "sprocket-mesh" +#define MESH_PASSWORD "th3r31sn0sp00n" +#define STATION_SSID "MyAP" +#define STATION_PASSWORD "th3r31sn0sp00n" +#define HOSTNAME "sprocket" +#define CONNECT_TIMEOUT 10000 +#define MESH_DEBUG_TYPES ERROR | STARTUP | CONNECTION +//#define MESH_DEBUG_TYPES ERROR | MESH_STATUS | CONNECTION | SYNC | COMMUNICATION | GENERAL | MSG_TYPES | REMOTE + +// WebServer +#define WEB_CONTEXT_PATH "/" +#define WEB_DOC_ROOT "/www" +#define WEB_DEFAULT_FILE "index.html" +#define WEB_PORT 80 + +#define MQTT_CONFIG_FILE "/mqttConfig.json" + + +#endif \ No newline at end of file diff --git a/src/examples/chat/main.cpp b/src/examples/chat/main.cpp new file mode 100644 index 0000000..3bd4d1d --- /dev/null +++ b/src/examples/chat/main.cpp @@ -0,0 +1,62 @@ +#include "config.h" +#include "WiFiNet.h" +#include "Sprocket.h" +#include "MqttPlugin.h" + +WiFiNet *network; +Sprocket *sprocket; +MqttPlugin *mqttPlugin; + +Task publishHeap; + +void setup() +{ + + sprocket = new Sprocket({STARTUP_DELAY, SERIAL_BAUD_RATE}); + mqttPlugin = new MqttPlugin({"chatSprocket", + "192.168.1.2", + 1883, + "wirelos/sprocket"}, + true, true); + sprocket->addPlugin(mqttPlugin); + + // this subscription gets shadowed by the MqttPlugin + // to rout messages to the queue on topic wirelos/sprocket/chat/log + sprocket->subscribe("chat/log", [](String msg) { + // gets routed to MQTT broker + PRINT_MSG(Serial, "CHAT", msg.c_str()); + }); + publishHeap.set(TASK_SECOND * 5, TASK_FOREVER, []() { + // locally publish a message + sprocket->publish("chat/log", "hi there"); + }); + sprocket->addTask(publishHeap); + + network = new WiFiNet( + SPROCKET_MODE, + STATION_SSID, + STATION_PASSWORD, + AP_SSID, + AP_PASSWORD, + HOSTNAME, + CONNECT_TIMEOUT); + network->connect(); + + sprocket->activate(); + + sprocket->subscribe("mqtt/connect", [](String msg) { + if (msg.length() > 0) + { + mqttPlugin->client->subscribe("wirelos/sprocket/out/chat/log"); + sprocket->subscribe("/out/chat/log", [](String msg){ + PRINT_MSG(Serial, "CHAT", String("incoming: " +msg).c_str()); + }); + } + }); +} + +void loop() +{ + sprocket->loop(); + yield(); +} \ No newline at end of file