From 99d72b06a072107acdfe05285712c73fd5cb88b4 Mon Sep 17 00:00:00 2001 From: Patrick Balsiger Date: Thu, 15 Nov 2018 12:50:59 +0100 Subject: [PATCH] header --- src/MqttPlugin.cpp | 167 ++++++++++++++---------------------- src/MqttPlugin.h | 53 ++++++++++++ src/examples/basic/main.cpp | 2 +- 3 files changed, 119 insertions(+), 103 deletions(-) create mode 100644 src/MqttPlugin.h diff --git a/src/MqttPlugin.cpp b/src/MqttPlugin.cpp index e81581f..a944c4e 100644 --- a/src/MqttPlugin.cpp +++ b/src/MqttPlugin.cpp @@ -1,121 +1,84 @@ -#ifndef __MQTT_PLUGIN__ -#define __MQTT_PLUGIN__ +#include "MqttPlugin.h" -#define _TASK_SLEEP_ON_IDLE_RUN -#define _TASK_STD_FUNCTION - -#include -#include -#include -#include "utils_print.h" - -using namespace std; -using namespace std::placeholders; - -struct MqttConfig +MqttPlugin::MqttPlugin(MqttConfig cfg) { - const char *clientName; - const char *brokerHost; - int brokerPort; - const char *inTopicRoot; - const char *outTopicRoot; -}; + mqttConfig = cfg; +} -class MqttPlugin : public Plugin +void MqttPlugin::activate(Scheduler *scheduler) { - public: - PubSubClient *client; - WiFiClient wifiClient; - Task connectTask; - Task processTask; - MqttConfig mqttConfig; - vector topics; + client = new PubSubClient(mqttConfig.brokerHost, mqttConfig.brokerPort, bind(&MqttPlugin::downstreamHandler, this, _1, _2, _3), wifiClient); + enableConnectTask(scheduler); + enableProcessTask(scheduler); + PRINT_MSG(Serial, "MQTT", "plugin activated"); +} - MqttPlugin(MqttConfig cfg) - { - mqttConfig = cfg; - //subscribe("mqtt/out", bind(&MqttPlugin::mqttPublish, this, _1)); - } +void MqttPlugin::enableConnectTask(Scheduler *scheduler) +{ + connectTask.set(TASK_SECOND * 5, TASK_FOREVER, bind(&MqttPlugin::connect, this)); + scheduler->addTask(connectTask); + connectTask.enable(); +} - void activate(Scheduler *scheduler) - { - client = new PubSubClient(mqttConfig.brokerHost, mqttConfig.brokerPort, bind(&MqttPlugin::downstreamHandler, this, _1, _2, _3), wifiClient); - enableConnectTask(scheduler); - enableProcessTask(scheduler); - PRINT_MSG(Serial, "MQTT", "plugin activated"); - } +void MqttPlugin::enableProcessTask(Scheduler *scheduler) +{ + processTask.set(TASK_MILLISECOND * 5, TASK_FOREVER, bind(&MqttPlugin::process, this)); + scheduler->addTask(processTask); + processTask.enable(); +} - private: - void enableConnectTask(Scheduler *scheduler) - { - connectTask.set(TASK_SECOND * 5, TASK_FOREVER, bind(&MqttPlugin::connect, this)); - scheduler->addTask(connectTask); - connectTask.enable(); - } +void MqttPlugin::process() +{ + client->loop(); +} - void enableProcessTask(Scheduler *scheduler) +void MqttPlugin::connect() +{ + if (!client->connected()) { - processTask.set(TASK_MILLISECOND * 5, TASK_FOREVER, bind(&MqttPlugin::process, this)); - scheduler->addTask(processTask); - processTask.enable(); - } - - void process() - { - client->loop(); - } - - void connect() - { - if (!client->connected()) + PRINT_MSG(Serial, "MQTT", "try connect"); + if (client->connect(mqttConfig.clientName)) { - PRINT_MSG(Serial, "MQTT", "try connect"); - if (client->connect(mqttConfig.clientName)) + // bind handlers to all local subscriptions + for (auto const &localSub : mediator->subscriptions) { - // bind handlers to all local subscriptions - for (auto const &localSub : mediator->subscriptions) - { - // bind all local topics to the MQTT upstream - subscribe(localSub.first.c_str(), bind(&MqttPlugin::upstreamHandler, this, localSub.first.c_str(), _1)); - // subscribe topic on remote queue to dispatch messages to local subscriptions - client->subscribe( - (String(mqttConfig.inTopicRoot) + String(localSub.first.c_str())) - .c_str()); - } - PRINT_MSG(Serial, "MQTT", "connected"); + // bind all local topics to the MQTT upstream + subscribe(localSub.first.c_str(), bind(&MqttPlugin::upstreamHandler, this, localSub.first.c_str(), _1)); + // subscribe topic on remote queue to dispatch messages to local subscriptions + client->subscribe( + (String(mqttConfig.inTopicRoot) + String(localSub.first.c_str())) + .c_str()); } + PRINT_MSG(Serial, "MQTT", "connected"); } } +} - void mqttPublish(String msg) - { - client->publish(mqttConfig.outTopicRoot, msg.c_str()); - } +void MqttPlugin::mqttPublish(String msg) +{ + client->publish(mqttConfig.outTopicRoot, msg.c_str()); +} - void upstreamHandler(String topic, String msg) - { - // publish message on remote queue - PRINT_MSG(Serial, "MQTT", String("upstream: " + msg).c_str()); - client->publish((String(mqttConfig.outTopicRoot) + topic).c_str(), msg.c_str()); - } +void MqttPlugin::upstreamHandler(String topic, String msg) +{ + // publish message on remote queue + PRINT_MSG(Serial, "MQTT", String("upstream: " + msg).c_str()); + client->publish((String(mqttConfig.outTopicRoot) + topic).c_str(), msg.c_str()); +} - void downstreamHandler(char *topic, uint8_t *payload, unsigned int length) - { - char *cleanPayload = (char *)malloc(length + 1); - payload[length] = '\0'; - memcpy(cleanPayload, payload, length + 1); - String msg = String(cleanPayload); - free(cleanPayload); +void MqttPlugin::downstreamHandler(char *topic, uint8_t *payload, unsigned int length) +{ + char *cleanPayload = (char *)malloc(length + 1); + payload[length] = '\0'; + memcpy(cleanPayload, payload, length + 1); + String msg = String(cleanPayload); + free(cleanPayload); - // substract the topic root and publish msg on local topic - int topicRootLength = String(mqttConfig.inTopicRoot).length(); - String localTopic = String(topic).substring(topicRootLength); - publish(localTopic, msg); + // substract the topic root and publish msg on local topic + int topicRootLength = String(mqttConfig.inTopicRoot).length(); + String localTopic = String(topic).substring(topicRootLength); + publish(localTopic, msg); - PRINT_MSG(Serial, "MQTT", (String(topic) + " " + msg).c_str()); - PRINT_MSG(Serial, "MQTT", (String("publish local: ") + localTopic).c_str()); - - } -}; - -#endif \ No newline at end of file + PRINT_MSG(Serial, "MQTT", (String(topic) + " " + msg).c_str()); + PRINT_MSG(Serial, "MQTT", (String("publish local: ") + localTopic).c_str()); +} diff --git a/src/MqttPlugin.h b/src/MqttPlugin.h new file mode 100644 index 0000000..2e1422b --- /dev/null +++ b/src/MqttPlugin.h @@ -0,0 +1,53 @@ +#ifndef __MQTT_PLUGIN__ +#define __MQTT_PLUGIN__ + +#define _TASK_SLEEP_ON_IDLE_RUN +#define _TASK_STD_FUNCTION + +#include +#include +#include +#include "utils_print.h" + +using namespace std; +using namespace std::placeholders; + +struct MqttConfig +{ + const char *clientName; + const char *brokerHost; + int brokerPort; + const char *inTopicRoot; + const char *outTopicRoot; +}; + +class MqttPlugin : public Plugin +{ + public: + PubSubClient *client; + WiFiClient wifiClient; + Task connectTask; + Task processTask; + MqttConfig mqttConfig; + vector topics; + + MqttPlugin(MqttConfig cfg); + void activate(Scheduler *scheduler); + + private: + void enableConnectTask(Scheduler *scheduler); + + void enableProcessTask(Scheduler *scheduler); + + void process(); + + void connect(); + + void mqttPublish(String msg); + + void upstreamHandler(String topic, String msg); + + void downstreamHandler(char *topic, uint8_t *payload, unsigned int length); +}; + +#endif \ No newline at end of file diff --git a/src/examples/basic/main.cpp b/src/examples/basic/main.cpp index 6c28d74..13609f4 100644 --- a/src/examples/basic/main.cpp +++ b/src/examples/basic/main.cpp @@ -1,7 +1,7 @@ #include "config.h" #include "WiFiNet.h" #include "Sprocket.h" -#include "MqttPlugin.cpp" +#include "MqttPlugin.h" WiFiNet *network; Sprocket *sprocket;