diff --git a/platformio.ini b/platformio.ini
index 85b97a9..9def105 100644
--- a/platformio.ini
+++ b/platformio.ini
@@ -1,5 +1,5 @@
[platformio]
-env_default = basic
+env_default = basic, chat
[common]
framework = arduino
@@ -23,5 +23,15 @@ board = ${common.board}
upload_speed = ${common.upload_speed}
monitor_baud = ${common.monitor_baud}
framework = ${common.framework}
+lib_deps = ${common.lib_deps}
+ PubSubClient
+
+[env:chat]
+src_filter = +<*> - +
+platform = ${common.platform}
+board = ${common.board}
+upload_speed = ${common.upload_speed}
+monitor_baud = ${common.monitor_baud}
+framework = ${common.framework}
lib_deps = ${common.lib_deps}
PubSubClient
\ No newline at end of file
diff --git a/src/MqttConfig.h b/src/MqttConfig.h
index f079902..37f178a 100644
--- a/src/MqttConfig.h
+++ b/src/MqttConfig.h
@@ -6,7 +6,7 @@
#define JSON_MQTT_CLIENT_NAME "mqttClientName"
#define JSON_MQTT_BROKER_HOST "mqttBrokerHost"
#define JSON_MQTT_BROKER_PORT "mqttBrokerPort"
-#define JSON_MQTT_IN_TOPIC_ROOT "mqttInTopicRoot"
+#define JSON_MQTT_TOPIC_ROOT "mqttTopicRoot"
#define JSON_MQTT_OUT_TOPIC_ROOT "mqttOutTopicRoot"
struct MqttConfig
@@ -14,8 +14,7 @@ struct MqttConfig
const char *clientName;
const char *brokerHost;
int brokerPort;
- const char *inTopicRoot;
- const char *outTopicRoot;
+ const char *topicRoot;
};
struct MqttConfigJson : public MqttConfig, public JsonStruct
@@ -25,8 +24,7 @@ struct MqttConfigJson : public MqttConfig, public JsonStruct
root[JSON_MQTT_CLIENT_NAME] = clientName;
root[JSON_MQTT_BROKER_HOST] = brokerHost;
root[JSON_MQTT_BROKER_PORT] = brokerPort;
- root[JSON_MQTT_IN_TOPIC_ROOT] = inTopicRoot;
- root[JSON_MQTT_OUT_TOPIC_ROOT] = outTopicRoot;
+ root[JSON_MQTT_TOPIC_ROOT] = topicRoot;
}
void fromJsonObject(JsonObject &json)
{
@@ -39,8 +37,7 @@ struct MqttConfigJson : public MqttConfig, public JsonStruct
clientName = getAttr(json, JSON_MQTT_CLIENT_NAME);
brokerHost = getAttr(json, JSON_MQTT_BROKER_HOST);
brokerPort = getIntAttrFromJson(json, JSON_MQTT_BROKER_PORT);
- inTopicRoot = getAttr(json, JSON_MQTT_IN_TOPIC_ROOT);
- outTopicRoot = getAttr(json, JSON_MQTT_OUT_TOPIC_ROOT);
+ topicRoot = getAttr(json, JSON_MQTT_TOPIC_ROOT);
valid = 1;
};
};
diff --git a/src/MqttPlugin.cpp b/src/MqttPlugin.cpp
index 8f91f9a..1aa1a13 100644
--- a/src/MqttPlugin.cpp
+++ b/src/MqttPlugin.cpp
@@ -1,7 +1,9 @@
#include "MqttPlugin.h"
-MqttPlugin::MqttPlugin(MqttConfig cfg)
+MqttPlugin::MqttPlugin(MqttConfig cfg, bool bindUp, bool bindDown)
{
+ bindUpstream = bindUp;
+ bindDownstream = bindDown;
applyConfig(cfg);
}
@@ -9,8 +11,7 @@ void MqttPlugin::applyConfig(MqttConfig cfg) {
brokerHost = String(cfg.brokerHost);
brokerPort = cfg.brokerPort;
clientName = String(cfg.clientName);
- inTopicRoot = String(cfg.inTopicRoot);
- outTopicRoot = String(cfg.outTopicRoot);
+ topicRoot = String(cfg.topicRoot);
}
void MqttPlugin::applyConfigFromFile(const char* fileName) {
@@ -57,16 +58,22 @@ void MqttPlugin::connect()
{
for (auto const &localSub : mediator->subscriptions)
{
- // bind all local topics to the MQTT upstream once
- subscribe(localSub.first.c_str(), bind(&MqttPlugin::upstreamHandler, this, localSub.first.c_str(), _1));
- // subscribe topic on remote queue to dispatch messages to local subscriptions
- client->subscribe(
- (inTopicRoot + String(localSub.first.c_str()))
- .c_str());
+ if(bindUpstream){
+ // bind all local topics to the MQTT upstream once
+ subscribe(localSub.first.c_str(), bind(&MqttPlugin::upstreamHandler, this, localSub.first.c_str(), _1));
+ }
+ if(bindDownstream){
+ // subscribe topic on remote queue to dispatch messages to local subscriptions
+ client->subscribe(
+ (topicRoot + String("/in/") + String(localSub.first.c_str())).c_str());
+ }
}
subscribed = true;
}
+ publish("mqtt/connect", clientName);
PRINT_MSG(Serial, "MQTT", "connected");
+ } else {
+ PRINT_MSG(Serial, "MQTT", "connect failed");
}
}
}
@@ -74,10 +81,9 @@ void MqttPlugin::connect()
void MqttPlugin::upstreamHandler(String topic, String msg)
{
// publish message on remote queue
- String remoteTopic = outTopicRoot + topic;
- Serial.println(remoteTopic);
+ String remoteTopic = topicRoot + String("/out/") + topic;
client->publish(remoteTopic.c_str(), msg.c_str());
- PRINT_MSG(Serial, "MQTT", remoteTopic.c_str());
+ PRINT_MSG(Serial, "MQTT", String("pub: "+ remoteTopic).c_str());
}
void MqttPlugin::downstreamHandler(char *topic, uint8_t *payload, unsigned int length)
@@ -88,10 +94,22 @@ void MqttPlugin::downstreamHandler(char *topic, uint8_t *payload, unsigned int l
String msg = String(cleanPayload);
free(cleanPayload);
- // substract the topic root and publish msg on local topic
- int topicRootLength = inTopicRoot.length();
- String localTopic = String(topic).substring(topicRootLength);
- publish(localTopic, msg);
+ // substract the topic root + "/in/" and publish msg on local topic
+ String topicStr = String(topic);
+ int topicRootLength = topicRoot.length();
+ if(topicStr.length() > topicRootLength){
+ String localTopic = topicStr.substring(topicRootLength);
+ if(localTopic.length() > 4){
+ String direction = localTopic.substring(0,4);
+ if(direction == "/in/" ){
+ String localSubTopic = localTopic.substring(direction.length());
+ publish(localSubTopic, msg);
+ PRINT_MSG(Serial, "MQTT", (String("publish /in/: ") + localSubTopic).c_str());
+ } else {
+ publish(localTopic, msg);
+ PRINT_MSG(Serial, "MQTT", (String("publish mediator: ") + localTopic).c_str());
+ }
+ }
+ }
- PRINT_MSG(Serial, "MQTT", (String("publish local: ") + localTopic).c_str());
}
diff --git a/src/MqttPlugin.h b/src/MqttPlugin.h
index b5d2ff5..37373d2 100644
--- a/src/MqttPlugin.h
+++ b/src/MqttPlugin.h
@@ -18,7 +18,7 @@ class MqttPlugin : public Plugin
public:
PubSubClient *client;
- MqttPlugin(MqttConfig cfg);
+ MqttPlugin(MqttConfig cfg, bool bindUp = true, bool bindDown = true);
/**
* Connects to queue and triggers connect and process task.
@@ -31,11 +31,12 @@ class MqttPlugin : public Plugin
Task connectTask;
Task processTask;
bool subscribed = false;
+ bool bindUpstream = false;
+ bool bindDownstream = false;
String brokerHost;
int brokerPort;
String clientName;
- String inTopicRoot;
- String outTopicRoot;
+ String topicRoot;
void applyConfig(MqttConfig cfg);
void applyConfigFromFile(const char* fileName);
@@ -46,20 +47,20 @@ class MqttPlugin : public Plugin
* to the corresponding topic on the remote queue by prefixing the topic with inRootTopic.
* Creates shadow subscriptions of every local topic to propagate messages to the remote queue via upstreamHandler.
*/
- void connect();
+ virtual void connect();
/**
* The upstream handler is bound to every local subscription.
* It passes the message to the remote queue by prefixing the outRootTopic.
*/
- void upstreamHandler(String topic, String msg);
+ virtual void upstreamHandler(String topic, String msg);
/**
* The downstream handler is bound to the remote counterpart of local subscriptions,
* prefixed with inRootTopic.
* Everything after the prefix is used as local topic and dispatched to local subscriptions.
*/
- void downstreamHandler(char *topic, uint8_t *payload, unsigned int length);
+ virtual void downstreamHandler(char *topic, uint8_t *payload, unsigned int length);
};
#endif
\ No newline at end of file
diff --git a/src/examples/chat/config.h b/src/examples/chat/config.h
new file mode 100644
index 0000000..b78a35b
--- /dev/null
+++ b/src/examples/chat/config.h
@@ -0,0 +1,38 @@
+#ifndef __DEVICE_CONFIG__
+#define __DEVICE_CONFIG__
+
+// Scheduler config
+#define _TASK_SLEEP_ON_IDLE_RUN
+#define _TASK_STD_FUNCTION
+#define _TASK_PRIORITY
+
+// Chip config
+#define SPROCKET_TYPE "SPROCKET"
+#define SERIAL_BAUD_RATE 115200
+#define STARTUP_DELAY 1000
+
+// network config
+#define SPROCKET_MODE 1
+#define WIFI_CHANNEL 11
+#define MESH_PORT 5555
+#define AP_SSID "sprocket"
+#define AP_PASSWORD "th3r31sn0sp00n"
+#define MESH_PREFIX "sprocket-mesh"
+#define MESH_PASSWORD "th3r31sn0sp00n"
+#define STATION_SSID "MyAP"
+#define STATION_PASSWORD "th3r31sn0sp00n"
+#define HOSTNAME "sprocket"
+#define CONNECT_TIMEOUT 10000
+#define MESH_DEBUG_TYPES ERROR | STARTUP | CONNECTION
+//#define MESH_DEBUG_TYPES ERROR | MESH_STATUS | CONNECTION | SYNC | COMMUNICATION | GENERAL | MSG_TYPES | REMOTE
+
+// WebServer
+#define WEB_CONTEXT_PATH "/"
+#define WEB_DOC_ROOT "/www"
+#define WEB_DEFAULT_FILE "index.html"
+#define WEB_PORT 80
+
+#define MQTT_CONFIG_FILE "/mqttConfig.json"
+
+
+#endif
\ No newline at end of file
diff --git a/src/examples/chat/main.cpp b/src/examples/chat/main.cpp
new file mode 100644
index 0000000..3bd4d1d
--- /dev/null
+++ b/src/examples/chat/main.cpp
@@ -0,0 +1,62 @@
+#include "config.h"
+#include "WiFiNet.h"
+#include "Sprocket.h"
+#include "MqttPlugin.h"
+
+WiFiNet *network;
+Sprocket *sprocket;
+MqttPlugin *mqttPlugin;
+
+Task publishHeap;
+
+void setup()
+{
+
+ sprocket = new Sprocket({STARTUP_DELAY, SERIAL_BAUD_RATE});
+ mqttPlugin = new MqttPlugin({"chatSprocket",
+ "192.168.1.2",
+ 1883,
+ "wirelos/sprocket"},
+ true, true);
+ sprocket->addPlugin(mqttPlugin);
+
+ // this subscription gets shadowed by the MqttPlugin
+ // to rout messages to the queue on topic wirelos/sprocket/chat/log
+ sprocket->subscribe("chat/log", [](String msg) {
+ // gets routed to MQTT broker
+ PRINT_MSG(Serial, "CHAT", msg.c_str());
+ });
+ publishHeap.set(TASK_SECOND * 5, TASK_FOREVER, []() {
+ // locally publish a message
+ sprocket->publish("chat/log", "hi there");
+ });
+ sprocket->addTask(publishHeap);
+
+ network = new WiFiNet(
+ SPROCKET_MODE,
+ STATION_SSID,
+ STATION_PASSWORD,
+ AP_SSID,
+ AP_PASSWORD,
+ HOSTNAME,
+ CONNECT_TIMEOUT);
+ network->connect();
+
+ sprocket->activate();
+
+ sprocket->subscribe("mqtt/connect", [](String msg) {
+ if (msg.length() > 0)
+ {
+ mqttPlugin->client->subscribe("wirelos/sprocket/out/chat/log");
+ sprocket->subscribe("/out/chat/log", [](String msg){
+ PRINT_MSG(Serial, "CHAT", String("incoming: " +msg).c_str());
+ });
+ }
+ });
+}
+
+void loop()
+{
+ sprocket->loop();
+ yield();
+}
\ No newline at end of file