From 4c27a0fe7becfb504d19fa06b8793e297a709cc3 Mon Sep 17 00:00:00 2001 From: Patrick Balsiger Date: Thu, 15 Nov 2018 12:32:02 +0100 Subject: [PATCH] use print fns, example publish task --- src/MqttPlugin.cpp | 55 +++++++++---------------------------- src/examples/basic/main.cpp | 9 +++++- 2 files changed, 21 insertions(+), 43 deletions(-) diff --git a/src/MqttPlugin.cpp b/src/MqttPlugin.cpp index 09fc177..e81581f 100644 --- a/src/MqttPlugin.cpp +++ b/src/MqttPlugin.cpp @@ -7,6 +7,7 @@ #include #include #include +#include "utils_print.h" using namespace std; using namespace std::placeholders; @@ -41,7 +42,7 @@ class MqttPlugin : public Plugin client = new PubSubClient(mqttConfig.brokerHost, mqttConfig.brokerPort, bind(&MqttPlugin::downstreamHandler, this, _1, _2, _3), wifiClient); enableConnectTask(scheduler); enableProcessTask(scheduler); - Serial.println("MQTT plugin activated"); // FIXME use PRINT_MSG + PRINT_MSG(Serial, "MQTT", "plugin activated"); } private: @@ -68,25 +69,22 @@ class MqttPlugin : public Plugin { if (!client->connected()) { - Serial.println("MQTT try connect"); + PRINT_MSG(Serial, "MQTT", "try connect"); if (client->connect(mqttConfig.clientName)) { - Serial.println("MQTT connected"); - upstreamHandler("lobby", "join"); - // overkill to subscribe to all...? + // bind handlers to all local subscriptions for (auto const &localSub : mediator->subscriptions) { + // bind all local topics to the MQTT upstream subscribe(localSub.first.c_str(), bind(&MqttPlugin::upstreamHandler, this, localSub.first.c_str(), _1)); + // subscribe topic on remote queue to dispatch messages to local subscriptions client->subscribe( (String(mqttConfig.inTopicRoot) + String(localSub.first.c_str())) .c_str()); } + PRINT_MSG(Serial, "MQTT", "connected"); } } - else - { - publish("local/topic", "Free heap: " + String(ESP.getFreeHeap())); - } } void mqttPublish(String msg) @@ -96,9 +94,8 @@ class MqttPlugin : public Plugin void upstreamHandler(String topic, String msg) { - Serial.println(msg); // publish message on remote queue - // TODO rootTopic + topic? + PRINT_MSG(Serial, "MQTT", String("upstream: " + msg).c_str()); client->publish((String(mqttConfig.outTopicRoot) + topic).c_str(), msg.c_str()); } @@ -110,41 +107,15 @@ class MqttPlugin : public Plugin String msg = String(cleanPayload); free(cleanPayload); - Serial.println("MQTT topic " + String(topic)); - Serial.println("MQTT msg " + msg); - + // substract the topic root and publish msg on local topic int topicRootLength = String(mqttConfig.inTopicRoot).length(); String localTopic = String(topic).substring(topicRootLength); - Serial.println("Local topic:" + localTopic); - - // publish message to local subscribers on device publish(localTopic, msg); + + PRINT_MSG(Serial, "MQTT", (String(topic) + " " + msg).c_str()); + PRINT_MSG(Serial, "MQTT", (String("publish local: ") + localTopic).c_str()); + } - /* void mqttCallback(char* topic, uint8_t* payload, unsigned int length) { - char* cleanPayload = (char*)malloc(length+1); - payload[length] = '\0'; - memcpy(cleanPayload, payload, length+1); - String msg = String(cleanPayload); - free(cleanPayload); - - int topicRootLength = String(mqttConfig.topicRoot).length(); - String targetStr = String(topic).substring(topicRootLength + 4); - - if(targetStr == "gateway"){ - if(msg == "getNodes") { - client->publish(MQTT_TOPIC_FROM_GATEWAY, net->mesh.subConnectionJson().c_str()); - } - } else if(targetStr == "broadcast") { - net->mesh.sendBroadcast(msg); - } else { - uint32_t target = strtoul(targetStr.c_str(), NULL, 10); - if(net->mesh.isConnected(target)){ - net->mesh.sendSingle(target, msg); - } else { - client->publish(MQTT_TOPIC_FROM_GATEWAY, "Client not connected!"); - } - } - } */ }; #endif \ No newline at end of file diff --git a/src/examples/basic/main.cpp b/src/examples/basic/main.cpp index e71c210..6c28d74 100644 --- a/src/examples/basic/main.cpp +++ b/src/examples/basic/main.cpp @@ -6,13 +6,20 @@ WiFiNet *network; Sprocket *sprocket; +Task publishHeap; + void setup() { + publishHeap.set(TASK_SECOND * 5, TASK_FOREVER, [](){ + sprocket->publish("local/topic", "heap=" + String(ESP.getFreeHeap())); + }); + sprocket = new Sprocket( {STARTUP_DELAY, SERIAL_BAUD_RATE}); sprocket->subscribe("local/topic", [](String msg){ - Serial.println("Received: " + msg); + Serial.println("Local: " + msg); }); + sprocket->addTask(publishHeap); sprocket->addPlugin( new MqttPlugin({"sprocket", "192.168.1.2", 1883, "wirelos/mqttSprocket-in/", "wirelos/mqttSprocket-out/"}));