diff options
author | Kevron Rees <kevron_m_rees@linux.intel.com> | 2013-03-15 14:23:02 -0700 |
---|---|---|
committer | Kevron Rees <kevron_m_rees@linux.intel.com> | 2013-03-15 14:23:02 -0700 |
commit | d9fbbbbbba356d27217698a9cf14942e2bd8289c (patch) | |
tree | daa8ea293eb5382370bcfe9bc306e0fcd200495c | |
parent | b8f89166033a0f942ab92815934da1b5d4b01b1f (diff) | |
parent | b34f6af9195b344a5b4f8e88e2c22630e67b6fda (diff) | |
download | automotive-message-broker-d9fbbbbbba356d27217698a9cf14942e2bd8289c.tar.gz |
Merge branch 'master' of github.com:otcshare/automotive-message-broker
-rw-r--r-- | examples/databaseconfig | 3 | ||||
-rw-r--r-- | examples/opencvconfig | 15 | ||||
-rw-r--r-- | examples/storage | bin | 15360 -> 15360 bytes | |||
-rw-r--r-- | lib/abstractpropertytype.h | 12 | ||||
-rw-r--r-- | plugins/database/databasesink.cpp | 320 | ||||
-rw-r--r-- | plugins/database/databasesink.h | 56 | ||||
-rw-r--r-- | plugins/dbus/amb-qt/ambqt.h | 14 | ||||
-rw-r--r-- | plugins/obd2plugin/obd2source.cpp | 2 |
8 files changed, 351 insertions, 71 deletions
diff --git a/examples/databaseconfig b/examples/databaseconfig index 9007fe58..23cde224 100644 --- a/examples/databaseconfig +++ b/examples/databaseconfig @@ -14,6 +14,9 @@ { "name" : "Example Sink", "path" : "/usr/lib/automotive-message-broker/examplesinkplugin.so" + }, + { + "path" : "/usr/lib/automotive-message-broker/dbussinkplugin.so" } ] } diff --git a/examples/opencvconfig b/examples/opencvconfig new file mode 100644 index 00000000..fc6c2529 --- /dev/null +++ b/examples/opencvconfig @@ -0,0 +1,15 @@ +{ + "sources" : [ + { + "name" : "OpenCV Lux Plugin", + "path" : "/usr/lib/automotive-message-broker/opencvluxplugin.so" + } + ], + "sinks": [ + { + "name" : "DBusSink", + "path" : "/usr/lib/automotive-message-broker/dbussinkplugin.so" + } + ] +} + diff --git a/examples/storage b/examples/storage Binary files differindex 19a06f00..c21b03bc 100644 --- a/examples/storage +++ b/examples/storage diff --git a/lib/abstractpropertytype.h b/lib/abstractpropertytype.h index dc625e5e..2f67058a 100644 --- a/lib/abstractpropertytype.h +++ b/lib/abstractpropertytype.h @@ -334,7 +334,7 @@ class StringPropertyType: public AbstractPropertyType { public: StringPropertyType(std::string val) - :AbstractPropertyType(),mVariant(NULL) + :AbstractPropertyType() { setValue(val); } @@ -370,22 +370,14 @@ public: { //mVariant = Glib::Variant<std::string>::create(toString()); - if(mVariant) - g_variant_unref(mVariant); - - mVariant = g_variant_ref(g_variant_new_string(toString().c_str())); + return g_variant_new_string(toString().c_str()); - return mVariant; } void fromVariant(GVariant *v) { setValue(std::string(g_variant_get_string(v,NULL))); } - -private: - - GVariant* mVariant; }; template <class T> diff --git a/plugins/database/databasesink.cpp b/plugins/database/databasesink.cpp index 85c751df..5c44f54f 100644 --- a/plugins/database/databasesink.cpp +++ b/plugins/database/databasesink.cpp @@ -1,5 +1,6 @@ #include "databasesink.h" #include "abstractroutingengine.h" +#include "listplusplus.h" #include <json-glib/json-glib.h> @@ -8,52 +9,101 @@ extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine, ma return new DatabaseSinkManager(routingengine, config); } -DatabaseSink::DatabaseSink(AbstractRoutingEngine *engine, map<std::string, std::string> config) - :AbstractSource(engine,config) +void * cbFunc(gpointer data) { - databaseName = "storage"; - tablename = "data"; - tablecreate = "CREATE TABLE IF NOT EXISTS data (key TEXT, value BLOB, source TEXT, time REAL, sequence REAL)"; - shared = new Shared; - shared->db->init(databaseName, tablename, tablecreate); + Shared *shared = static_cast<Shared*>(data); + + if(!shared) + { + throw std::runtime_error("Could not cast shared object."); + } - auto cb = [](gpointer data) + while(1) { - Shared *shared = (Shared*)data; + DBObject* obj = shared->queue.pop(); - while(1) + if( obj->quit ) { - DBObject* obj = shared->queue.pop(); + delete obj; + break; + } - if( obj->quit ) - { - delete obj; - break; - } + DictionaryList<string> dict; - DictionaryList<string> dict; + NameValuePair<string> one("key", obj->key); + NameValuePair<string> two("value", obj->value); + NameValuePair<string> three("source", obj->source); + NameValuePair<string> four("time", boost::lexical_cast<string>(obj->time)); + NameValuePair<string> five("sequence", boost::lexical_cast<string>(obj->sequence)); - NameValuePair<string> one("key", obj->key); - NameValuePair<string> two("value", obj->value); - NameValuePair<string> three("source", obj->source); - NameValuePair<string> four("time", boost::lexical_cast<string>(obj->time)); - NameValuePair<string> five("sequence", boost::lexical_cast<string>(obj->sequence)); + dict.push_back(one); + dict.push_back(two); + dict.push_back(three); + dict.push_back(four); + dict.push_back(five); - dict.push_back(one); - dict.push_back(two); - dict.push_back(three); - dict.push_back(four); - dict.push_back(five); + shared->db->insert(dict); + delete obj; + } - shared->db->insert(dict); - delete obj; - } + return NULL; +} + +int getNextEvent(gpointer data) +{ + PlaybackShared* pbshared = static_cast<PlaybackShared*>(data); + + if(!pbshared) + throw std::runtime_error("failed to cast PlaybackShared object"); + + auto itr = pbshared->playbackQueue.begin(); + + if(itr == pbshared->playbackQueue.end()) + { + return 0; + } + + DBObject* obj = *itr; + + AbstractPropertyType* value = VehicleProperty::getPropertyTypeForPropertyNameValue(obj->key,obj->value); + + if(value) + { + pbshared->routingEngine->updateProperty(obj->key, value, pbshared->uuid); + value->timestamp = obj->time; + value->sequence = obj->sequence; + } + + if(++itr != pbshared->playbackQueue.end()) + { + DBObject *o2 = *itr; + double t = o2->time - obj->time; + + if(t > 0) + g_timeout_add(t*1000, getNextEvent, pbshared); + else + g_timeout_add(t, getNextEvent, pbshared); + } + + pbshared->playbackQueue.remove(obj); + delete obj; + + return 0; +} + +DatabaseSink::DatabaseSink(AbstractRoutingEngine *engine, map<std::string, std::string> config) + :AbstractSource(engine,config),thread(NULL),shared(NULL),playback(false),playbackShared(NULL) +{ + databaseName = "storage"; + tablename = "data"; + tablecreate = "CREATE TABLE IF NOT EXISTS data (key TEXT, value BLOB, source TEXT, time REAL, sequence REAL)"; - void* ret = NULL; - return ret; - }; + //startDb(); - thread = g_thread_new("dbthread", cb, shared); + if(config.find("startOnLoad")!= config.end()) + { + startDb(); + } parseConfig(); @@ -62,18 +112,27 @@ DatabaseSink::DatabaseSink(AbstractRoutingEngine *engine, map<std::string, std:: engine->subscribeToProperty(*itr,this); } + mSupported.push_back(DatabaseFileProperty); + mSupported.push_back(DatabaseLoggingProperty); + mSupported.push_back(DatabasePlaybackProperty); + + routingEngine->setSupported(mSupported,this); + } DatabaseSink::~DatabaseSink() { - DBObject* obj = new DBObject(); - obj->quit = true; - - shared->queue.append(obj); + if(shared) + { + DBObject* obj = new DBObject(); + obj->quit = true; - g_thread_join(thread); + shared->queue.append(obj); - delete shared; + g_thread_join(thread); + g_thread_unref(thread); + delete shared; + } } @@ -84,13 +143,7 @@ void DatabaseSink::supportedChanged(PropertyList supportedProperties) PropertyList DatabaseSink::supported() { - PropertyList props; - - props.push_back(VehicleProperty::EngineSpeed); - props.push_back(VehicleProperty::VehicleSpeed); - props.push_back(DatabaseLoggingProperty); - - return props; + return mSupported; } void DatabaseSink::parseConfig() @@ -106,7 +159,10 @@ void DatabaseSink::parseConfig() JsonNode* node = json_parser_get_root(parser); if(node == nullptr) - throw std::runtime_error("Unable to get JSON root object"); + { + /// no options + return; + } JsonReader* reader = json_reader_new(node); @@ -133,8 +189,106 @@ void DatabaseSink::parseConfig() g_object_unref(parser); } +void DatabaseSink::stopDb() +{ + if(!shared) + return; + + DBObject *obj = new DBObject(); + obj->quit = true; + shared->queue.append(obj); + + g_thread_join(thread); + + delete shared; + shared = NULL; +} + +void DatabaseSink::startDb() +{ + if(playback) + { + DebugOut(0)<<"ERROR: tried to start logging during playback. Only logging or playback can be used at one time"<<endl; + return; + } + + if(shared) + { + DebugOut(0)<<"WARNING: logging already started. doing nothing."<<endl; + return; + } + + initDb(); + +// thread = g_thread_new("dbthread", cbFunc, shared); +} + +void DatabaseSink::startPlayback() +{ + if(playback) + return; + + playback = true; + + initDb(); + + /// get supported: + + vector<vector<string> > supportedStr = shared->db->select("SELECT DISTINCT key FROM "+tablename); + + for(int i=0; i < supportedStr.size(); i++) + { + if(!ListPlusPlus<VehicleProperty::Property>(&mSupported).contains(supportedStr[i][0])) + mSupported.push_back(supportedStr[i][0]); + } + + routingEngine->setSupported(supported(), this); + + /// populate playback queue: + + vector<vector<string> > results = shared->db->select("SELECT * FROM "+tablename); + + if(playbackShared) + { + delete playbackShared; + } + + playbackShared = new PlaybackShared(routingEngine,uuid()); + + for(int i=0;i<results.size();i++) + { + if(results[i].size() < 5) + { + throw std::runtime_error("column mismatch in query"); + } + + DBObject* obj = new DBObject(); + + obj->key = results[i][0]; + obj->value = results[i][1]; + obj->source = results[i][2]; + obj->time = boost::lexical_cast<double>(results[i][3]); +// obj->sequence = boost::lexical_cast<int>(results[i][4]); + + playbackShared->playbackQueue.push_back(obj); + } + + g_timeout_add(0,getNextEvent,playbackShared); +} + +void DatabaseSink::initDb() +{ + if(shared) delete shared; + + shared = new Shared; + shared->db->init(databaseName, tablename, tablecreate); +} + void DatabaseSink::propertyChanged(VehicleProperty::Property property, AbstractPropertyType *value, std::string uuid) { + if(!shared) + return; + DBObject* obj = new DBObject; obj->key = property; obj->value = value->toString(); @@ -153,7 +307,40 @@ std::string DatabaseSink::uuid() void DatabaseSink::getPropertyAsync(AsyncPropertyReply *reply) { + reply->success = false; + + if(reply->property == DatabaseFileProperty) + { + StringPropertyType temp(databaseName); + reply->value = &temp; + + reply->success = true; + reply->completed(reply); + + return; + } + else if(reply->property == DatabaseLoggingProperty) + { + BasicPropertyType<bool> temp = shared; + + reply->value = &temp; + reply->success = true; + reply->completed(reply); + return; + } + + else if(reply->property == DatabasePlaybackProperty) + { + BasicPropertyType<bool> temp = playback; + reply->value = &temp; + reply->success = true; + reply->completed(reply); + + return; + } + + reply->completed(reply); } void DatabaseSink::getRangePropertyAsync(AsyncRangePropertyReply *reply) @@ -225,7 +412,44 @@ AsyncPropertyReply *DatabaseSink::setProperty(AsyncSetPropertyRequest request) if(request.value->value<bool>()) { ///TODO: start or stop logging thread + startDb(); + reply->success = true; + BasicPropertyType<bool> temp(true); + routingEngine->updateProperty(DatabaseLoggingProperty,&temp,uuid()); } + else + { + stopDb(); + reply->success = true; + BasicPropertyType<bool> temp(false); + routingEngine->updateProperty(DatabaseLoggingProperty,&temp,uuid()); + } + } + + else if(request.property == DatabaseFileProperty) + { + std::string fname = request.value->toString(); + + databaseName = fname; + + StringPropertyType temp(databaseName); + + routingEngine->updateProperty(DatabaseFileProperty,&temp,uuid()); + + reply->success = true; + } + else if( request.property == DatabasePlaybackProperty) + { + if(request.value->value<bool>()) + { + startPlayback(); + } + else + { + /// TODO: stop playback + } + + reply->success = true; } return reply; diff --git a/plugins/database/databasesink.h b/plugins/database/databasesink.h index c41017cd..7a2607f9 100644 --- a/plugins/database/databasesink.h +++ b/plugins/database/databasesink.h @@ -26,7 +26,11 @@ #include <glib.h> +#include <functional> + #define DatabaseLoggingProperty "DatabaseLogging" +#define DatabasePlaybackProperty "DatabasePlayback" +#define DatabaseFileProperty "DatabaseFile" template <typename T> class Queue @@ -34,25 +38,26 @@ class Queue public: Queue() { - mutex = g_mutex_new(); + g_mutex_init(&mutex); + g_cond_init(&cond); } int count() { - g_mutex_lock(mutex); + g_mutex_lock(&mutex); int ret = mQueue.count(); - g_mutex_unlock(mutex); + g_mutex_unlock(&mutex); return ret; } T pop() { - g_mutex_lock(mutex); + g_mutex_lock(&mutex); while(!mQueue.size()) { - g_cond_wait(&cond, mutex); + g_cond_wait(&cond, &mutex); } auto itr = mQueue.begin(); @@ -61,24 +66,24 @@ public: mQueue.erase(itr); - g_mutex_unlock(mutex); + g_mutex_unlock(&mutex); return item; } void append(T item) { - g_mutex_lock(mutex); + g_mutex_lock(&mutex); g_cond_signal(&cond); mQueue.push_back(item); - g_mutex_unlock(mutex); + g_mutex_unlock(&mutex); } private: - GMutex * mutex; + GMutex mutex; GCond cond; std::vector<T> mQueue; }; @@ -110,6 +115,28 @@ public: Queue<DBObject*> queue; }; +class PlaybackShared +{ +public: + PlaybackShared(AbstractRoutingEngine* re, std::string u) + :routingEngine(re),uuid(u) {} + ~PlaybackShared() + { + for(auto itr = playbackQueue.begin(); itr != playbackQueue.end(); itr++) + { + DBObject* obj = *itr; + + delete obj; + } + + playbackQueue.clear(); + } + + AbstractRoutingEngine* routingEngine; + std::list<DBObject*> playbackQueue; + std::string uuid; +}; + class DatabaseSink : public AbstractSource { @@ -127,11 +154,15 @@ public: virtual void subscribeToPropertyChanges(VehicleProperty::Property property); virtual void unsubscribeToPropertyChanges(VehicleProperty::Property property); virtual PropertyList supported(); - int supportedOperations() { return GetRanged; } + int supportedOperations() { return GetRanged | Get | Set;} private: //methods: void parseConfig(); + void stopDb(); + void startDb(); + void startPlayback(); + void initDb(); private: PropertyList mSubscriptions; @@ -141,6 +172,9 @@ private: std::string tablename; std::string tablecreate; std::list<VehicleProperty::Property> propertiesToSubscribeTo; + PropertyList mSupported; + bool playback; + PlaybackShared* playbackShared; }; class DatabaseSinkManager: public AbstractSinkManager @@ -151,6 +185,8 @@ public: { new DatabaseSink(routingEngine, config); VehicleProperty::registerProperty(DatabaseLoggingProperty, [](){return new BasicPropertyType<bool>(false);}); + VehicleProperty::registerProperty(DatabasePlaybackProperty, [](){return new BasicPropertyType<bool>(false);}); +VehicleProperty::registerProperty(DatabaseFileProperty, [](){return new StringPropertyType("out.ogg");}); } }; diff --git a/plugins/dbus/amb-qt/ambqt.h b/plugins/dbus/amb-qt/ambqt.h index ade3c05a..c406d737 100644 --- a/plugins/dbus/amb-qt/ambqt.h +++ b/plugins/dbus/amb-qt/ambqt.h @@ -19,7 +19,7 @@ class AmbProperty: public QObject Q_OBJECT Q_PROPERTY(QString propertyName READ propertyName WRITE setPropertyName) AUTOPROPERTY(QString, propertyName, PropertyName) - Q_PROPERTY(QVariant value READ value NOTIFY valueChanged) + Q_PROPERTY(QVariant value READ value WRITE setValue NOTIFY valueChanged) Q_PROPERTY(QString interfaceName READ interfaceName WRITE setInterfaceName) AUTOPROPERTY(QString, interfaceName, InterfaceName) Q_PROPERTY(QString objectPath READ objectPath WRITE setObjectPath) @@ -35,7 +35,7 @@ class AmbProperty: public QObject { if(!mDBusInterface || !mDBusInterface->isValid()) { - qDebug()<<"error Interface is not valid"; + qDebug()<<"error Interface is not valid: "<<interfaceName(); return QVariant::Invalid; } @@ -44,6 +44,16 @@ class AmbProperty: public QObject return value; } + void setValue(QVariant v) + { + if(!mDBusInterface || !mDBusInterface->isValid()) + { + qDebug()<<"error Interface is not valid "<<interfaceName(); + } + + mDBusInterface->setProperty(propertyName().toAscii(), v); + } + Q_SIGNALS: void propertyChanged(QVariant, double); void valueChanged(QVariant); diff --git a/plugins/obd2plugin/obd2source.cpp b/plugins/obd2plugin/obd2source.cpp index 66ea0011..582c944f 100644 --- a/plugins/obd2plugin/obd2source.cpp +++ b/plugins/obd2plugin/obd2source.cpp @@ -76,7 +76,7 @@ std::string reply; { //No reply found //printf("Error!\n"); - DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error resetting ELM\n"; + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error resetting ELM"<<endl; } else { |