hardcode in/out topics, add chat example

This commit is contained in:
2018-11-19 20:31:34 +01:00
parent e1d605c615
commit a195307fc1
6 changed files with 157 additions and 31 deletions

View File

@@ -1,7 +1,9 @@
#include "MqttPlugin.h"
MqttPlugin::MqttPlugin(MqttConfig cfg)
MqttPlugin::MqttPlugin(MqttConfig cfg, bool bindUp, bool bindDown)
{
bindUpstream = bindUp;
bindDownstream = bindDown;
applyConfig(cfg);
}
@@ -9,8 +11,7 @@ void MqttPlugin::applyConfig(MqttConfig cfg) {
brokerHost = String(cfg.brokerHost);
brokerPort = cfg.brokerPort;
clientName = String(cfg.clientName);
inTopicRoot = String(cfg.inTopicRoot);
outTopicRoot = String(cfg.outTopicRoot);
topicRoot = String(cfg.topicRoot);
}
void MqttPlugin::applyConfigFromFile(const char* fileName) {
@@ -57,16 +58,22 @@ void MqttPlugin::connect()
{
for (auto const &localSub : mediator->subscriptions)
{
// bind all local topics to the MQTT upstream once
subscribe(localSub.first.c_str(), bind(&MqttPlugin::upstreamHandler, this, localSub.first.c_str(), _1));
// subscribe topic on remote queue to dispatch messages to local subscriptions
client->subscribe(
(inTopicRoot + String(localSub.first.c_str()))
.c_str());
if(bindUpstream){
// bind all local topics to the MQTT upstream once
subscribe(localSub.first.c_str(), bind(&MqttPlugin::upstreamHandler, this, localSub.first.c_str(), _1));
}
if(bindDownstream){
// subscribe topic on remote queue to dispatch messages to local subscriptions
client->subscribe(
(topicRoot + String("/in/") + String(localSub.first.c_str())).c_str());
}
}
subscribed = true;
}
publish("mqtt/connect", clientName);
PRINT_MSG(Serial, "MQTT", "connected");
} else {
PRINT_MSG(Serial, "MQTT", "connect failed");
}
}
}
@@ -74,10 +81,9 @@ void MqttPlugin::connect()
void MqttPlugin::upstreamHandler(String topic, String msg)
{
// publish message on remote queue
String remoteTopic = outTopicRoot + topic;
Serial.println(remoteTopic);
String remoteTopic = topicRoot + String("/out/") + topic;
client->publish(remoteTopic.c_str(), msg.c_str());
PRINT_MSG(Serial, "MQTT", remoteTopic.c_str());
PRINT_MSG(Serial, "MQTT", String("pub: "+ remoteTopic).c_str());
}
void MqttPlugin::downstreamHandler(char *topic, uint8_t *payload, unsigned int length)
@@ -88,10 +94,22 @@ void MqttPlugin::downstreamHandler(char *topic, uint8_t *payload, unsigned int l
String msg = String(cleanPayload);
free(cleanPayload);
// substract the topic root and publish msg on local topic
int topicRootLength = inTopicRoot.length();
String localTopic = String(topic).substring(topicRootLength);
publish(localTopic, msg);
// substract the topic root + "/in/" and publish msg on local topic
String topicStr = String(topic);
int topicRootLength = topicRoot.length();
if(topicStr.length() > topicRootLength){
String localTopic = topicStr.substring(topicRootLength);
if(localTopic.length() > 4){
String direction = localTopic.substring(0,4);
if(direction == "/in/" ){
String localSubTopic = localTopic.substring(direction.length());
publish(localSubTopic, msg);
PRINT_MSG(Serial, "MQTT", (String("publish /in/: ") + localSubTopic).c_str());
} else {
publish(localTopic, msg);
PRINT_MSG(Serial, "MQTT", (String("publish mediator: ") + localTopic).c_str());
}
}
}
PRINT_MSG(Serial, "MQTT", (String("publish local: ") + localTopic).c_str());
}