diff --git a/src/MqttPlugin.cpp b/src/MqttPlugin.cpp index a944c4e..fdbc0e2 100644 --- a/src/MqttPlugin.cpp +++ b/src/MqttPlugin.cpp @@ -22,16 +22,11 @@ void MqttPlugin::enableConnectTask(Scheduler *scheduler) void MqttPlugin::enableProcessTask(Scheduler *scheduler) { - processTask.set(TASK_MILLISECOND * 5, TASK_FOREVER, bind(&MqttPlugin::process, this)); + processTask.set(TASK_MILLISECOND * 5, TASK_FOREVER, bind(&PubSubClient::loop, client)); scheduler->addTask(processTask); processTask.enable(); } -void MqttPlugin::process() -{ - client->loop(); -} - void MqttPlugin::connect() { if (!client->connected()) @@ -42,8 +37,11 @@ void MqttPlugin::connect() // bind handlers to all local subscriptions for (auto const &localSub : mediator->subscriptions) { - // bind all local topics to the MQTT upstream - subscribe(localSub.first.c_str(), bind(&MqttPlugin::upstreamHandler, this, localSub.first.c_str(), _1)); + // bind all local topics to the MQTT upstream once + if (!locallySubscribed) + { + subscribe(localSub.first.c_str(), bind(&MqttPlugin::upstreamHandler, this, localSub.first.c_str(), _1)); + } // subscribe topic on remote queue to dispatch messages to local subscriptions client->subscribe( (String(mqttConfig.inTopicRoot) + String(localSub.first.c_str())) @@ -54,11 +52,6 @@ void MqttPlugin::connect() } } -void MqttPlugin::mqttPublish(String msg) -{ - client->publish(mqttConfig.outTopicRoot, msg.c_str()); -} - void MqttPlugin::upstreamHandler(String topic, String msg) { // publish message on remote queue diff --git a/src/MqttPlugin.h b/src/MqttPlugin.h index 2e1422b..59d43cb 100644 --- a/src/MqttPlugin.h +++ b/src/MqttPlugin.h @@ -25,28 +25,43 @@ class MqttPlugin : public Plugin { public: PubSubClient *client; - WiFiClient wifiClient; - Task connectTask; - Task processTask; MqttConfig mqttConfig; - vector topics; MqttPlugin(MqttConfig cfg); + + /** + * Connects to queue and triggers connect and process task. + * Binds downstream handler to MQTT client. + */ void activate(Scheduler *scheduler); private: - void enableConnectTask(Scheduler *scheduler); + WiFiClient wifiClient; + Task connectTask; + Task processTask; + bool locallySubscribed = false; + void enableConnectTask(Scheduler *scheduler); void enableProcessTask(Scheduler *scheduler); - void process(); - + /** + * Connects to MQTT server and subscribes all topics available in the mediator + * to the corresponding topic on the remote queue by prefixing the topic with inRootTopic. + * Creates shadow subscriptions of every local topic to propagate messages to the remote queue via upstreamHandler. + */ void connect(); - void mqttPublish(String msg); - + /** + * The upstream handler is bound to every local subscription. + * It passes the message to the remote queue by prefixing the outRootTopic. + */ void upstreamHandler(String topic, String msg); + /** + * The downstream handler is bound to the remote counterpart of local subscriptions, + * prefixed with inRootTopic. + * Everything after the prefix is used as local topic and dispatched to local subscriptions. + */ void downstreamHandler(char *topic, uint8_t *payload, unsigned int length); }; diff --git a/src/examples/basic/main.cpp b/src/examples/basic/main.cpp index 13609f4..50f0088 100644 --- a/src/examples/basic/main.cpp +++ b/src/examples/basic/main.cpp @@ -17,7 +17,7 @@ void setup() sprocket = new Sprocket( {STARTUP_DELAY, SERIAL_BAUD_RATE}); sprocket->subscribe("local/topic", [](String msg){ - Serial.println("Local: " + msg); + PRINT_MSG(Serial, "LOCAL", msg.c_str()); }); sprocket->addTask(publishHeap); sprocket->addPlugin(