diff --git a/src/Mediator.h b/src/Mediator.h index 4134320..88f6d68 100644 --- a/src/Mediator.h +++ b/src/Mediator.h @@ -10,18 +10,18 @@ using namespace std; -typedef std::function mediatorHandler_t; +typedef std::function subscriptionHandler_t; class Mediator { public: - std::map> subscriptions; - void subscribe(String topic, mediatorHandler_t handler) { + std::map> subscriptions; + void subscribe(String topic, subscriptionHandler_t handler) { subscriptions[topic.c_str()].reserve(1); subscriptions[topic.c_str()].push_back(handler); } void publish(String topic, String msg) { if (subscriptions.find(topic.c_str()) != subscriptions.end()){ - for(mediatorHandler_t h : subscriptions[topic.c_str()]){ + for(subscriptionHandler_t h : subscriptions[topic.c_str()]){ h(msg); } } diff --git a/src/Plugin.h b/src/Plugin.h index ed7b1d7..bce5744 100644 --- a/src/Plugin.h +++ b/src/Plugin.h @@ -4,13 +4,25 @@ #include #include #include +#include class Plugin { public: + Mediator* mediator; virtual void activate(Scheduler*, Network*); virtual void enable(){}; virtual void disable(){}; virtual void onMessage(SprocketMessage msg){}; + Plugin* mediate(Mediator* m) { + mediator = m; + return this; + } + void subscribe(String topic, subscriptionHandler_t handler){ + mediator->subscribe(topic, handler); + } + void publish(String topic, String str){ + mediator->publish(topic, str); + } }; #endif \ No newline at end of file diff --git a/src/Sprocket.cpp b/src/Sprocket.cpp index 3e0593f..cd1b84d 100644 --- a/src/Sprocket.cpp +++ b/src/Sprocket.cpp @@ -44,12 +44,14 @@ void Sprocket::loop(){ void Sprocket::dispatch( uint32_t from, String &msg ) { SprocketMessage mMsg; - if(mMsg.fromJsonString(msg)){ + mMsg.fromJsonString(msg); + if(mMsg.valid){ dispatchMessageToPlugins(mMsg); } } void Sprocket::addPlugin(Plugin* p){ + p->mediate(this); plugins.reserve(1); plugins.push_back(p); } @@ -60,10 +62,11 @@ void Sprocket::activatePlugins(Scheduler* scheduler, Network* network){ } } +// TODO remove plugin dispatching and onMessage in favor of mediator pub/sub void Sprocket::dispatchMessageToPlugins(SprocketMessage msg){ if(msg.type != SprocketMessage::NONE){ // baaaa for(Plugin* p : plugins){ - Serial.println("dispatch to plugins"); + //Serial.println("dispatch to plugins"); p->onMessage(msg); } } diff --git a/src/Sprocket.h b/src/Sprocket.h index 32493da..d13b4b7 100644 --- a/src/Sprocket.h +++ b/src/Sprocket.h @@ -18,7 +18,7 @@ using namespace std::placeholders; // FIXME move to some global fnc lib #define ARRAY_LENGTH(array) sizeof(array)/sizeof(array[0]) -class Sprocket { +class Sprocket : public Mediator { protected: Scheduler scheduler; public: diff --git a/src/base/MeshSprocket.h b/src/base/MeshSprocket.h index 66a93e0..fa6c55a 100644 --- a/src/base/MeshSprocket.h +++ b/src/base/MeshSprocket.h @@ -17,6 +17,7 @@ using namespace std::placeholders; class MeshSprocket : public Sprocket { public: MeshNet* net; + MeshSprocket(){}; MeshSprocket(SprocketConfig cfg) : Sprocket(cfg) { @@ -31,7 +32,11 @@ class MeshSprocket : public Sprocket { } using Sprocket::activate; virtual void onMessage(uint32_t from, String &msg) { - Serial.printf("MeshSprocket onMessage: received from %u msg=%s\n", from, msg.c_str()); + //Serial.printf("MeshSprocket onMessage: received from %u msg=%s\n", from, msg.c_str()); + SprocketMessage sMsg; + sMsg.fromJsonString(msg); + sMsg.from = from; + publish(sMsg.topic, sMsg.payload); }; void dispatch( uint32_t from, String &msg ) { diff --git a/src/base/SprocketMessage.h b/src/base/SprocketMessage.h index 587b4cf..e4ebeee 100644 --- a/src/base/SprocketMessage.h +++ b/src/base/SprocketMessage.h @@ -3,78 +3,56 @@ #include #include +#include #define JSON_DOMAIN "domain" #define JSON_FROM "from" #define JSON_TO "target" -#define JSON_MSG "msg" +#define JSON_PAYLOAD "payload" #define JSON_TYPE "type" +#define JSON_TOPIC "topic" -struct SprocketMessage { +struct SprocketMessage : public JsonStruct { String domain; String to; String from; - String msg; + String payload; + String topic; + + // TODO do we even need that? enum SprocketMessageType { NONE, SYSTEM, APP, OTA } type; - int valid = 0; // ------------------------------------------------------------------------------------------ - //void init() { - // from = reinterpret_cast(ESP.getChipId()); - //} int verifyJsonObject(JsonObject& json){ - return json.success(); + return json.success() //&& json.containsKey(JSON_DOMAIN) //&& json.containsKey(JSON_TO) //&& json.containsKey(JSON_FROM) //&& json.containsKey(JSON_TYPE) - // && json.containsKey(JSON_MSG); // msg is only tx'ed from mqtt + && json.containsKey(JSON_TOPIC) + && json.containsKey(JSON_PAYLOAD); }; - String toJsonString(){ - //StaticJsonBuffer<200> jsonBuffer; - DynamicJsonBuffer jsonBuffer(JSON_ARRAY_SIZE(300)); - JsonObject& root = jsonBuffer.createObject(); + void mapJsonObject(JsonObject& root){ root[JSON_DOMAIN] = domain; root[JSON_TO] = to; root[JSON_FROM] = from; - root[JSON_MSG] = msg; + root[JSON_PAYLOAD] = payload; + root[JSON_TOPIC] = topic; root[JSON_TYPE] = type; - String jsonString; - root.printTo(jsonString); - return jsonString; - } - String getAttrFromJson(JsonObject& json, const char* attr){ - if(json.containsKey(attr)){ - return json[attr]; - } - return ""; - } - int getIntAttrFromJson(JsonObject& json, const char* attr){ - if(json.containsKey(attr)){ - return json[attr]; - } - return 0; } // Map a json object to this struct. - int fromJsonObject(JsonObject& json){ + void fromJsonObject(JsonObject& json){ if(!verifyJsonObject(json)){ Serial.println("ERROR: cannot parse SprocketMessage JSON object"); valid = 0; - return valid; + return; } domain = getAttrFromJson(json, JSON_DOMAIN); to = getAttrFromJson(json, JSON_TO); from = getAttrFromJson(json, JSON_FROM); - msg = getAttrFromJson(json, JSON_MSG); + payload = getAttrFromJson(json, JSON_PAYLOAD); + topic = getAttrFromJson(json, JSON_TOPIC); type = (SprocketMessageType) getIntAttrFromJson(json, JSON_TYPE); valid = 1; - return valid; - }; - // Parse a json string and map parsed object - int fromJsonString(String& str){ - //StaticJsonBuffer<200> jsonBuffer; - DynamicJsonBuffer jsonBuffer(JSON_ARRAY_SIZE(300)); - JsonObject& json = jsonBuffer.parseObject(str); - return fromJsonObject(json); }; }; diff --git a/src/examples/mesh/MeshApp.h b/src/examples/mesh/MeshApp.h index 79f0d18..d7cbec7 100644 --- a/src/examples/mesh/MeshApp.h +++ b/src/examples/mesh/MeshApp.h @@ -19,14 +19,13 @@ AsyncWebServer WEBSERVER(80); class MeshApp : public MeshSprocket { public: Task heartbeatTask; - Mediator mediator; MeshApp(SprocketConfig cfg, OtaConfig otaCfg, WebServerConfig webCfg) : MeshSprocket(cfg) { addPlugin(new OtaTcpPlugin(otaCfg)); addPlugin(new WebServerPlugin(webCfg, &WEBSERVER)); addPlugin(new WebConfigPlugin(&WEBSERVER)); addPlugin(new MeshManPlugin(&WEBSERVER)); - mediator.subscribe("mediatorMsg", bind(&MeshApp::messageHandler, this, _1)); + subscribe("mesh/heartbeat", bind(&MeshApp::messageHandler, this, _1)); } Sprocket* activate(Scheduler* scheduler, Network* network) { @@ -41,23 +40,24 @@ class MeshApp : public MeshSprocket { } using MeshSprocket::activate; void messageHandler(String msg){ - Serial.println(msg); + Serial.println(String("MeshApp: ") + msg); } void heartbeat(MeshNet* network){ SprocketMessage msg; // = { "wirelos", "broadcast", "local", "alive", 0, }; msg.domain = "wirelos"; msg.to = "broadcast"; - msg.msg = "alive"; + msg.payload = "alive"; + msg.topic = "mesh/heartbeat"; msg.type = SprocketMessage::APP; String msgStr = msg.toJsonString(); - network->mesh.sendBroadcast(msgStr/* , true */); + network->mesh.sendBroadcast(msgStr, true); //String mMsg = String("hoi"); - mediator.publish("mediatorMsg", "hi mediator"); - } - void onMessage( uint32_t from, String &msg ) { - Serial.printf("MeshApp onMessage: received from %u msg=%s\n", from, msg.c_str()); + //publish("mediatorMsg", "hi mediator"); } + //void onMessage( uint32_t from, String &msg ) { + // Serial.printf("MeshApp onMessage: received from %u msg=%s\n", from, msg.c_str()); + //} }; #endif \ No newline at end of file diff --git a/src/plugins/MeshManPlugin.cpp b/src/plugins/MeshManPlugin.cpp index 8b7b1f3..d9dc8a2 100644 --- a/src/plugins/MeshManPlugin.cpp +++ b/src/plugins/MeshManPlugin.cpp @@ -27,6 +27,11 @@ class MeshManPlugin : public Plugin { server->on("/mesh", HTTP_POST, std::bind(&MeshManPlugin::sendMsg, this, std::placeholders::_1)); server->on("/mesh/nodeId", HTTP_GET, std::bind(&MeshManPlugin::getNodeId, this, std::placeholders::_1)); server->on("/mesh/broadcast", HTTP_POST, std::bind(&MeshManPlugin::broadcast, this, std::placeholders::_1)); + + subscribe("mesh/heartbeat", std::bind(&MeshManPlugin::gotHeartbeat, this, std::placeholders::_1)); + } + void gotHeartbeat(String msg){ + Serial.println(String("Got heartbeat: ") + msg); } void getMeshConnections(AsyncWebServerRequest *request) { request->send(200, "text/plain", net->mesh.subConnectionJson());