use mediator for internal messaging

This commit is contained in:
2018-09-10 01:46:32 +02:00
parent 622074df2c
commit ec6c41f4a1
8 changed files with 60 additions and 57 deletions

View File

@@ -10,18 +10,18 @@
using namespace std; using namespace std;
typedef std::function<void(String msg)> mediatorHandler_t; typedef std::function<void(String msg)> subscriptionHandler_t;
class Mediator { class Mediator {
public: public:
std::map<std::string, vector<mediatorHandler_t>> subscriptions; std::map<std::string, vector<subscriptionHandler_t>> subscriptions;
void subscribe(String topic, mediatorHandler_t handler) { void subscribe(String topic, subscriptionHandler_t handler) {
subscriptions[topic.c_str()].reserve(1); subscriptions[topic.c_str()].reserve(1);
subscriptions[topic.c_str()].push_back(handler); subscriptions[topic.c_str()].push_back(handler);
} }
void publish(String topic, String msg) { void publish(String topic, String msg) {
if (subscriptions.find(topic.c_str()) != subscriptions.end()){ if (subscriptions.find(topic.c_str()) != subscriptions.end()){
for(mediatorHandler_t h : subscriptions[topic.c_str()]){ for(subscriptionHandler_t h : subscriptions[topic.c_str()]){
h(msg); h(msg);
} }
} }

View File

@@ -4,13 +4,25 @@
#include <TaskSchedulerDeclarations.h> #include <TaskSchedulerDeclarations.h>
#include <Network.h> #include <Network.h>
#include <base/SprocketMessage.h> #include <base/SprocketMessage.h>
#include <Mediator.h>
class Plugin { class Plugin {
public: public:
Mediator* mediator;
virtual void activate(Scheduler*, Network*); virtual void activate(Scheduler*, Network*);
virtual void enable(){}; virtual void enable(){};
virtual void disable(){}; virtual void disable(){};
virtual void onMessage(SprocketMessage msg){}; virtual void onMessage(SprocketMessage msg){};
Plugin* mediate(Mediator* m) {
mediator = m;
return this;
}
void subscribe(String topic, subscriptionHandler_t handler){
mediator->subscribe(topic, handler);
}
void publish(String topic, String str){
mediator->publish(topic, str);
}
}; };
#endif #endif

View File

@@ -44,12 +44,14 @@ void Sprocket::loop(){
void Sprocket::dispatch( uint32_t from, String &msg ) { void Sprocket::dispatch( uint32_t from, String &msg ) {
SprocketMessage mMsg; SprocketMessage mMsg;
if(mMsg.fromJsonString(msg)){ mMsg.fromJsonString(msg);
if(mMsg.valid){
dispatchMessageToPlugins(mMsg); dispatchMessageToPlugins(mMsg);
} }
} }
void Sprocket::addPlugin(Plugin* p){ void Sprocket::addPlugin(Plugin* p){
p->mediate(this);
plugins.reserve(1); plugins.reserve(1);
plugins.push_back(p); plugins.push_back(p);
} }
@@ -60,10 +62,11 @@ void Sprocket::activatePlugins(Scheduler* scheduler, Network* network){
} }
} }
// TODO remove plugin dispatching and onMessage in favor of mediator pub/sub
void Sprocket::dispatchMessageToPlugins(SprocketMessage msg){ void Sprocket::dispatchMessageToPlugins(SprocketMessage msg){
if(msg.type != SprocketMessage::NONE){ // baaaa if(msg.type != SprocketMessage::NONE){ // baaaa
for(Plugin* p : plugins){ for(Plugin* p : plugins){
Serial.println("dispatch to plugins"); //Serial.println("dispatch to plugins");
p->onMessage(msg); p->onMessage(msg);
} }
} }

View File

@@ -18,7 +18,7 @@ using namespace std::placeholders;
// FIXME move to some global fnc lib // FIXME move to some global fnc lib
#define ARRAY_LENGTH(array) sizeof(array)/sizeof(array[0]) #define ARRAY_LENGTH(array) sizeof(array)/sizeof(array[0])
class Sprocket { class Sprocket : public Mediator {
protected: protected:
Scheduler scheduler; Scheduler scheduler;
public: public:

View File

@@ -17,6 +17,7 @@ using namespace std::placeholders;
class MeshSprocket : public Sprocket { class MeshSprocket : public Sprocket {
public: public:
MeshNet* net; MeshNet* net;
MeshSprocket(){}; MeshSprocket(){};
MeshSprocket(SprocketConfig cfg) : Sprocket(cfg) { MeshSprocket(SprocketConfig cfg) : Sprocket(cfg) {
@@ -31,7 +32,11 @@ class MeshSprocket : public Sprocket {
} using Sprocket::activate; } using Sprocket::activate;
virtual void onMessage(uint32_t from, String &msg) { virtual void onMessage(uint32_t from, String &msg) {
Serial.printf("MeshSprocket onMessage: received from %u msg=%s\n", from, msg.c_str()); //Serial.printf("MeshSprocket onMessage: received from %u msg=%s\n", from, msg.c_str());
SprocketMessage sMsg;
sMsg.fromJsonString(msg);
sMsg.from = from;
publish(sMsg.topic, sMsg.payload);
}; };
void dispatch( uint32_t from, String &msg ) { void dispatch( uint32_t from, String &msg ) {

View File

@@ -3,78 +3,56 @@
#include <Arduino.h> #include <Arduino.h>
#include <ArduinoJson.h> #include <ArduinoJson.h>
#include <JsonStruct.h>
#define JSON_DOMAIN "domain" #define JSON_DOMAIN "domain"
#define JSON_FROM "from" #define JSON_FROM "from"
#define JSON_TO "target" #define JSON_TO "target"
#define JSON_MSG "msg" #define JSON_PAYLOAD "payload"
#define JSON_TYPE "type" #define JSON_TYPE "type"
#define JSON_TOPIC "topic"
struct SprocketMessage { struct SprocketMessage : public JsonStruct {
String domain; String domain;
String to; String to;
String from; String from;
String msg; String payload;
String topic;
// TODO do we even need that?
enum SprocketMessageType { NONE, SYSTEM, APP, OTA } type; enum SprocketMessageType { NONE, SYSTEM, APP, OTA } type;
int valid = 0;
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
//void init() {
// from = reinterpret_cast<char*>(ESP.getChipId());
//}
int verifyJsonObject(JsonObject& json){ int verifyJsonObject(JsonObject& json){
return json.success(); return json.success()
//&& json.containsKey(JSON_DOMAIN) //&& json.containsKey(JSON_DOMAIN)
//&& json.containsKey(JSON_TO) //&& json.containsKey(JSON_TO)
//&& json.containsKey(JSON_FROM) //&& json.containsKey(JSON_FROM)
//&& json.containsKey(JSON_TYPE) //&& json.containsKey(JSON_TYPE)
// && json.containsKey(JSON_MSG); // msg is only tx'ed from mqtt && json.containsKey(JSON_TOPIC)
&& json.containsKey(JSON_PAYLOAD);
}; };
String toJsonString(){ void mapJsonObject(JsonObject& root){
//StaticJsonBuffer<200> jsonBuffer;
DynamicJsonBuffer jsonBuffer(JSON_ARRAY_SIZE(300));
JsonObject& root = jsonBuffer.createObject();
root[JSON_DOMAIN] = domain; root[JSON_DOMAIN] = domain;
root[JSON_TO] = to; root[JSON_TO] = to;
root[JSON_FROM] = from; root[JSON_FROM] = from;
root[JSON_MSG] = msg; root[JSON_PAYLOAD] = payload;
root[JSON_TOPIC] = topic;
root[JSON_TYPE] = type; root[JSON_TYPE] = type;
String jsonString;
root.printTo(jsonString);
return jsonString;
}
String getAttrFromJson(JsonObject& json, const char* attr){
if(json.containsKey(attr)){
return json[attr];
}
return "";
}
int getIntAttrFromJson(JsonObject& json, const char* attr){
if(json.containsKey(attr)){
return json[attr];
}
return 0;
} }
// Map a json object to this struct. // Map a json object to this struct.
int fromJsonObject(JsonObject& json){ void fromJsonObject(JsonObject& json){
if(!verifyJsonObject(json)){ if(!verifyJsonObject(json)){
Serial.println("ERROR: cannot parse SprocketMessage JSON object"); Serial.println("ERROR: cannot parse SprocketMessage JSON object");
valid = 0; valid = 0;
return valid; return;
} }
domain = getAttrFromJson(json, JSON_DOMAIN); domain = getAttrFromJson(json, JSON_DOMAIN);
to = getAttrFromJson(json, JSON_TO); to = getAttrFromJson(json, JSON_TO);
from = getAttrFromJson(json, JSON_FROM); from = getAttrFromJson(json, JSON_FROM);
msg = getAttrFromJson(json, JSON_MSG); payload = getAttrFromJson(json, JSON_PAYLOAD);
topic = getAttrFromJson(json, JSON_TOPIC);
type = (SprocketMessageType) getIntAttrFromJson(json, JSON_TYPE); type = (SprocketMessageType) getIntAttrFromJson(json, JSON_TYPE);
valid = 1; valid = 1;
return valid;
};
// Parse a json string and map parsed object
int fromJsonString(String& str){
//StaticJsonBuffer<200> jsonBuffer;
DynamicJsonBuffer jsonBuffer(JSON_ARRAY_SIZE(300));
JsonObject& json = jsonBuffer.parseObject(str);
return fromJsonObject(json);
}; };
}; };

View File

@@ -19,14 +19,13 @@ AsyncWebServer WEBSERVER(80);
class MeshApp : public MeshSprocket { class MeshApp : public MeshSprocket {
public: public:
Task heartbeatTask; Task heartbeatTask;
Mediator mediator;
MeshApp(SprocketConfig cfg, OtaConfig otaCfg, WebServerConfig webCfg) : MeshSprocket(cfg) { MeshApp(SprocketConfig cfg, OtaConfig otaCfg, WebServerConfig webCfg) : MeshSprocket(cfg) {
addPlugin(new OtaTcpPlugin(otaCfg)); addPlugin(new OtaTcpPlugin(otaCfg));
addPlugin(new WebServerPlugin(webCfg, &WEBSERVER)); addPlugin(new WebServerPlugin(webCfg, &WEBSERVER));
addPlugin(new WebConfigPlugin(&WEBSERVER)); addPlugin(new WebConfigPlugin(&WEBSERVER));
addPlugin(new MeshManPlugin(&WEBSERVER)); addPlugin(new MeshManPlugin(&WEBSERVER));
mediator.subscribe("mediatorMsg", bind(&MeshApp::messageHandler, this, _1)); subscribe("mesh/heartbeat", bind(&MeshApp::messageHandler, this, _1));
} }
Sprocket* activate(Scheduler* scheduler, Network* network) { Sprocket* activate(Scheduler* scheduler, Network* network) {
@@ -41,23 +40,24 @@ class MeshApp : public MeshSprocket {
} using MeshSprocket::activate; } using MeshSprocket::activate;
void messageHandler(String msg){ void messageHandler(String msg){
Serial.println(msg); Serial.println(String("MeshApp: ") + msg);
} }
void heartbeat(MeshNet* network){ void heartbeat(MeshNet* network){
SprocketMessage msg; // = { "wirelos", "broadcast", "local", "alive", 0, }; SprocketMessage msg; // = { "wirelos", "broadcast", "local", "alive", 0, };
msg.domain = "wirelos"; msg.domain = "wirelos";
msg.to = "broadcast"; msg.to = "broadcast";
msg.msg = "alive"; msg.payload = "alive";
msg.topic = "mesh/heartbeat";
msg.type = SprocketMessage::APP; msg.type = SprocketMessage::APP;
String msgStr = msg.toJsonString(); String msgStr = msg.toJsonString();
network->mesh.sendBroadcast(msgStr/* , true */); network->mesh.sendBroadcast(msgStr, true);
//String mMsg = String("hoi"); //String mMsg = String("hoi");
mediator.publish("mediatorMsg", "hi mediator"); //publish("mediatorMsg", "hi mediator");
}
void onMessage( uint32_t from, String &msg ) {
Serial.printf("MeshApp onMessage: received from %u msg=%s\n", from, msg.c_str());
} }
//void onMessage( uint32_t from, String &msg ) {
// Serial.printf("MeshApp onMessage: received from %u msg=%s\n", from, msg.c_str());
//}
}; };
#endif #endif

View File

@@ -27,6 +27,11 @@ class MeshManPlugin : public Plugin {
server->on("/mesh", HTTP_POST, std::bind(&MeshManPlugin::sendMsg, this, std::placeholders::_1)); server->on("/mesh", HTTP_POST, std::bind(&MeshManPlugin::sendMsg, this, std::placeholders::_1));
server->on("/mesh/nodeId", HTTP_GET, std::bind(&MeshManPlugin::getNodeId, this, std::placeholders::_1)); server->on("/mesh/nodeId", HTTP_GET, std::bind(&MeshManPlugin::getNodeId, this, std::placeholders::_1));
server->on("/mesh/broadcast", HTTP_POST, std::bind(&MeshManPlugin::broadcast, this, std::placeholders::_1)); server->on("/mesh/broadcast", HTTP_POST, std::bind(&MeshManPlugin::broadcast, this, std::placeholders::_1));
subscribe("mesh/heartbeat", std::bind(&MeshManPlugin::gotHeartbeat, this, std::placeholders::_1));
}
void gotHeartbeat(String msg){
Serial.println(String("Got heartbeat: ") + msg);
} }
void getMeshConnections(AsyncWebServerRequest *request) { void getMeshConnections(AsyncWebServerRequest *request) {
request->send(200, "text/plain", net->mesh.subConnectionJson()); request->send(200, "text/plain", net->mesh.subConnectionJson());