basic mqtt plugin

This commit is contained in:
2018-11-15 11:36:19 +01:00
commit accd942c34
10 changed files with 489 additions and 0 deletions

150
src/MqttPlugin.cpp Normal file
View File

@@ -0,0 +1,150 @@
#ifndef __MQTT_PLUGIN__
#define __MQTT_PLUGIN__
#define _TASK_SLEEP_ON_IDLE_RUN
#define _TASK_STD_FUNCTION
#include <WiFiClient.h>
#include <PubSubClient.h>
#include <Plugin.h>
using namespace std;
using namespace std::placeholders;
struct MqttConfig
{
const char *clientName;
const char *brokerHost;
int brokerPort;
const char *inTopicRoot;
const char *outTopicRoot;
};
class MqttPlugin : public Plugin
{
public:
PubSubClient *client;
WiFiClient wifiClient;
Task connectTask;
Task processTask;
MqttConfig mqttConfig;
vector<String> topics;
MqttPlugin(MqttConfig cfg)
{
mqttConfig = cfg;
//subscribe("mqtt/out", bind(&MqttPlugin::mqttPublish, this, _1));
}
void activate(Scheduler *scheduler)
{
client = new PubSubClient(mqttConfig.brokerHost, mqttConfig.brokerPort, bind(&MqttPlugin::downstreamHandler, this, _1, _2, _3), wifiClient);
enableConnectTask(scheduler);
enableProcessTask(scheduler);
Serial.println("MQTT plugin activated"); // FIXME use PRINT_MSG
}
private:
void enableConnectTask(Scheduler *scheduler)
{
connectTask.set(TASK_SECOND * 5, TASK_FOREVER, bind(&MqttPlugin::connect, this));
scheduler->addTask(connectTask);
connectTask.enable();
}
void enableProcessTask(Scheduler *scheduler)
{
processTask.set(TASK_MILLISECOND * 5, TASK_FOREVER, bind(&MqttPlugin::process, this));
scheduler->addTask(processTask);
processTask.enable();
}
void process()
{
client->loop();
}
void connect()
{
if (!client->connected())
{
Serial.println("MQTT try connect");
if (client->connect(mqttConfig.clientName))
{
Serial.println("MQTT connected");
upstreamHandler("lobby", "join");
// overkill to subscribe to all...?
for (auto const &localSub : mediator->subscriptions)
{
subscribe(localSub.first.c_str(), bind(&MqttPlugin::upstreamHandler, this, localSub.first.c_str(), _1));
client->subscribe(
(String(mqttConfig.inTopicRoot) + String(localSub.first.c_str()))
.c_str());
}
}
}
else
{
publish("local/topic", "Free heap: " + String(ESP.getFreeHeap()));
}
}
void mqttPublish(String msg)
{
client->publish(mqttConfig.outTopicRoot, msg.c_str());
}
void upstreamHandler(String topic, String msg)
{
Serial.println(msg);
// publish message on remote queue
// TODO rootTopic + topic?
client->publish((String(mqttConfig.outTopicRoot) + topic).c_str(), msg.c_str());
}
void downstreamHandler(char *topic, uint8_t *payload, unsigned int length)
{
char *cleanPayload = (char *)malloc(length + 1);
payload[length] = '\0';
memcpy(cleanPayload, payload, length + 1);
String msg = String(cleanPayload);
free(cleanPayload);
Serial.println("MQTT topic " + String(topic));
Serial.println("MQTT msg " + msg);
int topicRootLength = String(mqttConfig.inTopicRoot).length();
String localTopic = String(topic).substring(topicRootLength);
Serial.println("Local topic:" + localTopic);
// publish message to local subscribers on device
publish(localTopic, msg);
}
/* void mqttCallback(char* topic, uint8_t* payload, unsigned int length) {
char* cleanPayload = (char*)malloc(length+1);
payload[length] = '\0';
memcpy(cleanPayload, payload, length+1);
String msg = String(cleanPayload);
free(cleanPayload);
int topicRootLength = String(mqttConfig.topicRoot).length();
String targetStr = String(topic).substring(topicRootLength + 4);
if(targetStr == "gateway"){
if(msg == "getNodes") {
client->publish(MQTT_TOPIC_FROM_GATEWAY, net->mesh.subConnectionJson().c_str());
}
} else if(targetStr == "broadcast") {
net->mesh.sendBroadcast(msg);
} else {
uint32_t target = strtoul(targetStr.c_str(), NULL, 10);
if(net->mesh.isConnected(target)){
net->mesh.sendSingle(target, msg);
} else {
client->publish(MQTT_TOPIC_FROM_GATEWAY, "Client not connected!");
}
}
} */
};
#endif