#include "MqttPlugin.h"

/**
 * Create the plugin and apply the initial configuration.
 *
 * @param cfg      broker host/port, client name and topic root to start with
 * @param bindUp   when true, local topics are forwarded to the broker
 * @param bindDown when true, "<topicRoot>/in/<topic>" is subscribed on the
 *                 broker for every local topic
 */
MqttPlugin::MqttPlugin(MqttConfig cfg, bool bindUp, bool bindDown)
{
    bindUpstream = bindUp;
    bindDownstream = bindDown;
    applyConfig(cfg);
}

/**
 * Copy the connection settings from cfg into the plugin's members.
 *
 * @param cfg configuration to take over
 */
void MqttPlugin::applyConfig(MqttConfig cfg)
{
    brokerHost = String(cfg.brokerHost);
    brokerPort = cfg.brokerPort;
    clientName = String(cfg.clientName);
    topicRoot = String(cfg.topicRoot);
}

/**
 * Load a JSON configuration file and apply it when it is flagged valid.
 * An invalid file leaves the current configuration untouched.
 *
 * @param fileName path of the configuration file
 */
void MqttPlugin::applyConfigFromFile(const char *fileName)
{
    MqttConfigJson configFile;
    configFile.fromFile(fileName);
    if (!configFile.valid)
    {
        return;
    }
    PRINT_MSG(Serial, "MQTT", "apply config from file");
    applyConfig(configFile);
}

/**
 * Start the plugin: reload the configuration from "/mqttConfig.json",
 * create the MQTT client and schedule the connect and process tasks.
 *
 * @param scheduler scheduler the periodic tasks are registered with
 */
void MqttPlugin::activate(Scheduler *scheduler)
{
    applyConfigFromFile("/mqttConfig.json");
    // NOTE(review): a fresh client is allocated on every call and the previous
    // pointer is not released here -- presumably activate() runs once; confirm.
    client = new PubSubClient(brokerHost.c_str(), brokerPort,
                              bind(&MqttPlugin::downstreamHandler, this, _1, _2, _3),
                              wifiClient);
    enableConnectTask(scheduler);
    enableProcessTask(scheduler);
    PRINT_MSG(Serial, "MQTT", "plugin activated");
}

/**
 * Register and enable the task that calls connect() every 5 seconds.
 *
 * @param scheduler scheduler the task is added to
 */
void MqttPlugin::enableConnectTask(Scheduler *scheduler)
{
    connectTask.set(TASK_SECOND * 5, TASK_FOREVER, bind(&MqttPlugin::connect, this));
    scheduler->addTask(connectTask);
    connectTask.enable();
}

/**
 * Register and enable the task that calls the client's loop() every 5 ms.
 * The current value of `client` is captured, so the client must already
 * have been created when this is called.
 *
 * @param scheduler scheduler the task is added to
 */
void MqttPlugin::enableProcessTask(Scheduler *scheduler)
{
    processTask.set(TASK_MILLISECOND * 5, TASK_FOREVER, bind(&PubSubClient::loop, client));
    scheduler->addTask(processTask);
    processTask.enable();
}

/**
 * (Re)connect to the broker if the client is not connected.
 *
 * On every successful connect the "<topicRoot>/in/<topic>" subscriptions are
 * issued again, because the broker may have discarded them together with the
 * previous session. The local upstream handlers are bound only once.
 */
void MqttPlugin::connect()
{
    if (client->connected())
    {
        return;
    }

    PRINT_MSG(Serial, "MQTT", "try connect");
    if (!client->connect(clientName.c_str()))
    {
        PRINT_MSG(Serial, "MQTT", "connect failed");
        return;
    }

    for (auto const &localSub : mediator->subscriptions)
    {
        // Owned copy: the bound handler must not depend on the lifetime of
        // the map key's character buffer.
        String localTopic = String(localSub.first.c_str());

        if (bindUpstream && !subscribed)
        {
            // bind every local topic to the MQTT upstream exactly once
            subscribe(localSub.first.c_str(),
                      bind(&MqttPlugin::upstreamHandler, this, localTopic, _1));
        }
        if (bindDownstream)
        {
            // subscribe the topic on the remote queue so incoming messages can
            // be dispatched to local subscriptions; repeated after reconnects
            client->subscribe((topicRoot + String("/in/") + localTopic).c_str());
        }
    }
    subscribed = true;

    publish("mqtt/connect", clientName);
    PRINT_MSG(Serial, "MQTT", "connected");
}

/**
 * Forward a locally published message to the broker on
 * "<topicRoot>/out/<topic>".
 *
 * @param topic local topic the message was published on
 * @param msg   message payload
 */
void MqttPlugin::upstreamHandler(String topic, String msg)
{
    String remoteTopic = topicRoot + String("/out/") + topic;
    client->publish(remoteTopic.c_str(), msg.c_str());
    PRINT_MSG(Serial, "MQTT", (String("pub: ") + remoteTopic).c_str());
}

/**
 * Callback for messages received from the broker; republishes them locally.
 *
 * Topic mapping:
 *  - "<topicRoot>/in/<t>"  -> published locally on "<t>"
 *  - "<topicRoot><rest>"   -> published locally on "<rest>"
 *  - anything else         -> published locally on the unchanged topic
 *
 * @param topic   NUL-terminated remote topic
 * @param payload message bytes, NOT NUL-terminated
 * @param length  number of valid bytes in payload
 */
void MqttPlugin::downstreamHandler(char *topic, uint8_t *payload, unsigned int length)
{
    // Build a NUL-terminated private copy. The terminator goes into our own
    // buffer: payload[length] lies outside the received data and must not be
    // written.
    char *cleanPayload = static_cast<char *>(malloc(length + 1));
    if (cleanPayload == nullptr)
    {
        PRINT_MSG(Serial, "MQTT", "payload dropped: out of memory");
        return;
    }
    memcpy(cleanPayload, payload, length);
    cleanPayload[length] = '\0';
    String msg = String(cleanPayload);
    free(cleanPayload);

    String topicStr = String(topic);
    unsigned int topicRootLength = topicRoot.length();

    // Topics that do not start with the topic root (or consist of nothing
    // but the root) are passed through unchanged.
    if (topicStr.length() <= topicRootLength || !topicStr.startsWith(topicRoot))
    {
        publish(topicStr, msg);
        PRINT_MSG(Serial, "MQTT", (String("publish default: ") + topicStr).c_str());
        return;
    }

    // strip the topic root
    String localTopic = topicStr.substring(topicRootLength);
    String direction = String("/in/");
    if (localTopic.startsWith(direction))
    {
        // strip "/in/" as well and publish on the bare local topic
        String localSubTopic = localTopic.substring(direction.length());
        publish(localSubTopic, msg);
        PRINT_MSG(Serial, "MQTT", (String("publish /in/: ") + localSubTopic).c_str());
    }
    else
    {
        publish(localTopic, msg);
        PRINT_MSG(Serial, "MQTT", (String("publish mediator: ") + localTopic).c_str());
    }
}