documentation

This commit is contained in:
2018-11-15 13:12:02 +01:00
parent 99d72b06a0
commit 4f83ad4e0b
3 changed files with 31 additions and 23 deletions

View File

@@ -22,16 +22,11 @@ void MqttPlugin::enableConnectTask(Scheduler *scheduler)
void MqttPlugin::enableProcessTask(Scheduler *scheduler) void MqttPlugin::enableProcessTask(Scheduler *scheduler)
{ {
processTask.set(TASK_MILLISECOND * 5, TASK_FOREVER, bind(&MqttPlugin::process, this)); processTask.set(TASK_MILLISECOND * 5, TASK_FOREVER, bind(&PubSubClient::loop, client));
scheduler->addTask(processTask); scheduler->addTask(processTask);
processTask.enable(); processTask.enable();
} }
void MqttPlugin::process()
{
client->loop();
}
void MqttPlugin::connect() void MqttPlugin::connect()
{ {
if (!client->connected()) if (!client->connected())
@@ -42,8 +37,11 @@ void MqttPlugin::connect()
// bind handlers to all local subscriptions // bind handlers to all local subscriptions
for (auto const &localSub : mediator->subscriptions) for (auto const &localSub : mediator->subscriptions)
{ {
// bind all local topics to the MQTT upstream // bind all local topics to the MQTT upstream once
subscribe(localSub.first.c_str(), bind(&MqttPlugin::upstreamHandler, this, localSub.first.c_str(), _1)); if (!locallySubscribed)
{
subscribe(localSub.first.c_str(), bind(&MqttPlugin::upstreamHandler, this, localSub.first.c_str(), _1));
}
// subscribe topic on remote queue to dispatch messages to local subscriptions // subscribe topic on remote queue to dispatch messages to local subscriptions
client->subscribe( client->subscribe(
(String(mqttConfig.inTopicRoot) + String(localSub.first.c_str())) (String(mqttConfig.inTopicRoot) + String(localSub.first.c_str()))
@@ -54,11 +52,6 @@ void MqttPlugin::connect()
} }
} }
void MqttPlugin::mqttPublish(String msg)
{
client->publish(mqttConfig.outTopicRoot, msg.c_str());
}
void MqttPlugin::upstreamHandler(String topic, String msg) void MqttPlugin::upstreamHandler(String topic, String msg)
{ {
// publish message on remote queue // publish message on remote queue

View File

@@ -25,28 +25,43 @@ class MqttPlugin : public Plugin
{ {
public: public:
PubSubClient *client; PubSubClient *client;
WiFiClient wifiClient;
Task connectTask;
Task processTask;
MqttConfig mqttConfig; MqttConfig mqttConfig;
vector<String> topics;
MqttPlugin(MqttConfig cfg); MqttPlugin(MqttConfig cfg);
/**
* Connects to queue and triggers connect and process task.
* Binds downstream handler to MQTT client.
*/
void activate(Scheduler *scheduler); void activate(Scheduler *scheduler);
private: private:
void enableConnectTask(Scheduler *scheduler); WiFiClient wifiClient;
Task connectTask;
Task processTask;
bool locallySubscribed = false;
void enableConnectTask(Scheduler *scheduler);
void enableProcessTask(Scheduler *scheduler); void enableProcessTask(Scheduler *scheduler);
void process(); /**
* Connects to MQTT server and subscribes all topics available in the mediator
* to the corresponding topic on the remote queue by prefixing the topic with inRootTopic.
* Creates shadow subscriptions of every local topic to propagate messages to the remote queue via upstreamHandler.
*/
void connect(); void connect();
void mqttPublish(String msg); /**
* The upstream handler is bound to every local subscription.
* It passes the message to the remote queue by prefixing the outRootTopic.
*/
void upstreamHandler(String topic, String msg); void upstreamHandler(String topic, String msg);
/**
* The downstream handler is bound to the remote counterpart of local subscriptions,
* prefixed with inRootTopic.
* Everything after the prefix is used as local topic and dispatched to local subscriptions.
*/
void downstreamHandler(char *topic, uint8_t *payload, unsigned int length); void downstreamHandler(char *topic, uint8_t *payload, unsigned int length);
}; };

View File

@@ -17,7 +17,7 @@ void setup()
sprocket = new Sprocket( sprocket = new Sprocket(
{STARTUP_DELAY, SERIAL_BAUD_RATE}); {STARTUP_DELAY, SERIAL_BAUD_RATE});
sprocket->subscribe("local/topic", [](String msg){ sprocket->subscribe("local/topic", [](String msg){
Serial.println("Local: " + msg); PRINT_MSG(Serial, "LOCAL", msg.c_str());
}); });
sprocket->addTask(publishHeap); sprocket->addTask(publishHeap);
sprocket->addPlugin( sprocket->addPlugin(