This commit is contained in:
2018-11-15 12:50:59 +01:00
parent 4c27a0fe7b
commit 99d72b06a0
3 changed files with 119 additions and 103 deletions

View File

@@ -1,121 +1,84 @@
#ifndef __MQTT_PLUGIN__ #include "MqttPlugin.h"
#define __MQTT_PLUGIN__
#define _TASK_SLEEP_ON_IDLE_RUN MqttPlugin::MqttPlugin(MqttConfig cfg)
#define _TASK_STD_FUNCTION
#include <WiFiClient.h>
#include <PubSubClient.h>
#include <Plugin.h>
#include "utils_print.h"
using namespace std;
using namespace std::placeholders;
struct MqttConfig
{ {
const char *clientName; mqttConfig = cfg;
const char *brokerHost; }
int brokerPort;
const char *inTopicRoot;
const char *outTopicRoot;
};
class MqttPlugin : public Plugin void MqttPlugin::activate(Scheduler *scheduler)
{ {
public: client = new PubSubClient(mqttConfig.brokerHost, mqttConfig.brokerPort, bind(&MqttPlugin::downstreamHandler, this, _1, _2, _3), wifiClient);
PubSubClient *client; enableConnectTask(scheduler);
WiFiClient wifiClient; enableProcessTask(scheduler);
Task connectTask; PRINT_MSG(Serial, "MQTT", "plugin activated");
Task processTask; }
MqttConfig mqttConfig;
vector<String> topics;
MqttPlugin(MqttConfig cfg) void MqttPlugin::enableConnectTask(Scheduler *scheduler)
{ {
mqttConfig = cfg; connectTask.set(TASK_SECOND * 5, TASK_FOREVER, bind(&MqttPlugin::connect, this));
//subscribe("mqtt/out", bind(&MqttPlugin::mqttPublish, this, _1)); scheduler->addTask(connectTask);
} connectTask.enable();
}
void activate(Scheduler *scheduler) void MqttPlugin::enableProcessTask(Scheduler *scheduler)
{ {
client = new PubSubClient(mqttConfig.brokerHost, mqttConfig.brokerPort, bind(&MqttPlugin::downstreamHandler, this, _1, _2, _3), wifiClient); processTask.set(TASK_MILLISECOND * 5, TASK_FOREVER, bind(&MqttPlugin::process, this));
enableConnectTask(scheduler); scheduler->addTask(processTask);
enableProcessTask(scheduler); processTask.enable();
PRINT_MSG(Serial, "MQTT", "plugin activated"); }
}
private: void MqttPlugin::process()
void enableConnectTask(Scheduler *scheduler) {
{ client->loop();
connectTask.set(TASK_SECOND * 5, TASK_FOREVER, bind(&MqttPlugin::connect, this)); }
scheduler->addTask(connectTask);
connectTask.enable();
}
void enableProcessTask(Scheduler *scheduler) void MqttPlugin::connect()
{
if (!client->connected())
{ {
processTask.set(TASK_MILLISECOND * 5, TASK_FOREVER, bind(&MqttPlugin::process, this)); PRINT_MSG(Serial, "MQTT", "try connect");
scheduler->addTask(processTask); if (client->connect(mqttConfig.clientName))
processTask.enable();
}
void process()
{
client->loop();
}
void connect()
{
if (!client->connected())
{ {
PRINT_MSG(Serial, "MQTT", "try connect"); // bind handlers to all local subscriptions
if (client->connect(mqttConfig.clientName)) for (auto const &localSub : mediator->subscriptions)
{ {
// bind handlers to all local subscriptions // bind all local topics to the MQTT upstream
for (auto const &localSub : mediator->subscriptions) subscribe(localSub.first.c_str(), bind(&MqttPlugin::upstreamHandler, this, localSub.first.c_str(), _1));
{ // subscribe topic on remote queue to dispatch messages to local subscriptions
// bind all local topics to the MQTT upstream client->subscribe(
subscribe(localSub.first.c_str(), bind(&MqttPlugin::upstreamHandler, this, localSub.first.c_str(), _1)); (String(mqttConfig.inTopicRoot) + String(localSub.first.c_str()))
// subscribe topic on remote queue to dispatch messages to local subscriptions .c_str());
client->subscribe(
(String(mqttConfig.inTopicRoot) + String(localSub.first.c_str()))
.c_str());
}
PRINT_MSG(Serial, "MQTT", "connected");
} }
PRINT_MSG(Serial, "MQTT", "connected");
} }
} }
}
void mqttPublish(String msg) void MqttPlugin::mqttPublish(String msg)
{ {
client->publish(mqttConfig.outTopicRoot, msg.c_str()); client->publish(mqttConfig.outTopicRoot, msg.c_str());
} }
void upstreamHandler(String topic, String msg) void MqttPlugin::upstreamHandler(String topic, String msg)
{ {
// publish message on remote queue // publish message on remote queue
PRINT_MSG(Serial, "MQTT", String("upstream: " + msg).c_str()); PRINT_MSG(Serial, "MQTT", String("upstream: " + msg).c_str());
client->publish((String(mqttConfig.outTopicRoot) + topic).c_str(), msg.c_str()); client->publish((String(mqttConfig.outTopicRoot) + topic).c_str(), msg.c_str());
} }
void downstreamHandler(char *topic, uint8_t *payload, unsigned int length) void MqttPlugin::downstreamHandler(char *topic, uint8_t *payload, unsigned int length)
{ {
char *cleanPayload = (char *)malloc(length + 1); char *cleanPayload = (char *)malloc(length + 1);
payload[length] = '\0'; payload[length] = '\0';
memcpy(cleanPayload, payload, length + 1); memcpy(cleanPayload, payload, length + 1);
String msg = String(cleanPayload); String msg = String(cleanPayload);
free(cleanPayload); free(cleanPayload);
// substract the topic root and publish msg on local topic // substract the topic root and publish msg on local topic
int topicRootLength = String(mqttConfig.inTopicRoot).length(); int topicRootLength = String(mqttConfig.inTopicRoot).length();
String localTopic = String(topic).substring(topicRootLength); String localTopic = String(topic).substring(topicRootLength);
publish(localTopic, msg); publish(localTopic, msg);
PRINT_MSG(Serial, "MQTT", (String(topic) + " " + msg).c_str()); PRINT_MSG(Serial, "MQTT", (String(topic) + " " + msg).c_str());
PRINT_MSG(Serial, "MQTT", (String("publish local: ") + localTopic).c_str()); PRINT_MSG(Serial, "MQTT", (String("publish local: ") + localTopic).c_str());
}
}
};
#endif

53
src/MqttPlugin.h Normal file
View File

@@ -0,0 +1,53 @@
#ifndef __MQTT_PLUGIN__
#define __MQTT_PLUGIN__
#define _TASK_SLEEP_ON_IDLE_RUN
#define _TASK_STD_FUNCTION
#include <WiFiClient.h>
#include <PubSubClient.h>
#include <Plugin.h>
#include "utils_print.h"
using namespace std;
using namespace std::placeholders;
struct MqttConfig
{
const char *clientName;
const char *brokerHost;
int brokerPort;
const char *inTopicRoot;
const char *outTopicRoot;
};
class MqttPlugin : public Plugin
{
public:
PubSubClient *client;
WiFiClient wifiClient;
Task connectTask;
Task processTask;
MqttConfig mqttConfig;
vector<String> topics;
MqttPlugin(MqttConfig cfg);
void activate(Scheduler *scheduler);
private:
void enableConnectTask(Scheduler *scheduler);
void enableProcessTask(Scheduler *scheduler);
void process();
void connect();
void mqttPublish(String msg);
void upstreamHandler(String topic, String msg);
void downstreamHandler(char *topic, uint8_t *payload, unsigned int length);
};
#endif

View File

@@ -1,7 +1,7 @@
#include "config.h" #include "config.h"
#include "WiFiNet.h" #include "WiFiNet.h"
#include "Sprocket.h" #include "Sprocket.h"
#include "MqttPlugin.cpp" #include "MqttPlugin.h"
WiFiNet *network; WiFiNet *network;
Sprocket *sprocket; Sprocket *sprocket;