diff options
author | Kevron Rees <kevron_m_rees@linux.intel.com> | 2012-12-03 12:16:00 -0800 |
---|---|---|
committer | Kevron Rees <kevron_m_rees@linux.intel.com> | 2012-12-03 12:16:00 -0800 |
commit | 664a325e88ba918b81e943fce41c01018f6989a5 (patch) | |
tree | 7fd15764325519256dacde83f2b4175738f9983e | |
parent | f8cee279ce4c0ec68b15733d85411f7b61dad7b5 (diff) | |
parent | db05580799dfd7260028a6605573c85b4dec89bd (diff) | |
download | automotive-message-broker-664a325e88ba918b81e943fce41c01018f6989a5.tar.gz |
55 files changed, 2785 insertions, 329 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index cfc41e73..5dd7de79 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ set(CMAKE_BUILD_TYPE, Debug) include(FindPkgConfig) set(PROJECT_NAME "automotive-message-broker") -set(PROJECT_VERSION "0.5.0") +set(PROJECT_VERSION "0.6.0") add_definitions(-DPROJECT_VERSION="${PROJECT_VERSION}") add_definitions(-DPROJECT_NAME="${PROJECT_NAME}") @@ -19,6 +19,10 @@ set (DOC_INSTALL_DIR "${CMAKE_INSTALL_PREFIX}/share/doc/${PROJECT_NAME}" CACHE P option(use_qtcore "Use QCoreApplication mainloop " OFF) option(tpms_plugin "TPMS plugin " OFF) option(obd2_plugin "OBD-II plugin" ON) +<<<<<<< HEAD +======= +option(database_plugin "Database plugins" OFF) +>>>>>>> release find_library(libtool_LIBRARY ltdl DOC "Libtool libraries") find_path(libtool_INCLUDE_DIR ltdl.h DOC "Libtool headers") @@ -1,6 +1,5 @@ -- test for memory leaks in websocket sink plugin -- implement websocket source getProperty methods and setProperty - per-source property filtering in routing engine +<<<<<<< HEAD - historic data "get" interface in routing engine - sqlite database storage plugin - connect/disconnect on first/last subscription in obd2source @@ -10,3 +9,6 @@ - fix issue regarding obd2 thread loop not exiting sometimes - optimize CPU performance in obd2source - obd2source blacklist unsupported: persistent blacklist mapped to VIN? +======= +- websocket source may want to implement a timeout on Get/Set/GetRanged calls and reply with success=false +>>>>>>> release diff --git a/ambd/core.cpp b/ambd/core.cpp index 78e4a84e..82a69cee 100644 --- a/ambd/core.cpp +++ b/ambd/core.cpp @@ -140,7 +140,7 @@ void Core::updateSupported(PropertyList added, PropertyList removed) } } -void Core::updateProperty(VehicleProperty::Property property, AbstractPropertyType *value) +void Core::updateProperty(VehicleProperty::Property property, AbstractPropertyType *value, std::string uuid) { SinkList list = propertySinkMap[property]; @@ -148,9 +148,10 @@ void Core::updateProperty(VehicleProperty::Property property, AbstractPropertyTy propertiesPerSecond++; + for(SinkList::iterator itr = list.begin(); itr != list.end(); itr++) { - (*itr)->propertyChanged(property, value,(*itr)->uuid()); + (*itr)->propertyChanged(property, value, uuid); } } @@ -179,7 +180,11 @@ AsyncPropertyReply *Core::getPropertyAsync(AsyncPropertyRequest request) { AbstractSource* src = (*itr); PropertyList properties = src->supported(); - if(ListPlusPlus<VehicleProperty::Property>(&properties).contains(request.property)) + int supportedOps = src->supportedOperations(); + + bool supportsGet = supportedOps & AbstractSource::Get; + + if(ListPlusPlus<VehicleProperty::Property>(&properties).contains(request.property) && supportsGet) { src->getPropertyAsync(reply); } @@ -196,7 +201,7 @@ AsyncRangePropertyReply *Core::getRangePropertyAsync(AsyncRangePropertyRequest r { AbstractSource* src = (*itr); PropertyList properties = src->supported(); - if(ListPlusPlus<VehicleProperty::Property>(&properties).contains(request.property)) + if(ListPlusPlus<VehicleProperty::Property>(&properties).contains(request.property) && src->supportedOperations() & AbstractSource::GetRanged) { src->getRangePropertyAsync(reply); } @@ -211,19 +216,22 @@ AsyncPropertyReply * Core::setProperty(AsyncSetPropertyRequest request) { AbstractSource* src = (*itr); PropertyList properties = src->supported(); - if(ListPlusPlus<VehicleProperty::Property>(&properties).contains(request.property)) + if(ListPlusPlus<VehicleProperty::Property>(&properties).contains(request.property) && src->supportedOperations() & AbstractSource::Set) { return src->setProperty(request); } } + + DebugOut(0)<<"Error: setProperty opration failed"<<endl; + return NULL; } void Core::subscribeToProperty(VehicleProperty::Property property, AbstractSink* self) { - printf("Subscribing\n"); + DebugOut(1)<<"Subscribing to: "<<property<<endl; if(!ListPlusPlus<VehicleProperty::Property>(&mMasterPropertyList).contains((property))) { - DebugOut()<<__FUNCTION__<<"(): property not supported: "<<property<<endl; + DebugOut(1)<<__FUNCTION__<<"(): property not supported: "<<property<<endl; return; } @@ -254,7 +262,7 @@ void Core::unsubscribeToProperty(VehicleProperty::Property property, AbstractSin { if(propertySinkMap.find(property) == propertySinkMap.end()) { - DebugOut()<<__FUNCTION__<<"property not supported: "<<property; + DebugOut(1)<<__FUNCTION__<<"property not supported: "<<property; return; } diff --git a/ambd/core.h b/ambd/core.h index 61f3e906..7ad4ad05 100644 --- a/ambd/core.h +++ b/ambd/core.h @@ -36,7 +36,7 @@ public: void setSupported(PropertyList supported, AbstractSource* source); void updateSupported(PropertyList added, PropertyList removed); - void updateProperty(VehicleProperty::Property property, AbstractPropertyType* value); + void updateProperty(VehicleProperty::Property property, AbstractPropertyType* value, string uuid); /// sinks: @@ -61,6 +61,8 @@ private: int propertiesPerSecond; std::map<VehicleProperty::Property, SinkList> propertySinkMap; + + std::map<VehicleProperty::Property, std::string> previousValueMap; }; diff --git a/ambd/main.cpp b/ambd/main.cpp index b16edfca..2579c0fa 100644 --- a/ambd/main.cpp +++ b/ambd/main.cpp @@ -72,7 +72,7 @@ void daemonize(); void printhelp(const char *argv0); -static const char shortopts[] = "hvDc:d:"; +static const char shortopts[] = "hvDc:d:l:"; static const struct option longopts[] = { { "help", no_argument, NULL, 'h' }, ///< Print the help text @@ -80,6 +80,7 @@ static const struct option longopts[] = { { "daemonise", no_argument, NULL, 'D' }, ///< Daemonise { "config", required_argument, NULL, 'c' }, { "debug", required_argument, NULL, 'd' }, + { "log", required_argument, NULL, 'l' }, { NULL, 0, NULL, 0 } ///< End }; @@ -90,6 +91,8 @@ int main(int argc, char **argv) int optc; int th = 0; string config="/etc/ambd/config"; + ofstream logfile; + string logfn; while ((optc = getopt_long (argc, argv, shortopts, longopts, NULL)) != -1) { @@ -112,6 +115,9 @@ int main(int argc, char **argv) th = atoi(optarg); DebugOut::setDebugThreshhold(th); break; + case 'l': + logfn = optarg; + break; default: cerr<<"Unknown option "<<optc<<endl; printhelp(argv[0]); @@ -123,6 +129,12 @@ int main(int argc, char **argv) if(isdeamonize) daemonize(); + if(!logfn.empty()) + { + logfile.open(logfn, ios::out | ios::trunc); + DebugOut::setOutput(logfile); + } + #ifdef USE_QT_CORE @@ -159,6 +171,9 @@ int main(int argc, char **argv) #endif + if(logfile.is_open()) + logfile.close(); + return 0; } diff --git a/ambd/pluginloader.cpp b/ambd/pluginloader.cpp index 33107de1..55ae6b7c 100644 --- a/ambd/pluginloader.cpp +++ b/ambd/pluginloader.cpp @@ -153,6 +153,19 @@ PluginLoader::PluginLoader(string configFile, AbstractRoutingEngine* re): f_crea } +PluginLoader::~PluginLoader() +{ + for(auto itr = mSinks.begin(); itr != mSinks.end(); itr++) + { + delete *itr; + } + + for(auto itr = mSources.begin(); itr != mSources.end(); itr++) + { + delete *itr; + } +} + SinkList PluginLoader::sinks() { return mSinks; diff --git a/ambd/pluginloader.h b/ambd/pluginloader.h index 18edb9e4..d0489c84 100644 --- a/ambd/pluginloader.h +++ b/ambd/pluginloader.h @@ -41,6 +41,7 @@ class PluginLoader public: PluginLoader(string configFile, AbstractRoutingEngine* routingEngine); + ~PluginLoader(); SourceList sources(); SinkList sinks(); diff --git a/examples/databaseconfig b/examples/databaseconfig new file mode 100644 index 00000000..0615022b --- /dev/null +++ b/examples/databaseconfig @@ -0,0 +1,19 @@ +{ + "sources" : [ + { + "name" : "ExampleSouce", + "path" : "/usr/lib/automotive-message-broker/examplesourceplugin.so" + } + ], + "sinks": [ + { + "name" : "Database", + "path" : "/usr/lib/automotive-message-broker/databasesinkplugin.so" + }, + { + "name" : "Example Sink", + "path" : "/usr/lib/automotive-message-broker/examplesinkplugin.so" + } + ] +} + diff --git a/examples/obdsourceconfig b/examples/obdsourceconfig index f7403bd1..a76cdc61 100644 --- a/examples/obdsourceconfig +++ b/examples/obdsourceconfig @@ -3,7 +3,7 @@ { "name" : "OBD2Source", "path" : "/usr/lib/automotive-message-broker/obd2sourceplugin.so", - "device" : "/dev/ttyUSB0", + "device" : "/dev/pts/5", "baud" : "115200", "bluetoothAdapter" : "" } diff --git a/examples/testmultiisource b/examples/testmultiisource new file mode 100755 index 00000000..525ea39c --- /dev/null +++ b/examples/testmultiisource @@ -0,0 +1,20 @@ +#!/bin/bash + + +numclients=$2 +config=$1 + +#start initial ambd client: + +ambd -D -c $config -d5 -l host.output + +sleep 10 + +for (( i=1; i<=$numclients; i++ )) +do + ambd -D -c configwebsocketsource -d2 -l client.$i.output +done + +sleep 60 + +killall ambd diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 25ac7477..b8fe9151 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -1,5 +1,5 @@ -set(amb_sources abstractpropertytype.cpp abstractroutingengine.cpp listplusplus.cpp abstractsink.cpp vehicleproperty.cpp abstractsource.cpp debugout.cpp) -set(amb_headers_install abstractpropertytype.h nullptr.h abstractroutingengine.h listplusplus.h abstractsink.h vehicleproperty.h debugout.h abstractsource.h) +set(amb_sources abstractpropertytype.cpp abstractroutingengine.cpp listplusplus.cpp abstractsink.cpp vehicleproperty.cpp abstractsource.cpp debugout.cpp timestamp.cpp) +set(amb_headers_install abstractpropertytype.h nullptr.h abstractroutingengine.h listplusplus.h abstractsink.h vehicleproperty.h debugout.h abstractsource.h timestamp.h) include_directories( ${include_dirs} ) add_library(amb SHARED ${amb_sources}) diff --git a/lib/abstractpropertytype.h b/lib/abstractpropertytype.h index 777cd3ba..87272e4b 100644 --- a/lib/abstractpropertytype.h +++ b/lib/abstractpropertytype.h @@ -26,17 +26,27 @@ #include <boost/lexical_cast.hpp> #include <boost/utility.hpp> #include <type_traits> +#include "timestamp.h" class AbstractPropertyType { public: - virtual std::string toString() = 0; + AbstractPropertyType(): timestamp(0), sequence(0) {} + + virtual std::string toString() const = 0; virtual void fromString(std::string)= 0; + virtual AbstractPropertyType* copy() = 0; + + double timestamp; + + uint32_t sequence; + void setValue(boost::any val) { mValue = val; + timestamp = amb::currentTime(); } template <typename T> @@ -86,6 +96,11 @@ public: else throw std::runtime_error("value cannot be empty"); } + AbstractPropertyType* copy() + { + return new BasicPropertyType<T>(*this); + } + void fromString(std::string val) { if(!val.empty() && val != "") @@ -94,7 +109,7 @@ public: } } - std::string toString() + std::string toString() const { std::stringstream stream; stream<<value<T>(); @@ -150,8 +165,12 @@ public: setValue(val); } + AbstractPropertyType* copy() + { + return new StringPropertyType(*this); + } - std::string toString() + std::string toString() const { return value<std::string>(); } diff --git a/lib/abstractroutingengine.h b/lib/abstractroutingengine.h index cca57b15..645f7f18 100644 --- a/lib/abstractroutingengine.h +++ b/lib/abstractroutingengine.h @@ -38,17 +38,6 @@ class AsyncRangePropertyReply; typedef std::function<void (AsyncPropertyReply*)> GetPropertyCompletedSignal; typedef std::function<void (AsyncRangePropertyReply*)> GetRangedPropertyCompletedSignal; -class PropertyValueTime { -public: - ~PropertyValueTime() - { - delete value; - } - - AbstractPropertyType* value; - double timestamp; -}; - class AsyncPropertyRequest { public: @@ -111,7 +100,7 @@ class AsyncRangePropertyRequest { public: AsyncRangePropertyRequest() - :begin(0), end(0) + :timeBegin(0), timeEnd(0), sequenceBegin(-1), sequenceEnd(-1) { } @@ -121,14 +110,18 @@ public: { this->property = request.property; this->completed = request.completed; - this->begin = request.begin; - this->end = request.end; + this->timeBegin = request.timeBegin; + this->timeEnd = request.timeEnd; + this->sequenceBegin = request.sequenceBegin; + this->sequenceEnd = request.sequenceEnd; } VehicleProperty::Property property; GetRangedPropertyCompletedSignal completed; - double begin; - double end; + double timeBegin; + double timeEnd; + int32_t sequenceBegin; + int32_t sequenceEnd; }; class AsyncRangePropertyReply: public AsyncRangePropertyRequest @@ -150,7 +143,7 @@ public: values.clear(); } - std::list<PropertyValueTime*> values; + std::list<AbstractPropertyType*> values; bool success; }; @@ -159,7 +152,7 @@ class AbstractRoutingEngine public: virtual void setSupported(PropertyList supported, AbstractSource* source) = 0; virtual void updateSupported(PropertyList added, PropertyList removed) = 0; - virtual void updateProperty(VehicleProperty::Property property, AbstractPropertyType* value) = 0; + virtual void updateProperty(VehicleProperty::Property property, AbstractPropertyType* value, std::string uuid) = 0; /// sinks: virtual void registerSink(AbstractSink* self) = 0; @@ -170,6 +163,7 @@ public: virtual void subscribeToProperty(VehicleProperty::Property, AbstractSink* self) = 0; virtual void unsubscribeToProperty(VehicleProperty::Property, AbstractSink* self) = 0; virtual PropertyList supported() = 0; + }; #endif // ABSTRACTROUTINGENGINE_H diff --git a/lib/abstractsource.h b/lib/abstractsource.h index d4c1e523..10ff01b1 100644 --- a/lib/abstractsource.h +++ b/lib/abstractsource.h @@ -35,10 +35,18 @@ class AbstractSource; typedef list<AbstractSource*> SourceList; + + class AbstractSource: public AbstractSink { public: + enum Operations { + Get = 0x01, + Set = 0x02, + GetRanged = 0x04 + }; + AbstractSource(AbstractRoutingEngine* engine, map<string, string> config); virtual ~AbstractSource(); @@ -50,6 +58,8 @@ public: virtual void subscribeToPropertyChanges(VehicleProperty::Property property) = 0; virtual void unsubscribeToPropertyChanges(VehicleProperty::Property property) = 0; virtual PropertyList supported() = 0; + + virtual int supportedOperations() = 0; protected: diff --git a/lib/debugout.cpp b/lib/debugout.cpp index 76354900..f7669714 100644 --- a/lib/debugout.cpp +++ b/lib/debugout.cpp @@ -21,7 +21,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA using namespace std; int DebugOut::debugThreshhold = 0; -ostream & DebugOut::out = cout; +std::streambuf * DebugOut::buf = cout.rdbuf(); void debugOut(string message) { diff --git a/lib/debugout.h b/lib/debugout.h index 7985f05c..40cbb9b9 100644 --- a/lib/debugout.h +++ b/lib/debugout.h @@ -34,6 +34,10 @@ public: DebugOut const& operator << (string message) const { + ostream out(buf); + + out.precision(15); + if(mDebugLevel <= debugThreshhold) out<<message<<" "; return *this; @@ -41,13 +45,30 @@ public: DebugOut const& operator << (ostream & (*manip)(std::ostream&)) const { + ostream out(buf); + + out.precision(15); + if(mDebugLevel <= debugThreshhold) out<<endl; return *this; } - DebugOut const & operator << (uint16_t val) const + /*DebugOut const & operator << (uint16_t val) const { + ostream out(buf); + + if(mDebugLevel <= debugThreshhold) + out<<val<<" "; + return *this; + }*/ + + DebugOut const & operator << (double val) const + { + ostream out(buf); + + out.precision(5); + if(mDebugLevel <= debugThreshhold) out<<val<<" "; return *this; @@ -58,9 +79,14 @@ public: debugThreshhold = th; } + static void setOutput(ostream &o) + { + buf = o.rdbuf(); + } + private: static int debugThreshhold; - static ostream &out; + static std::streambuf *buf; int mDebugLevel; }; diff --git a/lib/timestamp.cpp b/lib/timestamp.cpp new file mode 100644 index 00000000..6756803c --- /dev/null +++ b/lib/timestamp.cpp @@ -0,0 +1,18 @@ +#include "timestamp.h" + +#include <time.h> +#include <iostream> + +double amb::currentTime() +{ + struct timespec tm; + + clock_gettime(CLOCK_REALTIME, &tm); + + double ns = double(tm.tv_nsec) / 1000000000; + + + double time = double(tm.tv_sec) + ns; + + return time; +} diff --git a/lib/timestamp.h b/lib/timestamp.h new file mode 100644 index 00000000..745bc13b --- /dev/null +++ b/lib/timestamp.h @@ -0,0 +1,11 @@ +#ifndef _TIMESTAMP_H___ +#define _TIMESTAMP_H___ + + +namespace amb { + +double currentTime(); + +} + +#endif diff --git a/lib/vehicleproperty.cpp b/lib/vehicleproperty.cpp index 220186fc..ab3b1c76 100644 --- a/lib/vehicleproperty.cpp +++ b/lib/vehicleproperty.cpp @@ -18,8 +18,12 @@ #include "vehicleproperty.h" +#include "listplusplus.h" +#include "debugout.h" + #include <map> + #define REGISTERPROPERTY(property, defaultValue) \ registerProperty(property, []() { return new property ## Type(defaultValue); }); @@ -113,6 +117,7 @@ VehicleProperty::VehicleProperty() registerProperty(AccelerationZ, [](){ return new AccelerationType(0); }); registerProperty(MassAirFlow, [](){ return new MassAirFlowType(0); }); registerProperty(ButtonEvent, [](){ return new ButtonEventType(ButtonEvents::NoButton); }); + REGISTERPROPERTY(AirIntakeTemperature,0) registerProperty(BatteryVoltage, [](){ return new BatteryVoltageType(0); }); registerProperty(InteriorTemperature, [](){ return new InteriorTemperatureType(0); }); registerProperty(VIN, [](){ return new VINType(""); }); @@ -178,6 +183,12 @@ AbstractPropertyType* VehicleProperty::getPropertyTypeForPropertyNameValue(Vehic void VehicleProperty::registerProperty(VehicleProperty::Property name, VehicleProperty::PropertyTypeFactoryCallback factory) { + if(ListPlusPlus<Property>(&mCapabilities).contains(name)) + { + DebugOut(0)<<__FUNCTION__<<" ERROR: property '"<<name<<"'' already registered."<<endl; + return; + } + registeredPropertyFactoryMap[name] = factory; mCapabilities.push_back(name); } diff --git a/packaging/automotive-message-broker.changes b/packaging/automotive-message-broker.changes index eae2a211..dbcec3f1 100644 --- a/packaging/automotive-message-broker.changes +++ b/packaging/automotive-message-broker.changes @@ -1,3 +1,9 @@ +* Mon Dec 03 2012 tripzero <kevron_m_rees@linux.intel.com> submit/release/20121112.191808@cbfefb5 +- version bump in spec +- version bump 0.6.0 +- updated changes +- todo updated + * Fri Nov 02 2012 tripzero <kevron_m_rees@linux.intel.com> 1.0_branch@7de9474 - new version - Manual merge diff --git a/packaging/automotive-message-broker.spec b/packaging/automotive-message-broker.spec index f893091f..e145324b 100644 --- a/packaging/automotive-message-broker.spec +++ b/packaging/automotive-message-broker.spec @@ -1,6 +1,6 @@ Name: automotive-message-broker Summary: Automotive Message Broker is a vehicle network abstraction system. -Version: 0.5.0 +Version: 0.6.0 Release: 1 Group: System Environment/Daemons License: LGPL v2.1 diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index b3d70b85..d620cd52 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -26,3 +26,4 @@ add_subdirectory(obd2plugin) add_subdirectory(demosink) add_subdirectory(websocketsourceplugin) add_subdirectory(tpms) +add_subdirectory(database) diff --git a/plugins/database/CMakeLists.txt b/plugins/database/CMakeLists.txt new file mode 100644 index 00000000..305fc450 --- /dev/null +++ b/plugins/database/CMakeLists.txt @@ -0,0 +1,17 @@ +if(database_plugin) + +include(CheckIncludeFiles) + +pkg_check_modules(sqlite REQUIRED sqlite3) + +include_directories(${CMAKE_SOURCE_DIR}/lib ${include_dirs} ${sqlite_INCLUDE_DIRS}) + +set(databasesinkplugin_headers databasesink.h utils.h basedb.hpp baseobject.h sqlitedatabase.h sqlitequery.h) +set(databasesinkplugin_sources databasesink.cpp utils.cpp sqlitedatabase.cpp sqlitequery.cpp) +add_library(databasesinkplugin MODULE ${databasesinkplugin_sources}) +set_target_properties(databasesinkplugin PROPERTIES PREFIX "") +target_link_libraries(databasesinkplugin amb -L${CMAKE_CURRENT_BINARY_DIR}/lib ${link_libraries} ${sqlite_LIBRARIES}) + +install(TARGETS databasesinkplugin LIBRARY DESTINATION lib/automotive-message-broker) + +endif(database_plugin) diff --git a/plugins/database/basedb.hpp b/plugins/database/basedb.hpp new file mode 100644 index 00000000..042b413f --- /dev/null +++ b/plugins/database/basedb.hpp @@ -0,0 +1,341 @@ +/* + * timedate - Displays time and date and daily events + * Copyright (c) <2009>, Intel Corporation. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms and conditions of the GNU Lesser General Public License, + * version 2.1, as published by the Free Software Foundation. + * + * This program is distributed in the hope it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for + * more details. + * + * You should have received a copy of the GNU Lesser General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA. + * + */ + +#ifndef _BASEDB_H_ +#define _BASEDB_H_ + +#include "sqlitedatabase.h" +#include "sqlitequery.h" +#include "debugout.h" +#include <string> +#include <sstream> +#include <stdio.h> +#include <stdlib.h> +#include <vector> + +using namespace std; + +template<typename T> +class NameValuePair +{ + public: + NameValuePair(){ } + NameValuePair(std::string n, T v){name = n; value = v;} + std::string name; + T value; +}; + +template<typename T> +class DictionaryList: public std::vector<NameValuePair<T> > +{ + +}; + +class BaseDB +{ +public: + BaseDB():db(NULL),q(NULL) + { + + } + + + virtual ~BaseDB() + { + DebugOut()<<"BaseDB: Destroying db object. Table: "<<table<<endl; + delete q; + delete db; + } + + void setTable(string tablename) + { + if(tablename == "") return; + table = tablename; + + if(!tableExists()) + reloadTable(); + } + + virtual void + init(string dbname, string tablename, string tablestring) + { + DebugOut()<<"BaseDB: Initializing db object. Table: "<<tablename.c_str()<<endl; + tableString = tablestring; + + db = new sqlitedatabase(); + + db->init(dbname); + + DebugOut()<<"BaseDB: Using db/db-file: "<<dbname.c_str()<<endl; + + if(! db->Connected()) + { + DebugOut(0)<<"BaseDB: database not found "<<dbname<<endl; + throw -1; + } + q = new sqlitequery(); + + q->init(db); + + setTable(tablename); + } + + virtual void + reloadTable() + { + DebugOut()<<"BaseDB: reloading table "<<table<<endl; + dropTable(); + createTable(); + } + + virtual bool tableExists() + { + bool exists=false; + string query = "SELECT * FROM "+table+" LIMIT 0,1"; + DebugOut()<<"BaseDB: checking for existing table with "<<query.c_str()<<endl; + q->getResult(query); + int numrows = q->numRows(); + if(numrows <= 0 ) + exists = false; + else exists = true; + + DebugOut()<<"BaseDB: Table '"<<table<<"' exists? "<<exists<<" because "<<numrows<<" rows where found."<<endl; + q->freeResult(); + return exists; + } + + virtual void + renameTable(string newname) + { + dropTable(newname); + string query = "ALTER TABLE "+table+" RENAME TO "+newname; + q->execute(query); + } + + template<typename T> + void insert(DictionaryList<T> params) + { + string query = "INSERT INTO "+table+" ("; + ostringstream endquery; + endquery<<" VALUES ( "; + for(size_t i=0; i< params.size(); i++) + { + query+=" `"+fixInvalids(params[i].name)+"`"; + ostringstream tempval; + tempval<<params[i].value; + endquery<<"'"<<fixInvalids(tempval.str())<<"'"; + if(i < params.size()-1) + { + query+=","; + endquery<<","; + } + } + endquery<<" )"; + query+=" )"+endquery.str(); + DebugOut()<<"BaseDB: "<<query<<endl; + q->execute(query); + } + + template<typename T> + void + insert(NameValuePair<T> param) + { + string query = "INSERT INTO "+table+" ("; + ostringstream endquery; + endquery<<" VALUES ( "; + query+=" `"+fixInvalids(param.name)+"`"; + ostringstream tempval; + tempval<<param.value; + endquery<<"'"<<fixInvalids(tempval.str())<<"'"; + endquery<<" )"; + query+=" )"+endquery.str(); + DebugOut()<<"BaseDB: "<<query<<endl; + q->execute(query); + } + + virtual void + insert(DictionaryList<string> params) + { + insert<string>(params); + } + + template<typename T, typename TT, typename T3> + void + update(T col, TT colval, NameValuePair<T3> qualifier) + { + ostringstream query; + ostringstream tempval; + ostringstream tempcolval; + tempval<<qualifier.value; + tempcolval<<colval; + query << "UPDATE "<< table << + " SET `"<<col<<"` = '"<<fixInvalids(tempcolval.str())<< + "' WHERE `"<<fixInvalids(qualifier.name)<<"` = '"<<fixInvalids(tempval.str())<<"'"; + printf("BaseDB: Update: %s",query.str().c_str()); + q->execute(query.str()); + } + + template<typename T, typename TT> + void + update(NameValuePair<T> param, NameValuePair<TT> qualifier) + { + update<string,T,TT>(param.name, param.value, qualifier); + } + + template<typename T, typename TT> + void + update(DictionaryList<T> params, NameValuePair<TT> qualifier) + { + for(size_t i=0;i<params.size();i++) + { + update<T,TT>(params[i],qualifier); + } + } + + virtual void update(NameValuePair<string> param, NameValuePair<string> qualifier) + { + update<string,string>(param,qualifier); + } + + template<typename T> + void deleteRow(NameValuePair<T> qualifier) + { + ostringstream query; + ostringstream tempval; + tempval<<qualifier.value; + query << "DELETE FROM "<< table<< + " WHERE `"<<qualifier.name<<"` = '"<<fixInvalids(tempval.str())<<"'"; + printf("BaseDB: %s: %s",__FUNCTION__, query.str().c_str()); + q->execute(query.str()); + } + + virtual void + deleteRow(NameValuePair<string> qualifier) + { + deleteRow<string>(qualifier); + } + + virtual void + dropTable() + { + dropTable(table); + } + + virtual void + dropTable(string tablename) + { + string query="DROP TABLE IF EXISTS "+tablename; + printf("BaseDB: Dropping Table %s with query:? %s",tablename.c_str(),query.c_str()); + q->execute(query); + } + + virtual void + createTable() + { + string t = tableString; + string query; + string::size_type i=t.find("%s",0); + if(i!=string::npos) query=t.replace(i, 2, table); + else query = t; + printf("BaseDB: Creating Table %s with query:? %s",table.c_str(),query.c_str()); + q->execute(query); + } + + + + string + fixInvalids(string filename) + { + return filename; + } + + vector<vector<string> > select(string query) + { + DebugOut()<<query<<endl; + + vector<vector<string>> dataMap; + + q->getResult(query); + + if(q->numRows() <= 0) + { + q->freeResult(); + return dataMap; + } + + int i=0; + + while(q->fetchRow()) + { + string v; + dataMap.push_back(vector<string>()); + + while((v = q->getStr()) != "") + { + dataMap[i].push_back(v); + } + i++; + } + + q->freeResult(); + + return dataMap; + + } + +protected: + + void + fixFilename(string* filename) + { + std::string::size_type i=0; + while(1) + { + i = filename->find(" ",i); + if(i == string::npos) + break; + filename->replace(i,1,"\\ "); + i+=2; + } + } + + void + unfixFilename(string* filename) + { + std::string::size_type i=0; + i=filename->find("\\",0); + if(i == string::npos) + return; + else + { + filename->replace(i,1,""); + unfixFilename(filename); + } + + } + + sqlitedatabase *db; + sqlitequery *q; + string table; + string tableString; + +}; //BaseDB class + +#endif + diff --git a/plugins/database/baseobject.h b/plugins/database/baseobject.h new file mode 100644 index 00000000..a36ddd46 --- /dev/null +++ b/plugins/database/baseobject.h @@ -0,0 +1,37 @@ +/*
+ * timedate - Displays time and date and daily events
+ * Copyright (c) <2009>, Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU Lesser General Public License,
+ * version 2.1, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ */
+
+#ifndef _BASEOBJECTDAO_H_
+#define _BASEOBJECTDAO_H_
+
+#include "basedb.hpp"
+
+#include<string>
+#include<vector>
+
+class BaseObject
+{
+public:
+ BaseObject():id(-1){}
+ virtual ~BaseObject(){}
+ int id;
+};
+
+#endif
+
diff --git a/plugins/database/databasesink.cpp b/plugins/database/databasesink.cpp new file mode 100644 index 00000000..eb4a0a5d --- /dev/null +++ b/plugins/database/databasesink.cpp @@ -0,0 +1,190 @@ +#include "databasesink.h" +#include "abstractroutingengine.h" + +extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine, map<string, string> config) +{ + return new DatabaseSinkManager(routingengine, config); +} + +DatabaseSink::DatabaseSink(AbstractRoutingEngine *engine, map<std::string, std::string> config) + :AbstractSource(engine,config) +{ + databaseName = "storage"; + tablename = "data"; + tablecreate = "CREATE TABLE IF NOT EXISTS data (key TEXT, value BLOB, source TEXT, time REAL, sequence REAL)"; + shared = new Shared; + shared->db->init(databaseName, tablename, tablecreate); + + engine->subscribeToProperty(VehicleProperty::EngineSpeed, this); + engine->subscribeToProperty(VehicleProperty::VehicleSpeed, this); + + PropertyList props; + props.push_back(VehicleProperty::EngineSpeed); + props.push_back(VehicleProperty::VehicleSpeed); + + engine->setSupported(supported(),this); + + auto cb = [](gpointer data) + { + Shared *shared = (Shared*)data; + + while(1) + { + DBObject* obj = shared->queue.pop(); + + if( obj->quit ) + { + break; + } + + DictionaryList<string> dict; + + NameValuePair<string> one("key", obj->key); + NameValuePair<string> two("value", obj->value); + NameValuePair<string> three("source", obj->source); + NameValuePair<string> four("time", boost::lexical_cast<string>(obj->time)); + NameValuePair<string> five("sequence", boost::lexical_cast<string>(obj->sequence)); + + dict.push_back(one); + dict.push_back(two); + dict.push_back(three); + dict.push_back(four); + dict.push_back(five); + + shared->db->insert(dict); + delete obj; + } + + void* ret = NULL; + return ret; + }; + + thread = g_thread_new("dbthread", cb, shared); + +} + +DatabaseSink::~DatabaseSink() +{ + DBObject* obj = new DBObject(); + obj->quit = true; + + shared->queue.append(obj); + + g_thread_join(thread); + + delete shared; +} + + +void DatabaseSink::supportedChanged(PropertyList supportedProperties) +{ + +} + +PropertyList DatabaseSink::supported() +{ + PropertyList props; + + props.push_back(VehicleProperty::EngineSpeed); + props.push_back(VehicleProperty::VehicleSpeed); + + return props; +} + +void DatabaseSink::propertyChanged(VehicleProperty::Property property, AbstractPropertyType *value, std::string uuid) +{ + DBObject* obj = new DBObject; + obj->key = property; + obj->value = value->toString(); + obj->source = uuid; + obj->time = value->timestamp; + obj->sequence = value->sequence; + + shared->queue.append(obj); +} + + +std::string DatabaseSink::uuid() +{ + return "9f88156e-cb92-4472-8775-9c08addf50d3"; +} + +void DatabaseSink::getPropertyAsync(AsyncPropertyReply *reply) +{ + +} + +void DatabaseSink::getRangePropertyAsync(AsyncRangePropertyReply *reply) +{ + BaseDB * db = new BaseDB(); + db->init(databaseName, tablename, tablecreate); + + ostringstream query; + query.precision(15); + + query<<"SELECT * from "<<tablename<<" WHERE "; + + if(reply->timeBegin && reply->timeEnd) + { + query<<" time BETWEEN "<<reply->timeBegin<<" AND "<<reply->timeEnd; + } + + if(reply->sequenceBegin >= 0 && reply->sequenceEnd >=0) + { + query<<" AND sequence BETWEEN "<<reply->sequenceBegin<<" AND "<<reply->sequenceEnd; + } + + std::vector<std::vector<string>> data = db->select(query.str()); + + std::list<AbstractPropertyType*> cleanup; + + for(auto i=0;i<data.size();i++) + { + if(data[i].size() != 5) + continue; + + DBObject dbobj; + dbobj.key = data[i][0]; + dbobj.value = data[i][1]; + dbobj.source = data[i][2]; + dbobj.time = boost::lexical_cast<double>(data[i][3]); + dbobj.sequence = boost::lexical_cast<double>(data[i][4]); + + AbstractPropertyType* property = VehicleProperty::getPropertyTypeForPropertyNameValue(dbobj.key,dbobj.value); + if(property) + { + property->timestamp = dbobj.time; + property->sequence = dbobj.sequence; + + reply->values.push_back(property); + cleanup.push_back(property); + } + } + + reply->success = true; + reply->completed(reply); + + /// reply is owned by the requester of this call. we own the data: + for(auto itr = cleanup.begin(); itr != cleanup.end(); itr++) + { + delete *itr; + } + + delete db; +} + +AsyncPropertyReply *DatabaseSink::setProperty(AsyncSetPropertyRequest request) +{ + AsyncPropertyReply* reply = new AsyncPropertyReply(request); + reply->success = false; + return reply; +} + +void DatabaseSink::subscribeToPropertyChanges(VehicleProperty::Property ) +{ + +} + +void DatabaseSink::unsubscribeToPropertyChanges(VehicleProperty::Property ) +{ +} diff --git a/plugins/database/databasesink.h b/plugins/database/databasesink.h new file mode 100644 index 00000000..e759141e --- /dev/null +++ b/plugins/database/databasesink.h @@ -0,0 +1,149 @@ +/* + Copyright (C) 2012 Intel Corporation + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + + +#ifndef DATABASESINK_H +#define DATABASESINK_H + +#include "abstractsink.h" +#include "abstractsource.h" +#include "basedb.hpp" + +#include <glib.h> + +template <typename T> +class Queue +{ +public: + Queue() + { + mutex = g_mutex_new(); + } + + int count() + { + g_mutex_lock(mutex); + int ret = mQueue.count(); + g_mutex_unlock(mutex); + + return ret; + } + + T pop() + { + g_mutex_lock(mutex); + + while(!mQueue.size()) + { + g_cond_wait(&cond, mutex); + } + + auto itr = mQueue.begin(); + + T item = *itr; + + mQueue.erase(itr); + + g_mutex_unlock(mutex); + + return item; + } + + void append(T item) + { + g_mutex_lock(mutex); + + g_cond_signal(&cond); + + mQueue.push_back(item); + + g_mutex_unlock(mutex); + } + +private: + GMutex * mutex; + GCond cond; + std::vector<T> mQueue; +}; + +class DBObject { +public: + DBObject(): time(0), sequence(0), quit(false) {} + std::string key; + std::string value; + std::string source; + double time; + uint32_t sequence; + bool quit; +}; + +class Shared +{ +public: + Shared() + { + db = new BaseDB; + } + ~Shared() + { + delete db; + } + + BaseDB * db; + Queue<DBObject*> queue; +}; + +class DatabaseSink : public AbstractSource +{ + +public: + DatabaseSink(AbstractRoutingEngine* engine, map<string, string> config); + ~DatabaseSink(); + virtual void supportedChanged(PropertyList supportedProperties); + virtual void propertyChanged(VehicleProperty::Property property, AbstractPropertyType* value, std::string uuid); + virtual std::string uuid(); + + ///source role: + virtual void getPropertyAsync(AsyncPropertyReply *reply); + virtual void getRangePropertyAsync(AsyncRangePropertyReply *reply); + virtual AsyncPropertyReply * setProperty(AsyncSetPropertyRequest request); + virtual void subscribeToPropertyChanges(VehicleProperty::Property property); + virtual void unsubscribeToPropertyChanges(VehicleProperty::Property property); + virtual PropertyList supported(); + int supportedOperations() { return GetRanged; } + +private: + PropertyList mSubscriptions; + Shared *shared; + GThread* thread; + std::string databaseName; + std::string tablename; + std::string tablecreate; +}; + +class DatabaseSinkManager: public AbstractSinkManager +{ +public: + DatabaseSinkManager(AbstractRoutingEngine* engine, map<string, string> config) + :AbstractSinkManager(engine, config) + { + new DatabaseSink(routingEngine, config); + } +}; + +#endif // DATABASESINK_H diff --git a/plugins/database/sqlitedatabase.cpp b/plugins/database/sqlitedatabase.cpp new file mode 100644 index 00000000..7dccc0d4 --- /dev/null +++ b/plugins/database/sqlitedatabase.cpp @@ -0,0 +1,87 @@ +/* + * timedate - Displays time and date and daily events + * Copyright (c) <2009>, Intel Corporation. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms and conditions of the GNU Lesser General Public License, + * version 2.1, as published by the Free Software Foundation. + * + * This program is distributed in the hope it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for + * more details. + * + * You should have received a copy of the GNU Lesser General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA. + * + */ + +#include "sqlitedatabase.h" +#include <stdio.h> +#include <sqlite3.h> + + +bool +sqlitedatabase::init(const std::string & d) +{ + setDatabase(d); + return true; +} + +sqlitedatabase::~sqlitedatabase() +{ + sqlite3_close((sqlite3 *)m_odb->db); +} + +bool sqlitedatabase::Connected() +{ + SqliteDB * odb = grabdb(); + if(!odb) + { + return false; + } + freedb(odb); + return true; +} + +void sqlitedatabase::freedb(SqliteDB * odb) +{ + if(odb) + { + odb->busy = false; + } +} + +SqliteDB * sqlitedatabase::grabdb() +{ + SqliteDB * odb = NULL; + + if(!odb) + { + odb = new SqliteDB; + if(!odb) + { + printf("grabdb: SqliteDB struct couldn't be created"); + return NULL; + } + void * p = &odb->db; + if(sqlite3_open(database.c_str(), (sqlite3 **)p)) + { + printf("Can't open database: %s\n", sqlite3_errmsg((sqlite3 *)odb->db)); + sqlite3_close((sqlite3 *)odb->db); + delete odb; + return NULL; + } + odb->busy = true; + } + else + { + odb->busy = true; + } + m_odb = odb; + + return odb; +} + + diff --git a/plugins/database/sqlitedatabase.h b/plugins/database/sqlitedatabase.h new file mode 100644 index 00000000..953013c8 --- /dev/null +++ b/plugins/database/sqlitedatabase.h @@ -0,0 +1,61 @@ +/* + * timedate - Displays time and date and daily events + * Copyright (c) <2009>, Intel Corporation. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms and conditions of the GNU Lesser General Public License, + * version 2.1, as published by the Free Software Foundation. + * + * This program is distributed in the hope it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for + * more details. + * + * You should have received a copy of the GNU Lesser General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA. + * + */ + +#ifndef SQLITEDATABASE_H +#define SQLITEDATABASE_H + +#include <string> +#include <pthread.h> +#include <stdint.h> +#include <list> + + +struct SqliteDB +{ + SqliteDB() : busy(false){} + void * db; + bool busy; +}; // struct SqliteDB + +class sqlitedatabase +{ +public: + sqlitedatabase() {} + virtual ~sqlitedatabase(); + + bool init(const std::string & d); + + virtual void setHost(const std::string &){} + virtual void setUser(const std::string &){} + virtual void setPassword(const std::string &){} + virtual void setDatabase(const std::string & db){database = db;} + + virtual void OnInit(SqliteDB *){} + virtual bool Connected(); + virtual SqliteDB * grabdb(); + virtual void freedb(SqliteDB * odb); + +protected: + SqliteDB *m_odb; + bool m_embedded; + std::string database; +}; + +#endif + diff --git a/plugins/database/sqlitequery.cpp b/plugins/database/sqlitequery.cpp new file mode 100644 index 00000000..223c4d0b --- /dev/null +++ b/plugins/database/sqlitequery.cpp @@ -0,0 +1,402 @@ +/* + * timedate - Displays time and date and daily events + * Copyright (c) <2009>, Intel Corporation. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms and conditions of the GNU Lesser General Public License, + * version 2.1, as published by the Free Software Foundation. + * + * This program is distributed in the hope it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for + * more details. + * + * You should have received a copy of the GNU Lesser General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA. + * + */ + +#include "sqlitequery.h" +#include <stdio.h> +#include <sqlite3.h> +#include <string> +#include "utils.h" + +using namespace std; + +bool +sqlitequery::init(sqlitedatabase * db) +{ + m_db = db; + odb = (db ? db->grabdb() : NULL); + return true; +} + +bool +sqlitequery::init(sqlitedatabase * dbin, const std::string & sql) +{ + init(dbin); + return execute(sql); +} + +sqlitequery::~sqlitequery() +{ + if(result) + { + printf( "sqlite3_finalize in destructor\n"); + sqlite3_finalize(result); + } + if(odb) + { + m_db->freedb(odb); + } +} + +bool sqlitequery::execute(const std::string & sql) +{ + std::string sql_sqlite=sql; + m_last_query = sql_sqlite; + if(odb && result) + { + string err = "execute: query busy: "+sql_sqlite; + printf("%s\n", err.c_str()); + } + if(odb && !result) + { + const char * s = NULL; + int err = sqlite3_busy_timeout((sqlite3 *)odb->db, 10000 ); + + if(err!= SQLITE_OK) + { + printf("execute: busy timeout occured: \n"); + return false; + } + int rc = sqlite3_prepare_v2((sqlite3 *)odb->db, sql_sqlite.c_str(), sql_sqlite.size(), &result, &s); + + if(rc != SQLITE_OK) + { + string err = "execute: prepare query failed: "+sql_sqlite; + printf("%s\n", err.c_str()); + result = NULL; + return false; + } + if(!result) + { + printf( "execute: query failed\n"); + result = NULL; + return false; + } + rc = sqlite3_step(result); + sqlite3_finalize(result); + result = NULL; + switch(rc) + { + case SQLITE_BUSY: + printf( "execute: database busy\n"); + return false; + case SQLITE_DONE: + case SQLITE_ROW: + return true; + case SQLITE_SCHEMA: + printf( "execute: Schema error\n"); + return false; + case SQLITE_ERROR: + printf("%s\n", sqlite3_errmsg((sqlite3 *)odb->db)); + return false; + case SQLITE_MISUSE: + printf( "execute: database misuse\n"); + return false; + default: + printf( "execute: unknown result code\n"); + } + } + return false; +} + +bool sqlitequery::fetchRow() +{ + rowcount = 0; + row = false; + if(odb && result) + { + int rc = cache_rc_valid ? cache_rc : sqlite3_step(result); + cache_rc_valid = false; + switch(rc) + { + case SQLITE_BUSY: + printf( "execute: database busy\n"); + return false; + case SQLITE_DONE: + return false; + case SQLITE_ROW: + row = true; + return true; + case SQLITE_ERROR: + printf("%s\n", sqlite3_errmsg((sqlite3 *)odb->db)); + return false; + case SQLITE_MISUSE: + printf( "execute: database misuse\n"); + return false; + default: + printf( "execute: unknown result code\n"); + } + } + return false; +} + +void sqlitequery::freeResult() +{ + if(odb && result) + { + sqlite3_finalize(result); + result = NULL; + row = false; + cache_rc_valid = false; + } + while(m_nmap.size()) + { + std::map<std::string, int>::iterator it = m_nmap.begin(); + m_nmap.erase(it); + } +} + +void sqlitequery::resetStatement() +{ + if( odb && result ) + sqlite3_reset(result); +} + +long sqlitequery::getCount(const std::string & sql) +{ + long l(0); + if(getResult(sql)) + { + if(fetchRow()) + { + l = getVal(rowcount++); + } + freeResult(); + } + return l; +} + +bool sqlitequery::prepareStatement(const std::string & sql) +{ + std::string sql_sqlite=sql; + m_last_query = sql_sqlite; + if(odb && result) + { + string err = "prepareStatement: query busy: "+sql_sqlite; + printf("%s\n", err.c_str()); + } + if(odb && !result) + { + const char * s = NULL; + int rc = sqlite3_prepare_v2((sqlite3 *)odb->db, sql_sqlite.c_str(), sql_sqlite.size(), &result, &s); + if(rc != SQLITE_OK) + { + string err = "prepareStatement: prepare query failed: "+sql_sqlite; + printf("%s\n", err.c_str()); + return false; + } + if(!result) + { + printf( "prepareStatement: query failed\n"); + return false; + } + } + return result; +} + +bool sqlitequery::getResult(const std::string & sql) +{ + if(prepareStatement(sql)) + { + int i(0); + const char * p = sqlite3_column_name(result, i); + while(p) + { + m_nmap[p] = ++i; + p = sqlite3_column_name(result, i); + } + m_num_cols = i; + cache_rc = sqlite3_step(result); + cache_rc_valid = true; + m_row_count = (cache_rc == SQLITE_ROW) ? 1 : 0; + } + return result; +} + +bool sqlitequery::bind(const std::string bindMatch) +{ + if(!odb || !result) return false; + + int r = sqlite3_bind_text(result,1,bindMatch.c_str(),bindMatch.length(),NULL); + + if(r != SQLITE_OK) + { + printf("sqlitequery::bind - error binding query\n"); + return false; + } + + int i(0); + const char * p = sqlite3_column_name(result, i); + while(p) + { + m_nmap[p] = ++i; + p = sqlite3_column_name(result, i); + } + m_num_cols = i; + cache_rc = sqlite3_step(result); + cache_rc_valid = true; + m_row_count = (cache_rc == SQLITE_ROW) ? 1 : 0; + + return true; +} + +bool sqlitequery::bind(const int value) +{ + if(!odb || !result) return false; + + int r = sqlite3_bind_int(result,1,value); + + if(r != SQLITE_OK) + { + printf("sqlitequery::bind - error binding query\n"); + return false; + } + + int i(0); + const char * p = sqlite3_column_name(result, i); + while(p) + { + m_nmap[p] = ++i; + p = sqlite3_column_name(result, i); + } + + m_num_cols = i; + cache_rc = sqlite3_step(result); + cache_rc_valid = true; + m_row_count = (cache_rc == SQLITE_ROW) ? 1 : 0; + + return true; +} + +bool sqlitequery::bind(const double value) +{ + if(!odb || !result) return false; + + int r = sqlite3_bind_double(result,1,value); + + if(r != SQLITE_OK) + { + printf("sqlitequery::bind - error binding query\n"); + return false; + } + + int i(0); + const char * p = sqlite3_column_name(result, i); + while(p) + { + m_nmap[p] = ++i; + p = sqlite3_column_name(result, i); + } + m_num_cols = i; + cache_rc = sqlite3_step(result); + cache_rc_valid = true; + m_row_count = (cache_rc == SQLITE_ROW) ? 1 : 0; + + return cache_rc == SQLITE_OK; +} + +int64_t sqlitequery::getBigInt(int x) +{ + if(odb && result && row) + { + return sqlite3_column_int64(result, x); + } + return 0; +} + +int sqlitequery::GetErrno() +{ + if(odb) + { + return sqlite3_errcode((sqlite3 *)odb->db); + } + return 0; +} + +std::string sqlitequery::GetError() +{ + if(odb) + { + return sqlite3_errmsg((sqlite3 *)odb->db); + } + return ""; +} + +double sqlitequery::getNum(int x) +{ + if(odb && result && row) + { + return sqlite3_column_double(result, x); + } + return 0; +} + +const char * sqlitequery::getStr(int x) +{ + if(odb && result && row && x < sqlite3_column_count(result)) + { + const unsigned char * tmp = sqlite3_column_text(result, x); + return tmp ? (const char *)tmp : ""; + } + return ""; +} + +uint64_t sqlitequery::getUBigInt(int x) +{ + if(odb && result && row) + { + return (uint64_t)sqlite3_column_int64(result, x); + } + return 0; +} + +unsigned long sqlitequery::getUVal(int x) +{ + if(odb && result && row) + { + return (unsigned long)sqlite3_column_int(result, x); + } + return 0; +} + +long sqlitequery::getVal(int x) +{ + if(odb && result && row) + { + return sqlite3_column_int(result, x); + } + return 0; +} + +bool sqlitequery::isNull(int x) +{ + if(odb && result && row) + { + if(sqlite3_column_type(result, x) == SQLITE_NULL) + { + return true; + } + } + return false; +} + +long sqlitequery::numRows() +{ + return odb && result ? m_row_count : 0; +} + diff --git a/plugins/database/sqlitequery.h b/plugins/database/sqlitequery.h new file mode 100644 index 00000000..4679619d --- /dev/null +++ b/plugins/database/sqlitequery.h @@ -0,0 +1,86 @@ +/* + * timedate - Displays time and date and daily events + * Copyright (c) <2009>, Intel Corporation. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms and conditions of the GNU Lesser General Public License, + * version 2.1, as published by the Free Software Foundation. + * + * This program is distributed in the hope it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for + * more details. + * + * You should have received a copy of the GNU Lesser General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA. + * + */ + +#ifndef SQLITEQUERY_H +#define SQLITEQUERY_H + +#include "sqlitedatabase.h" +#include <map> + +class sqlite3_stmt; + +class sqlitequery +{ +public: + sqlitequery() : result(NULL), row(false), cache_rc(0), cache_rc_valid(false), m_row_count(0), m_db(NULL), m_num_cols(0) {} + virtual ~sqlitequery(); + + bool init(sqlitedatabase * dbin); + bool init(sqlitedatabase * dbin, const std::string & sql); + + virtual bool Connected(){return odb ? true : false;} + virtual bool execute(const std::string & sql); + virtual bool fetchRow(); + virtual long numRows(); + virtual void freeResult(); + virtual void resetStatement(); + virtual long getCount(const std::string & sql); + virtual bool prepareStatement(const std::string & sql); + virtual bool getResult(const std::string & sql); + virtual bool bind(const std::string bindMatch); + virtual bool bind(const int value); + virtual bool bind(const double value); + virtual int64_t getBigInt(){return getBigInt(rowcount++);} + virtual int64_t getBigInt(int x); + virtual int GetErrno(); + virtual std::string GetError(); + virtual double getNum(){ return getNum(rowcount++); } + virtual double getNum(int x); + virtual const char * getStr(){return getStr(rowcount++);} + virtual const char * getStr(int x); + virtual uint64_t getUBigInt(){ return getUBigInt(rowcount++); } + virtual uint64_t getUBigInt(int x); + virtual unsigned long getUVal(){ return getUVal(rowcount++); } + virtual unsigned long getUVal(int x); + virtual long getVal(){ return getVal(rowcount++); } + virtual long getVal(int x); + virtual bool isNull(int x); +protected: + sqlite3_stmt * result; + bool row; + int cache_rc; + bool cache_rc_valid; + int m_row_count; + + virtual sqlitequery & operator=(const sqlitequery &){return *this;} + + sqlitedatabase * m_db; + std::string m_last_query; + short rowcount; + std::string m_tmpstr; + std::map<std::string, int> m_nmap; + int m_num_cols; + SqliteDB * odb; + +private: + std::string sql_replace_tokens(std::string sqlstring,std::string &dest); +}; + +#endif + diff --git a/plugins/database/utils.cpp b/plugins/database/utils.cpp new file mode 100644 index 00000000..208da86e --- /dev/null +++ b/plugins/database/utils.cpp @@ -0,0 +1,52 @@ +/* + * timedate - Displays time and date and daily events + * Copyright (c) <2009>, Intel Corporation. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms and conditions of the GNU Lesser General Public License, + * version 2.1, as published by the Free Software Foundation. + * + * This program is distributed in the hope it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for + * more details. + * + * You should have received a copy of the GNU Lesser General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA. + * + */ + +#include "utils.h" +#include <stdlib.h> + +using namespace std; + +string +DaoUtils::findReplace(string name, string tofind, string replacewith, string exclusions) +{ + uint i=0; + + uint exclusionPos = exclusions.find(tofind,0); + + while(1) + { + i = name.find(tofind,i); + + if(i != string::npos && exclusionPos != string::npos) + { + if(name.substr(i-exclusionPos,exclusions.length()) == exclusions) + { + i+=replacewith.size(); + continue; + } + } + + if(i == string::npos) + break; + name.replace(i,tofind.size(),replacewith); + i+=replacewith.size(); + } + return name; +} + diff --git a/plugins/database/utils.h b/plugins/database/utils.h new file mode 100644 index 00000000..c1c395d4 --- /dev/null +++ b/plugins/database/utils.h @@ -0,0 +1,35 @@ +/* + * timedate - Displays time and date and daily events + * Copyright (c) <2009>, Intel Corporation. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms and conditions of the GNU Lesser General Public License, + * version 2.1, as published by the Free Software Foundation. + * + * This program is distributed in the hope it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for + * more details. + * + * You should have received a copy of the GNU Lesser General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA. + * + */ + +#ifndef _DAOUTILS_H_ +#define _DAOUTILS_H_ + +#include <string> + +class DaoUtils +{ +public: + DaoUtils(){} + virtual ~DaoUtils(){} + + static std::string findReplace(std::string name, std::string tofind, std::string replacewith, std::string exclusions=""); +}; + + +#endif diff --git a/plugins/demosink/demosinkplugin.h b/plugins/demosink/demosinkplugin.h index 02fb1914..ffd806cb 100644 --- a/plugins/demosink/demosinkplugin.h +++ b/plugins/demosink/demosinkplugin.h @@ -49,13 +49,13 @@ public: DemoSinkManager(AbstractRoutingEngine* engine, map<string, string> config) :AbstractSinkManager(engine, config) { - + DemoSink* sink = new DemoSink(routingEngine, config); + sink->setConfiguration(config); } void setConfiguration(map<string, string> config) { - DemoSink* sink = new DemoSink(routingEngine, config); - sink->setConfiguration(config); + } }; diff --git a/plugins/exampleplugin.cpp b/plugins/exampleplugin.cpp index 3fc5e4bb..e73a05ee 100644 --- a/plugins/exampleplugin.cpp +++ b/plugins/exampleplugin.cpp @@ -17,6 +17,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ #include "exampleplugin.h" +#include "timestamp.h" #include <iostream> #include <boost/assert.hpp> @@ -154,6 +155,11 @@ PropertyList ExampleSourcePlugin::supported() return props; } +int ExampleSourcePlugin::supportedOperations() +{ + return Get | Set | GetRanged; +} + void ExampleSourcePlugin::unsubscribeToPropertyChanges(VehicleProperty::Property property) { mRequests.remove(property); @@ -183,12 +189,20 @@ void ExampleSourcePlugin::randomizeProperties() machineGun = !machineGun; - routingEngine->updateProperty(VehicleProperty::VehicleSpeed, &vel); - routingEngine->updateProperty(VehicleProperty::EngineSpeed, &es); - routingEngine->updateProperty(VehicleProperty::AccelerationX, &ac); - routingEngine->updateProperty(VehicleProperty::SteeringWheelAngle, &swa); - routingEngine->updateProperty(VehicleProperty::TransmissionShiftPosition,&tsp); - routingEngine->updateProperty(VehicleProperty::ThrottlePosition, &tp); - routingEngine->updateProperty(VehicleProperty::EngineCoolantTemperature, &ec); + vel.timestamp = amb::currentTime(); + es.timestamp = amb::currentTime(); + ac.timestamp = amb::currentTime(); + swa.timestamp = amb::currentTime(); + tsp.timestamp = amb::currentTime(); + tp.timestamp = amb::currentTime(); + ec.timestamp = amb::currentTime(); + + routingEngine->updateProperty(VehicleProperty::VehicleSpeed, &vel, uuid()); + routingEngine->updateProperty(VehicleProperty::EngineSpeed, &es, uuid()); + routingEngine->updateProperty(VehicleProperty::AccelerationX, &ac, uuid()); + routingEngine->updateProperty(VehicleProperty::SteeringWheelAngle, &swa, uuid()); + routingEngine->updateProperty(VehicleProperty::TransmissionShiftPosition,&tsp, uuid()); + routingEngine->updateProperty(VehicleProperty::ThrottlePosition, &tp, uuid()); + routingEngine->updateProperty(VehicleProperty::EngineCoolantTemperature, &ec, uuid()); //routingEngine->updateProperty(VehicleProperty::MachineGunTurretStatus, &mgt); } diff --git a/plugins/exampleplugin.h b/plugins/exampleplugin.h index 8f4327b3..715f054c 100644 --- a/plugins/exampleplugin.h +++ b/plugins/exampleplugin.h @@ -37,6 +37,8 @@ public: void subscribeToPropertyChanges(VehicleProperty::Property property); void unsubscribeToPropertyChanges(VehicleProperty::Property property); PropertyList supported(); + + int supportedOperations(); void propertyChanged(VehicleProperty::Property property, AbstractPropertyType* value, string uuid) {} void supportedChanged(PropertyList) {} diff --git a/plugins/examplesink.cpp b/plugins/examplesink.cpp index d3e6c435..fc1f84fb 100644 --- a/plugins/examplesink.cpp +++ b/plugins/examplesink.cpp @@ -21,6 +21,9 @@ #include "abstractroutingengine.h" #include "debugout.h" +#include <boost/date_time/posix_time/posix_time.hpp> + + extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine, map<string, string> config) { return new ExampleSinkManager(routingengine, config); @@ -31,9 +34,26 @@ ExampleSink::ExampleSink(AbstractRoutingEngine* engine, map<string, string> conf routingEngine->subscribeToProperty(VehicleProperty::EngineSpeed, this); routingEngine->subscribeToProperty(VehicleProperty::VehicleSpeed, this); +} + + +PropertyList ExampleSink::subscriptions() +{ + +} + +void ExampleSink::supportedChanged(PropertyList supportedProperties) +{ + printf("Support changed!\n"); + routingEngine->subscribeToProperty(VehicleProperty::EngineSpeed, this); + routingEngine->subscribeToProperty(VehicleProperty::VehicleSpeed, this); + AsyncPropertyRequest velocityRequest; velocityRequest.property = VehicleProperty::VehicleSpeed; - velocityRequest.completed = [](AsyncPropertyReply* reply) { DebugOut()<<"Velocity Async request completed: "<<reply->value->toString()<<endl; delete reply; }; + velocityRequest.completed = [](AsyncPropertyReply* reply) + { + DebugOut()<<"Velocity Async request completed: "<<reply->value->toString()<<endl; delete reply; + }; routingEngine->getPropertyAsync(velocityRequest); @@ -55,21 +75,25 @@ ExampleSink::ExampleSink(AbstractRoutingEngine* engine, map<string, string> conf routingEngine->getPropertyAsync(batteryVoltageRequest); -} + AsyncRangePropertyRequest vehicleSpeedFromLastWeek; + vehicleSpeedFromLastWeek.timeBegin = 1354233906.54099; + vehicleSpeedFromLastWeek.timeEnd = 1354234153.03318; + vehicleSpeedFromLastWeek.property = VehicleProperty::VehicleSpeed; + vehicleSpeedFromLastWeek.completed = [](AsyncRangePropertyReply* reply) + { + std::list<AbstractPropertyType*> values = reply->values; + for(auto itr = values.begin(); itr != values.end(); itr++) + { + auto val = *itr; + DebugOut(0)<<"Velocity value from past: "<<val->toString()<<" time: "<<val->timestamp<<endl; + } + }; -PropertyList ExampleSink::subscriptions() -{ + routingEngine->getRangePropertyAsync(vehicleSpeedFromLastWeek); } -void ExampleSink::supportedChanged(PropertyList supportedProperties) -{ - printf("Support changed!\n"); - routingEngine->subscribeToProperty(VehicleProperty::EngineSpeed, this); - routingEngine->subscribeToProperty(VehicleProperty::VehicleSpeed, this); -} - void ExampleSink::propertyChanged(VehicleProperty::Property property, AbstractPropertyType* value, std::string uuid) { DebugOut()<<property<<" value: "<<value->toString()<<endl; diff --git a/plugins/obd2plugin/CMakeLists.txt b/plugins/obd2plugin/CMakeLists.txt index ae2d3c32..5cb6958c 100644 --- a/plugins/obd2plugin/CMakeLists.txt +++ b/plugins/obd2plugin/CMakeLists.txt @@ -1,4 +1,4 @@ -if(obd2_plugin) +#if(obd2_plugin) include(CheckIncludeFiles) @@ -9,12 +9,12 @@ pkg_check_modules(gio-unix REQUIRED gio-unix-2.0) include_directories(${CMAKE_SOURCE_DIR}/lib ${include_dirs} ${gio_INCLUDE_DIRS} ${gio-unix_INCLUDE_DIRS} ) -set(obd2sourceplugin_headers obd2source.h obdlib.h bluetoothmanagerproxy.h bluetoothadapterproxy.h bluetoothserialproxy.h ) -set(obd2sourceplugin_sources obd2source.cpp obdlib.cpp bluetooth.hpp bluetoothmanagerproxy.c bluetoothadapterproxy.c bluetoothserialproxy.c) +set(obd2sourceplugin_headers obd2source.h obdlib.h obdpid.h bluetoothmanagerproxy.h bluetoothadapterproxy.h bluetoothserialproxy.h ) +set(obd2sourceplugin_sources obd2source.cpp obdlib.cpp obdpid.cpp bluetooth.hpp bluetoothmanagerproxy.c bluetoothadapterproxy.c bluetoothserialproxy.c) add_library(obd2sourceplugin MODULE ${obd2sourceplugin_sources}) set_target_properties(obd2sourceplugin PROPERTIES PREFIX "") target_link_libraries(obd2sourceplugin amb -L${CMAKE_CURRENT_BINARY_DIR}/lib ${link_libraries} ${gio_LIBRARIES} ${gio-unix_LIBRARIES} ) install(TARGETS obd2sourceplugin LIBRARY DESTINATION lib/automotive-message-broker) -endif(obd2_plugin) +#endif(obd2_plugin) diff --git a/plugins/obd2plugin/bluetooth.hpp b/plugins/obd2plugin/bluetooth.hpp index 0e889c56..2be37ca1 100644 --- a/plugins/obd2plugin/bluetooth.hpp +++ b/plugins/obd2plugin/bluetooth.hpp @@ -98,6 +98,91 @@ public: return serialDeviceName; } + void disconnect(std::string address, std::string adapterAddy = "") + { + GError* error = NULL; + OrgBluezManager* manager = org_bluez_manager_proxy_new_for_bus_sync(G_BUS_TYPE_SYSTEM, + G_DBUS_PROXY_FLAGS_NONE, + "org.bluez","/",NULL, &error); + + if(!manager) + { + DebugOut(0)<<"Error getting bluetooth manager proxy: "<<error->message<<endl; + g_error_free(error); + return ; + } + + error = NULL; + + gchar* adapterPath; + + if(adapterAddy != "") + { + if(!org_bluez_manager_call_find_adapter_sync(manager,adapterAddy.c_str(), &adapterPath, NULL, &error)) + { + DebugOut(0)<<"Error getting bluetooth adapter ("<<adapterAddy<<"): "<<error->message<<endl; + g_error_free(error); + return ; + } + + error = NULL; + } + + else + { + if(!org_bluez_manager_call_default_adapter_sync(manager,&adapterPath, NULL, &error)) + { + DebugOut(0)<<"Error getting bluetooth default adapter: "<<error->message<<endl; + g_error_free(error); + return ; + } + + error = NULL; + } + + OrgBluezAdapter* adapter = org_bluez_adapter_proxy_new_for_bus_sync(G_BUS_TYPE_SYSTEM, + G_DBUS_PROXY_FLAGS_NONE, + "org.bluez",adapterPath,NULL,&error); + if(!adapter) + { + DebugOut(0)<<"Error getting bluetooth adapter proxy: "<<error->message<<endl; + g_error_free(error); + return ; + } + + error = NULL; + + gchar* devicePath; + if(!org_bluez_adapter_call_find_device_sync(adapter,address.c_str(),&devicePath,NULL,&error) || + std::string(devicePath) == "") + { + DebugOut(0)<<"Error finding bluetooth device: "<<address<<error->message<<endl; + g_error_free(error); + return ; + } + + error = NULL; + + OrgBluezSerial* serialDevice = org_bluez_serial_proxy_new_for_bus_sync(G_BUS_TYPE_SYSTEM, + G_DBUS_PROXY_FLAGS_NONE, + "org.bluez",devicePath,NULL,&error); + + if(!serialDevice) + { + DebugOut(0)<<"Error getting bluetooth serial device proxy: "<<error->message<<endl; + g_error_free(error); + return ; + } + + gchar* serialDeviceName; + if(!org_bluez_serial_call_disconnect_sync(serialDevice,"spp",NULL,&error)) + { + DebugOut(0)<<"Error disconnecting bluetooth serial device: "<<address<<" - "<<error->message<<endl; + g_error_free(error); + return ; + } + } + }; diff --git a/plugins/obd2plugin/obd2source.cpp b/plugins/obd2plugin/obd2source.cpp index 1bb227d3..6161efd6 100644 --- a/plugins/obd2plugin/obd2source.cpp +++ b/plugins/obd2plugin/obd2source.cpp @@ -27,14 +27,13 @@ #include <listplusplus.h> #include "debugout.h" #include "bluetooth.hpp" - +#include "timestamp.h" #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1) AbstractRoutingEngine *m_re; -uint16_t Obd2Amb::velocity = 0; -double Obd2Amb::fuelConsumptionOldTime = 0; - +//std::list<ObdPid*> Obd2Amb::supportedPidsList; +Obd2Amb *obd2AmbInstance = new Obd2Amb; int calledPersecond = 0; @@ -58,6 +57,54 @@ bool sendElmCommand(obdLib *obd,std::string command) } } + +void connect(obdLib* obd, std::string device, std::string strbaud) +{ + //printf("First: %s\nSecond: %s\n",req->arg.substr(0,req->arg.find(':')).c_str(),req->arg.substr(req->arg.find(':')+1).c_str()); + std::string port = device; + DebugOut() << "Obd2Source::Connect()" << device << strbaud << "\n"; + int baud = boost::lexical_cast<int>(strbaud); + obd->openPort(port.c_str(),baud); +ObdPid::ByteArray replyVector; +std::string reply; + obd->sendObdRequestString("ATZ\r",4,&replyVector,500,3); + for (unsigned int i=0;i<replyVector.size();i++) + { + reply += replyVector[i]; + } + if (reply.find("ELM") == -1) + { + //No reply found + //printf("Error!\n"); + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error resetting ELM\n"; + } + else + { + //printf("Reply to reset: %s\n",reply.c_str()); + } + if (!sendElmCommand(obd,"ATSP0")) + { + //printf("Error sending echo\n"); + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error setting auto protocol"<<endl; + } + if (!sendElmCommand(obd,"ATE0")) + { + //printf("Error sending echo\n"); + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error turning off echo"<<endl; + } + if (!sendElmCommand(obd,"ATH0")) + { + //printf("Error sending headers off\n"); + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error turning off headers"<<endl; + } + if (!sendElmCommand(obd,"ATL0")) + { + //printf("Error turning linefeeds off\n"); + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error turning off linefeeds"<<endl; + } + obd->sendObdRequestString("010C1\r",6,&replyVector,500,5); +} + void threadLoop(gpointer data) { GAsyncQueue *privCommandQueue = g_async_queue_ref(((OBD2Source*)data)->commandQueue); @@ -66,16 +113,21 @@ void threadLoop(gpointer data) GAsyncQueue *privSubscriptionAddQueue = g_async_queue_ref(((OBD2Source*)data)->subscriptionAddQueue); GAsyncQueue *privSubscriptionRemoveQueue = g_async_queue_ref(((OBD2Source*)data)->subscriptionRemoveQueue); obdLib *obd = new obdLib(); + OBD2Source *source = (OBD2Source*)data; obd->setCommsCallback([](const char* mssg, void* data) { DebugOut(6)<<mssg<<endl; },NULL); obd->setDebugCallback([](const char* mssg, void* data, obdLib::DebugLevel debugLevel) { DebugOut(debugLevel)<<mssg<<endl; },NULL); - std::list<std::string> reqList; - std::list<std::string> repeatReqList; - std::map<std::string,std::string> commandMap; - std::vector<unsigned char> replyVector; + std::list<ObdPid*> reqList; + std::list<ObdPid*> repeatReqList; + ObdPid::ByteArray replyVector; std::string reply; - while (true) + std::string port; + std::string baud; + bool connected=false; + int emptycount = 0; + int timeoutCount = 0; + while (source->m_threadLive) { //gpointer query = g_async_queue_pop(privCommandQueue); @@ -85,96 +137,203 @@ void threadLoop(gpointer data) { //printf("Got request!\n"); DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got single shot request!"<<endl; - ObdRequest *req = (ObdRequest*)query; - repeatReqList.push_back(req->req); - delete req; + ObdPid *req = (ObdPid*)query; + repeatReqList.push_back(req); } query = g_async_queue_try_pop(privSubscriptionAddQueue); if (query != nullptr) { - ObdRequest *req = (ObdRequest*)query; - DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got subscription request for "<<req->req<<endl; - reqList.push_back(req->req); - delete req; + ObdPid *req = (ObdPid*)query; + //DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got subscription request for "<<req->req<<endl; + reqList.push_back(req); } query = g_async_queue_try_pop(privCommandQueue); if (query != nullptr) { - ObdRequest *req = (ObdRequest*)query; + //ObdPid *req = (ObdPid*)query; + CommandRequest *req = (CommandRequest*)query; //commandMap[req->req] = req->arg; //printf("Command: %s\n",req->req.c_str()); DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Command:" << req->req << endl; if (req->req == "connect") { - //printf("First: %s\nSecond: %s\n",req->arg.substr(0,req->arg.find(':')).c_str(),req->arg.substr(req->arg.find(':')+1).c_str()); - std::string port = req->arg.substr(0,req->arg.find(':')); - int baud = boost::lexical_cast<int>(req->arg.substr(req->arg.find(':')+1)); - obd->openPort(port.c_str(),baud); - obd->sendObdRequestString("ATZ\r",4,&replyVector,500,3); - for (unsigned int i=0;i<replyVector.size();i++) + if (source->m_isBluetooth) { - reply += replyVector[i]; - } - if (reply.find("ELM") == -1) - { - //No reply found - //printf("Error!\n"); - DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error resetting ELM\n"; + ObdBluetoothDevice bt; + std::string tempPort = bt.getDeviceForAddress(source->m_btDeviceAddress, source->m_btAdapterAddress); + if(tempPort != "") + { + DebugOut(3)<<"Using bluetooth device \""<<source->m_btDeviceAddress<<"\" bound to: "<<tempPort<<endl; + port = tempPort; + } } else { - //printf("Reply to reset: %s\n",reply.c_str()); + port = req->arglist[0]; + baud = req->arglist[1]; } - if (!sendElmCommand(obd,"ATSP0")) - { - //printf("Error sending echo\n"); - DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error setting auto protocol"<<endl; - } - if (!sendElmCommand(obd,"ATE0")) - { - //printf("Error sending echo\n"); - DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error turning off echo"<<endl; - } - if (!sendElmCommand(obd,"ATH0")) - { - //printf("Error sending headers off\n"); - DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error turning off headers"<<endl; - } - if (!sendElmCommand(obd,"ATL0")) + connect(obd,port,baud); + connected = true; + } + else if (req->req == "connectifnot") + { + if (!connected) { - //printf("Error turning linefeeds off\n"); - DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error turning off linefeeds"<<endl; + if (source->m_isBluetooth) + { + ObdBluetoothDevice bt; + std::string tempPort = bt.getDeviceForAddress(source->m_btDeviceAddress, source->m_btAdapterAddress); + if(tempPort != "") + { + DebugOut(3)<<"Using bluetooth device \""<<source->m_btDeviceAddress<<"\" bound to: "<<tempPort<<endl; + port = tempPort; + } + } + connect(obd,port,baud); + connected = true; } } + else if (req->req == "setportandbaud") + { + port = req->arglist[0]; + baud = req->arglist[1]; + } + else if (req->req == "disconnect") + { + DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Using queued disconnect" << (ulong)req << "\n"; + obd->closePort(); + ObdBluetoothDevice bt; + bt.disconnect(source->m_btDeviceAddress, source->m_btAdapterAddress); + connected = false; + } delete req; } query = g_async_queue_try_pop(privSubscriptionRemoveQueue); if (query != nullptr) { DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got unsubscription request\n"; - ObdRequest *req = (ObdRequest*)query; - for (std::list<std::string>::iterator i=reqList.begin();i!= reqList.end();i++) + ObdPid *req = (ObdPid*)query; + for (std::list<ObdPid*>::iterator i=reqList.begin();i!= reqList.end();i++) { - if ((*i) == req->req) + if ((*i)->property == req->property) { reqList.erase(i); + delete (*i); i--; } } //reqList.push_back(req->req); delete req; } - - for (std::list<std::string>::iterator i=reqList.begin();i!= reqList.end();i++) + if (reqList.size() > 0 && !connected) + { + /*CommandRequest *req = new CommandRequest(); + req->req = "connect"; + req->arglist.push_back(port); + req->arglist.push_back(baud); + g_async_queue_push(privCommandQueue,req); + continue;*/ + } + else if (reqList.size() == 0 && connected) + { + emptycount++; + if (emptycount < 1000) + { + usleep(10000); + continue; + } + emptycount = 0; + CommandRequest *req = new CommandRequest(); + req->req = "disconnect"; + g_async_queue_push(privCommandQueue,req); + continue; + } + if (!connected) + { + usleep(10000); + continue; + } + for (std::list<ObdPid*>::iterator i=reqList.begin();i!= reqList.end();i++) { repeatReqList.push_back(*i); } - for (std::list<std::string>::iterator i=repeatReqList.begin();i!= repeatReqList.end();i++) + int badloop = 0; + for (std::list<ObdPid*>::iterator i=repeatReqList.begin();i!= repeatReqList.end();i++) { + if (source->m_blacklistPidCountMap.find((*i)->pid) != source->m_blacklistPidCountMap.end()) + { + //Don't erase the pid, just skip over it. + int count = (*source->m_blacklistPidCountMap.find((*i)->pid)).second; + if (count > 10) + { + continue; + } + } + badloop++; + if (!obd->sendObdRequestString((*i)->pid.c_str(),(*i)->pid.length(),&replyVector)) + { + //This only happens during a error with the com port. Close it and re-open it later. + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Unable to send request:" << (*i)->pid << "!\n"; + if (obd->lastError() == obdLib::NODATA) + { + DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "OBDLib::NODATA for pid" << (*i)->pid << "\n"; + if (source->m_blacklistPidCountMap.find((*i)->pid) != source->m_blacklistPidCountMap.end()) + { + //pid value i not yet in the list. + int count = (*source->m_blacklistPidCountMap.find((*i)->pid)).second; + if (count > 10) + { + + } + source->m_blacklistPidCountMap.erase(source->m_blacklistPidCountMap.find((*i)->pid)); + source->m_blacklistPidCountMap.insert(pair<std::string,int>((*i)->pid,count)); + } + else + { + source->m_blacklistPidCountMap.insert(pair<std::string,int>((*i)->pid,1)); + } + continue; + } + else if (obd->lastError() == obdLib::TIMEOUT) + { + timeoutCount++; + if (timeoutCount < 2) + { + DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "OBDLib::TIMEOUT for pid" << (*i)->pid << "\n"; + continue; + } + } + else + { + } + + CommandRequest *req = new CommandRequest(); + DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Queuing up a disconnect" << (ulong)req << "\n"; + req->req = "disconnect"; + g_async_queue_push(privCommandQueue,req); + i = repeatReqList.end(); + i--; + continue; + } + if (source->m_blacklistPidCountMap.find((*i)->pid) != source->m_blacklistPidCountMap.end()) + { + //If we get the pid response, then we want to clear out the blacklist list. + source->m_blacklistPidCountMap.erase(source->m_blacklistPidCountMap.find((*i)->pid)); + } + timeoutCount = 0; + //ObdPid *pid = ObdPid::pidFromReply(replyVector); + ObdPid *pid = obd2AmbInstance->createPidFromReply(replyVector); + if (!pid) + { + //Invalid reply + DebugOut() << "Invalid reply"<<endl; + continue; + } + g_async_queue_push(privResponseQueue,pid); //printf("Req: %s\n",(*i).c_str()); - if ((*i) == "ATRV\r") + /*if ((*i) == "ATRV\r") { //printf("Requesting voltage...\n"); if (!obd->sendObdRequestString((*i).c_str(),(*i).length(),&replyVector)) @@ -189,73 +348,29 @@ void threadLoop(gpointer data) replystring += replyVector[j]; } //printf("Voltage reply: %s\n",replystring.c_str()); - replystring.substr(0,replystring.find("V")); - ObdReply *rep = new ObdReply(); + replystring.substr(0,replystring.find("V"));*/ + /*ObdReply *rep = new ObdReply(); rep->req = "ATRV\r"; rep->reply = replystring; - g_async_queue_push(privResponseQueue,rep); - } + g_async_queue_push(privResponseQueue,rep);*/ + /*} if (!obd->sendObdRequest((*i).c_str(),(*i).length(),&replyVector)) { //printf("Error sending obd2 request\n"); DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error sending OBD2 request\n"; continue; - } + }*/ //printf("Reply: %i %i\n",replyVector[0],replyVector[1]); - if (replyVector[0] == 0x41) - { - if (replyVector[1] == 0x0C) - { - double rpm = ((replyVector[2] << 8) + replyVector[3]) / 4.0; - ObdReply *rep = new ObdReply(); - rep->req = "0C"; - rep->property = VehicleProperty::EngineSpeed; - rep->reply = boost::lexical_cast<string>(rpm); - g_async_queue_push(privResponseQueue,rep); - //printf("RPM: %f\n",rpm); - } - else if (replyVector[1] == 0x0D) - { - int mph = replyVector[2]; - ObdReply *rep = new ObdReply(); - rep->req = "0D"; - rep->property = VehicleProperty::VehicleSpeed; - rep->reply = boost::lexical_cast<string>(mph); - g_async_queue_push(privResponseQueue,rep); - } - else if (replyVector[1] == 0x05) - { - int temp = replyVector[2] - 40; - ObdReply *rep = new ObdReply(); - rep->req = "05"; - rep->property = VehicleProperty::EngineCoolantTemperature; - rep->reply = boost::lexical_cast<string>(temp); - g_async_queue_push(privResponseQueue,rep); - } - else if (replyVector[1] == 0x10) - { - double maf = ((replyVector[2] << 8) + replyVector[3]) / 100.0; - ObdReply *rep = new ObdReply(); - rep->req = "10"; - rep->property = VehicleProperty::MassAirFlow; - rep->reply = boost::lexical_cast<string>(maf); - g_async_queue_push(privResponseQueue,rep); - } - else - { - //printf("Unknown response type: %i\n",replyVector[1]); - DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Unknown response type" << replyVector[1] << endl; - } - } + /* + /* else if (replyVector[0] == 0x49) { - /* + /* 49 02 01 00 00 00 31 49 02 02 47 31 4A 43 49 02 03 35 34 34 34 49 02 04 52 37 32 35 49 02 05 32 33 36 37 - */ //VIN number reply string vinstring; for (int j=0;j<replyVector.size();j++) @@ -271,20 +386,25 @@ void threadLoop(gpointer data) //printf("VIN: %i %c\n",replyVector[j],replyVector[j]); } } - ObdReply *rep = new ObdReply(); + /*ObdReply *rep = new ObdReply(); rep->req = "0902"; rep->reply = vinstring; - g_async_queue_push(privResponseQueue,rep); - //printf("VIN Number: %i %s\n",replyVector.size(),vinstring.c_str()); + g_async_queue_push(privResponseQueue,rep);*/ - } - //DebugOut()<<"Reply: "<<replyVector[2]<<" "<<replyVector[3]<<endl; } - if(!reqList.size()) usleep(10000); + if (badloop == 0) + { + //We had zero non-blacklisted events. Pause for a moment here to keep from burning CPU. + usleep(10000); + } repeatReqList.clear(); + + } + if (connected) + { + obd->closePort(); } - } static int updateProperties(/*gpointer retval,*/ gpointer data) @@ -294,23 +414,11 @@ static int updateProperties(/*gpointer retval,*/ gpointer data) while(gpointer retval = g_async_queue_try_pop(src->responseQueue)) { - ObdReply *reply = (ObdReply*)retval; + ObdPid *reply = (ObdPid*)retval; - Obd2Amb obd2amb; - - if(obd2amb.propertyPidMap.count(reply->property) != 0) - { - std::string convValue = reply->reply; - - if(obd2amb.propertyConversionMap.count(reply->property)) - { - convValue = obd2amb.propertyConversionMap[reply->property](reply->reply); - } - - - AbstractPropertyType* value = VehicleProperty::getPropertyTypeForPropertyNameValue(reply->property, convValue); - src->updateProperty(reply->property, value); - } + + AbstractPropertyType* value = VehicleProperty::getPropertyTypeForPropertyNameValue(reply->property, reply->value); + src->updateProperty(reply->property, value); /*if (reply->req == "05") { @@ -364,37 +472,23 @@ static int updateProperties(/*gpointer retval,*/ gpointer data) return true; } + void OBD2Source::updateProperty(VehicleProperty::Property property,AbstractPropertyType* value) { //m_re->updateProperty(property,&value); - m_re->updateProperty(property,value); + if (propertyReplyMap.find(property) != propertyReplyMap.end()) { propertyReplyMap[property]->value = value; propertyReplyMap[property]->completed(propertyReplyMap[property]); propertyReplyMap.erase(property); } + else + { + m_re->updateProperty(property,value,uuid()); + } } -void OBD2Source::mafValue(double maf) -{ - VehicleProperty::VehicleSpeedType emaf(maf); - m_re->updateProperty(VehicleProperty::MassAirFlow,&emaf); -} -void OBD2Source::engineCoolantTemp(int temp) -{ - VehicleProperty::VehicleSpeedType etemp(temp); - m_re->updateProperty(VehicleProperty::EngineCoolantTemperature,&etemp); -} -void OBD2Source::engineSpeed(double speed) -{ - VehicleProperty::VehicleSpeedType espeed(speed); - m_re->updateProperty(VehicleProperty::EngineSpeed,&espeed); -} -void OBD2Source::vehicleSpeed(int speed) -{ - VehicleProperty::EngineSpeedType vspeed(speed); - m_re->updateProperty(VehicleProperty::VehicleSpeed,&vspeed); -} + void OBD2Source::setSupported(PropertyList list) { m_supportedProperties = list; @@ -415,6 +509,7 @@ void OBD2Source::setConfiguration(map<string, string> config) std::string port = "/dev/ttyUSB0"; std::string baud = "115200"; std::string btadapter = ""; + m_isBluetooth = false; //Try to load config //printf("OBD2Source::setConfiguration\n"); @@ -439,6 +534,9 @@ void OBD2Source::setConfiguration(map<string, string> config) if(port.find(":") != string::npos) { + m_btDeviceAddress = port; + m_btAdapterAddress = btadapter; + m_isBluetooth = true; ///TODO: bluetooth!! DebugOut()<<"bluetooth device?"<<endl; ObdBluetoothDevice bt; @@ -449,25 +547,42 @@ void OBD2Source::setConfiguration(map<string, string> config) DebugOut(3)<<"Using bluetooth device \""<<port<<"\" bound to: "<<tempPort<<endl; port = tempPort; } - else throw std::runtime_error("Device Error"); + else + { + DebugOut(0)<<"Device Error"<<endl; + ///Don't throw here. + //throw std::runtime_error("Device Error"); + } } - ObdRequest *requ = new ObdRequest(); - requ->req = "connect"; - requ->arg = port + ":" + baud; - g_async_queue_push(commandQueue,requ); + //connect(obd, port, baud); + CommandRequest *req = new CommandRequest(); + req->req = "setportandbaud"; + req->arglist.push_back(port); + req->arglist.push_back(baud); + g_async_queue_push(commandQueue,req); + + m_port = port; + m_baud = baud; + m_gThread = g_thread_new("mythread",(GThreadFunc)&threadLoop,this); + //g_idle_add(updateProperties, this); + g_timeout_add(10,updateProperties,this); } -OBD2Source::OBD2Source(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config) +OBD2Source::OBD2Source(AbstractRoutingEngine *re, map<string, string> config) + : AbstractSource(re, config), Obd2Connect("Obd2Connect") { + VehicleProperty::registerProperty(Obd2Connect,[](){ return new Obd2ConnectType(false); }); clientConnected = false; m_re = re; + m_threadLive = true; Obd2Amb obd2amb; + obd = new obdLib(); - for(auto itr = obd2amb.propertyPidMap.begin(); itr != obd2amb.propertyPidMap.end(); itr++) + for(auto itr = obd2amb.supportedPidsList.begin(); itr != obd2amb.supportedPidsList.end(); itr++) { - m_supportedProperties.push_back((*itr).first); + m_supportedProperties.push_back((*itr)->property); } re->setSupported(supported(), this); @@ -480,22 +595,26 @@ OBD2Source::OBD2Source(AbstractRoutingEngine *re, map<string, string> config) : subscriptionRemoveQueue = g_async_queue_new(); responseQueue = g_async_queue_new(); singleShotQueue = g_async_queue_new(); - g_thread_new("mythread",(GThreadFunc)&threadLoop,this); setConfiguration(config); - - //AsyncQueueWatcher * watcher = new AsyncQueueWatcher(responseQueue, (AsyncQueueWatcherCallback) updateProperties, this); - - //g_timeout_add(1,updateProperties, this); - g_idle_add(updateProperties, this); - //g_timeout_add(1000,calcCPS,NULL); - +} +OBD2Source::~OBD2Source() +{ + DebugOut() << "OBD2Source Destructor called!!!"<<endl; + m_threadLive = false; + g_thread_join(m_gThread); } PropertyList OBD2Source::supported() { return m_supportedProperties; } + +int OBD2Source::supportedOperations() +{ + return Get | Set; +} + extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config) { return new OBD2Source(routingengine, config); @@ -585,10 +704,26 @@ void OBD2Source::subscribeToPropertyChanges(VehicleProperty::Property property) return; } - Obd2Amb obd2amb; - ObdRequest *requ = new ObdRequest(); - requ->req = obd2amb.propertyPidMap[property]; - g_async_queue_push(subscriptionAddQueue,requ); + + ObdPid *pid = obd2AmbInstance->createPidforProperty(property); + + if(!pid) + { + return; + } + + //If the pid is currently in the blacklist map, erase it. This allows for applications + //to "un-blacklist" a pid by re-subscribing to it. + if (m_blacklistPidCountMap.find(pid->pid) != m_blacklistPidCountMap.end()) + { + m_blacklistPidCountMap.erase(m_blacklistPidCountMap.find(pid->pid)); + } + + + g_async_queue_push(subscriptionAddQueue,pid); + CommandRequest *req = new CommandRequest(); + req->req = "connectifnot"; + g_async_queue_push(commandQueue,req); } } @@ -658,11 +793,8 @@ void OBD2Source::unsubscribeToPropertyChanges(VehicleProperty::Property property return; } - Obd2Amb obd2amb; - ObdRequest *requ = new ObdRequest(); - requ->property = property; - requ->req = obd2amb.propertyPidMap[property]; - g_async_queue_push(subscriptionRemoveQueue,requ); + ObdPid *pid = obd2AmbInstance->createPidforProperty(property); + g_async_queue_push(subscriptionRemoveQueue,pid); } @@ -736,11 +868,11 @@ void OBD2Source::getPropertyAsync(AsyncPropertyReply *reply) return; } - Obd2Amb obd2amb; - ObdRequest *requ = new ObdRequest(); - requ->property = property; - requ->req = obd2amb.propertyPidMap[property]; + ObdPid* requ = obd2AmbInstance->createPidforProperty(property); g_async_queue_push(singleShotQueue,requ); + CommandRequest *req = new CommandRequest(); + req->req = "connectifnot"; + g_async_queue_push(commandQueue,req); } AsyncPropertyReply *OBD2Source::setProperty(AsyncSetPropertyRequest request ) diff --git a/plugins/obd2plugin/obd2source.h b/plugins/obd2plugin/obd2source.h index c524759a..bdb24a80 100644 --- a/plugins/obd2plugin/obd2source.h +++ b/plugins/obd2plugin/obd2source.h @@ -33,6 +33,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA #include "obdlib.h" #include <glib.h> +#include "obdpid.h" class ObdRequest { @@ -42,6 +43,14 @@ public: std::string arg; }; + +class CommandRequest +{ +public: + std::string req; + std::vector<std::string> arglist; +}; + class ObdReply { public: @@ -51,14 +60,17 @@ public: }; + class Obd2Amb { public: typedef function<std::string (std::string)> ConversionFunction; + typedef std::vector<unsigned char> ByteArray; Obd2Amb() { +<<<<<<< HEAD propertyPidMap[VehicleProperty::VehicleSpeed] = "010D1\r"; propertyPidMap[VehicleProperty::EngineSpeed] = "010C1\r"; propertyPidMap[VehicleProperty::MassAirFlow] = "01101\r"; @@ -84,28 +96,56 @@ public: return input; }; +======= + supportedPidsList.push_back(new VehicleSpeedPid()); + supportedPidsList.push_back(new EngineSpeedPid()); + supportedPidsList.push_back(new MassAirFlowPid()); + supportedPidsList.push_back(new VinPid()); + supportedPidsList.push_back(new WmiPid()); + supportedPidsList.push_back(new FuelConsumptionPid()); + supportedPidsList.push_back(new EngineCoolantPid()); + supportedPidsList.push_back(new AirIntakeTemperaturePid()); + } +>>>>>>> release - propertyConversionMap[VehicleProperty::WMI] = [](std::string input) + ~Obd2Amb() + { + for(auto itr = supportedPidsList.begin(); itr != supportedPidsList.end(); itr++) { - return input.substr(0,3); - }; + delete *itr; + } + } - propertyConversionMap[VehicleProperty::FuelConsumption] = [](std::string input) + ObdPid* createPidFromReply(ByteArray replyVector) + { + for(auto itr = supportedPidsList.begin(); itr != supportedPidsList.end(); itr++) { - double maf; - stringstream mafConvert(input); - - mafConvert>>maf; - - mafConvert<<1 / (14.75 * 6.26) * maf * 0/60; - - return mafConvert.str(); - }; - - - + if (!(*itr)->tryParse(replyVector)) + { + continue; + } + + ObdPid* pid = (*itr)->create(); + pid->tryParse(replyVector); + return pid; + } + return 0; + } + ObdPid* createPidforProperty(VehicleProperty::Property property) + { + for(auto itr = supportedPidsList.begin(); itr != supportedPidsList.end(); itr++) + { + VehicleProperty::Property p = (*itr)->property; + if(p == property) + { + ObdPid* obj = *itr; + return obj->create(); + } + } + return NULL; } +<<<<<<< HEAD map<VehicleProperty::Property, ConversionFunction> propertyConversionMap; @@ -115,6 +155,9 @@ private: static uint16_t velocity; static double fuelConsumptionOldTime; +======= + std::list<ObdPid*> supportedPidsList; +>>>>>>> release }; class OBD2Source : public AbstractSource @@ -122,6 +165,7 @@ class OBD2Source : public AbstractSource public: OBD2Source(AbstractRoutingEngine* re, map<string, string> config); + ~OBD2Source(); string uuid(); int portHandle; void getPropertyAsync(AsyncPropertyReply *reply); @@ -130,6 +174,9 @@ public: void subscribeToPropertyChanges(VehicleProperty::Property property); void unsubscribeToPropertyChanges(VehicleProperty::Property property); PropertyList supported(); + + int supportedOperations(); + PropertyList queuedRequests; bool clientConnected; PropertyList activeRequests; @@ -146,15 +193,26 @@ public: GAsyncQueue* subscriptionRemoveQueue; GAsyncQueue* singleShotQueue; GAsyncQueue* responseQueue; + std::list<std::string> m_blacklistPidList; + std::map<std::string,int> m_blacklistPidCountMap; void setConfiguration(map<string, string> config); //void randomizeProperties(); std::string m_port; + std::string m_baud; + bool m_isBluetooth; + std::string m_btDeviceAddress; + std::string m_btAdapterAddress; map<VehicleProperty::Property,AsyncPropertyReply*> propertyReplyMap; void updateProperty(VehicleProperty::Property property,AbstractPropertyType *value); + obdLib * obd; + bool m_threadLive; + GThread *m_gThread; + private: PropertyList m_supportedProperties; GMutex *threadQueueMutex; - + VehicleProperty::Property Obd2Connect; + typedef BasicPropertyType<bool> Obd2ConnectType; }; diff --git a/plugins/obd2plugin/obdlib.cpp b/plugins/obd2plugin/obdlib.cpp index 32bb43db..e4f874f7 100644 --- a/plugins/obd2plugin/obdlib.cpp +++ b/plugins/obd2plugin/obdlib.cpp @@ -160,18 +160,30 @@ int obdLib::openPort(const char *portName,int baudrate) {
}
- newtio.c_cflag |= (CLOCAL | CREAD);
+ newtio.c_cflag = (newtio.c_cflag & ~CSIZE) | CS8;
+ newtio.c_cflag |= CLOCAL | CREAD;
+ newtio.c_cflag &= ~CRTSCTS;
+ newtio.c_cflag &= ~CSTOPB;
+ newtio.c_iflag=IGNBRK;
+ newtio.c_iflag &= ~(IXON|IXOFF|IXANY);
+ newtio.c_lflag=0;
+ newtio.c_oflag=0;
+ newtio.c_cc[VTIME]=1; //1/10th second timeout, reduces CPU usage but still allows for timeouts
+ newtio.c_cc[VMIN]=1; //We want a pure timer timeout
+
+
+ /*newtio.c_cflag |= (CLOCAL | CREAD);
newtio.c_lflag &= !(ICANON | ECHO | ECHOE | ISIG);
newtio.c_oflag &= !(OPOST);
newtio.c_cc[VMIN] = 0;
- newtio.c_cc[VTIME] = 100;
+ newtio.c_cc[VTIME] = 100;*/
/*
newtio.c_cflag &= ~CSIZE; //Disable byte size
newtio.c_cflag &= ~PARENB; //Disable parity
newtio.c_cflag &= ~CSTOPB; //Disable stop bits
newtio.c_cflag |= (CLOCAL | CREAD | CS8); //Set local mode, reader, and 8N1.
- newtio.c_lflag &= ~(ICANON | ECHO | ECHOE | ISIG); //Disable CANON, echo, and signals
+ newtio.c_lflag &= ~(ICANON | ECHO | ECHOE | ISIG); //Disausleep(10000);ble CANON, echo, and signals
newtio.c_oflag &= ~(OPOST); //Disable post processing
*/
@@ -188,6 +200,7 @@ int obdLib::openPort(const char *portName,int baudrate) }
debug(obdLib::DEBUG_VERBOSE,"Setting baud rate to %i on port %s\n",baudrate,portName);
}
+ fcntl(portHandle, F_SETFL, 0); //Set to blocking
tcsetattr(portHandle,TCSANOW,&newtio);
//newtio.c_cc[VMIN] = 0; //Minimum number of bytes to read
//newtio.c_cc[VTIME] = 100; //Read Timeout (10.0 seconds)
@@ -213,6 +226,12 @@ int obdLib::closePort() #endif
return 0;
}
+
+bool obdLib::connected()
+{
+
+}
+
int obdLib::initPort()
{
sendObdRequest("atz\r",4);
@@ -372,11 +391,9 @@ bool obdLib::sendObdRequestString(const char *req,int length,std::vector<byte> * #endif
if (len < 0)
{
- printf("No Write\n");
- //delete tmp;
- //delete totalReply;
- //m_lastError = SERIALWRITEERROR;
- //return false;
+ debug(obdLib::DEBUG_ERROR,"Serial write error: %s", strerror(errno));
+
+ return false;
}
if (sleeptime == -1)
{
@@ -409,7 +426,7 @@ bool obdLib::sendObdRequestString(const char *req,int length,std::vector<byte> * if (len < 0)
{
- printf("No Read\n");
+ //printf("No Read\n");
perror("Error");
delete[] tmp;
delete[] totalReply;
@@ -422,6 +439,7 @@ bool obdLib::sendObdRequestString(const char *req,int length,std::vector<byte> * #ifdef WINVER
Sleep(10);
#else
+ //printf("Timeout\n");
usleep(10000);
#endif
}
diff --git a/plugins/obd2plugin/obdlib.h b/plugins/obd2plugin/obdlib.h index 7fdb99a2..5c3cc2b7 100644 --- a/plugins/obd2plugin/obdlib.h +++ b/plugins/obd2plugin/obdlib.h @@ -105,6 +105,7 @@ public: void setPortHandle(HANDLE hdnl); int initPort(); int closePort(); + bool connected(); void flush(); void setDebugCallback(void (*callbackptr)(const char*,void*,obdLib::DebugLevel),void *); void setCommsCallback(void (*callbackptr)(const char*,void*),void*); diff --git a/plugins/obd2plugin/obdpid.cpp b/plugins/obd2plugin/obdpid.cpp new file mode 100644 index 00000000..81f12c4e --- /dev/null +++ b/plugins/obd2plugin/obdpid.cpp @@ -0,0 +1,4 @@ +#include "obdpid.h" + + +double FuelConsumptionPid::oldTime=0; diff --git a/plugins/obd2plugin/obdpid.h b/plugins/obd2plugin/obdpid.h new file mode 100644 index 00000000..74b3483a --- /dev/null +++ b/plugins/obd2plugin/obdpid.h @@ -0,0 +1,282 @@ +#ifndef _OBDPID_H__H_H_ +#define _OBDPID_H__H_H_ + +#include <vector> +#include <string> +#include <vehicleproperty.h> +#include "obdlib.h" +#include <time.h> + +class ObdPid +{ +public: + typedef std::vector<unsigned char> ByteArray; + + ObdPid(VehicleProperty::Property prop, std::string p, int i) + :property(prop), pid(p), id(i), type(0x41) + { + + } + static ByteArray cleanup(ByteArray replyVector) + { + ByteArray tmp; + for (int i=0;i<replyVector.size();i++) + { + if ((replyVector[i] != 0x20) && (replyVector[i] != '\r') && (replyVector[i] != '\n')) + { + tmp.push_back(replyVector[i]); + } + } + return tmp; + } + static ByteArray compress(ByteArray replyVector) + { + ByteArray tmp; + for (int i=0;i<replyVector.size();i++) + { + tmp.push_back(obdLib::byteArrayToByte(replyVector[i],replyVector[i+1])); + i++; + } + return tmp; + } + virtual ObdPid* create() = 0; + + virtual bool tryParse(ByteArray replyVector) = 0; + + VehicleProperty::Property property; + std::string pid; + int id; + int type; + std::string value; +}; + +template <class T> +class CopyMe: public ObdPid +{ +public: + + CopyMe(VehicleProperty::Property prop, std::string p, int i) + :ObdPid(prop, p, i) + { + + } + + ObdPid* create() + { + return new T(); + } +}; + + +class VehicleSpeedPid: public CopyMe<VehicleSpeedPid> +{ +public: + + VehicleSpeedPid() + :CopyMe(VehicleProperty::VehicleSpeed, "010D1\r", 0x0D) + { + + } + bool tryParse(ByteArray replyVector) + { + ByteArray tmp = compress(cleanup(replyVector)); + //for (int i=0;i<tmp.size();i++) + //{ + //printf("%i ",tmp[i]); + //} + //printf("\n"); + if (tmp[1] != 0x0D) + { + return false; + } + int mph = tmp[2]; + value = boost::lexical_cast<std::string>(mph); + return true; + } +}; + +class EngineSpeedPid: public CopyMe<EngineSpeedPid> +{ +public: + + EngineSpeedPid() + :CopyMe(VehicleProperty::EngineSpeed,"010C1\r",0x0C) + { + + } + bool tryParse(ByteArray replyVector) + { + ByteArray tmp = compress(cleanup(replyVector)); + if (tmp[1] != 0x0C) + { + return false; + } + double rpm = ((tmp[2] << 8) + tmp[3]) / 4.0; + value = boost::lexical_cast<std::string>(rpm); + return true; + } +}; + +class EngineCoolantPid: public CopyMe<EngineCoolantPid> +{ +public: + + EngineCoolantPid() + :CopyMe(VehicleProperty::EngineCoolantTemperature,"01051\r",0x05) + { + + } + bool tryParse(ByteArray replyVector) + { + ByteArray tmp = compress(cleanup(replyVector)); + if (tmp[1] != id) + { + return false; + } + int temp = tmp[2] - 40; + value = boost::lexical_cast<std::string>(temp); + return true; + } +}; + +class MassAirFlowPid: public CopyMe<MassAirFlowPid> +{ +public: + + MassAirFlowPid() + :CopyMe(VehicleProperty::MassAirFlow,"01101\r",0x01) + { + + } + bool tryParse(ByteArray replyVector) + { + ByteArray tmp = compress(cleanup(replyVector)); + if (tmp[1] != 0x10) + { + return false; + } + maf = ((tmp[2] << 8) + tmp[3]) / 100.0; + value = boost::lexical_cast<std::string>(maf); + return true; + } + +protected: + double maf; +}; + + +class FuelConsumptionPid: public MassAirFlowPid +{ +public: + FuelConsumptionPid() + + { + + } + bool tryParse(ByteArray replyVector) + { + if(!MassAirFlowPid::tryParse(replyVector)) + return false; + + timespec t; + clock_gettime(CLOCK_REALTIME, &t); + + double currentTime = t.tv_sec + t.tv_nsec / 1000000; + + double diffTime = currentTime - oldTime; + oldTime = currentTime; + + double consumption = 1 / (14.75 * 6.26) * maf * diffTime/60; + + value = boost::lexical_cast<std::string>(consumption); + return true; + } + +private: + + static double oldTime; +}; + + +class VinPid: public CopyMe<VinPid> +{ +public: + + VinPid() + :CopyMe(VehicleProperty::VIN,"0902\r",0x02) + { + type = 0x49; + } + bool tryParse(ByteArray replyVector) + { + std::string vinstring; + ByteArray tmp = compress(cleanup(replyVector)); + if (tmp[0] != 0x49 || tmp[1] != 0x02) + { + return false; + } + for (int j=0;j<tmp.size();j++) + { + if(tmp[j] == 0x49 && tmp[j+1] == 0x02) + { + //We're at a reply header + j+=3; + } + if (tmp[j] != 0x00) + { + vinstring += (char)tmp[j]; + //printf("VIN: %i %c\n",replyVector[j],replyVector[j]); + } + } + + value = vinstring; + return true; + } + +}; + +class WmiPid: public VinPid +{ +public: + + WmiPid() + :VinPid() + { + property = VehicleProperty::WMI; + } + bool tryParse(ByteArray replyVector) + { + if (!VinPid::tryParse(replyVector)) + { + return false; + } + value = value.substr(0,3); + return true; + } + +}; + +class AirIntakeTemperaturePid: public CopyMe<AirIntakeTemperaturePid> +{ +public: + AirIntakeTemperaturePid() + :CopyMe(VehicleProperty::AirIntakeTemperature,"010F1\r",0x0F) + { + + } + + bool tryParse(ByteArray replyVector) + { + ByteArray tmp = compress(cleanup(replyVector)); + + if (tmp[1] != id) + { + return false; + } + int temp = tmp[2] - 40; + value = boost::lexical_cast<std::string>(temp); + return true; + } +}; + +#endif diff --git a/plugins/tpms/tpmsplugin.cpp b/plugins/tpms/tpmsplugin.cpp index 01cb444d..c3f912d0 100644 --- a/plugins/tpms/tpmsplugin.cpp +++ b/plugins/tpms/tpmsplugin.cpp @@ -26,6 +26,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA using namespace std; #include "debugout.h" +#include "timestamp.h" #define ENDPOINT_IN 0x81 #define ENDPOINT_OUT 0x01 @@ -298,14 +299,14 @@ int TpmsPlugin::readValues() VehicleProperty::TireTemperatureType lrTemp(lrTemperature); VehicleProperty::TireTemperatureType rrTemp(rrTemperature); - routingEngine->updateProperty(VehicleProperty::TirePressureLeftFront, &lfPres); - routingEngine->updateProperty(VehicleProperty::TirePressureRightFront, &rfPres); - routingEngine->updateProperty(VehicleProperty::TirePressureLeftRear, &lrPres); - routingEngine->updateProperty(VehicleProperty::TirePressureRightRear, &rrPres); - routingEngine->updateProperty(VehicleProperty::TireTemperatureLeftFront, &lfTemp); - routingEngine->updateProperty(VehicleProperty::TireTemperatureRightFront, &rfTemp); - routingEngine->updateProperty(VehicleProperty::TireTemperatureLeftRear, &lrTemp); - routingEngine->updateProperty(VehicleProperty::TireTemperatureRightRear, &rrTemp); + routingEngine->updateProperty(VehicleProperty::TirePressureLeftFront, &lfPres, uuid()); + routingEngine->updateProperty(VehicleProperty::TirePressureRightFront, &rfPres, uuid()); + routingEngine->updateProperty(VehicleProperty::TirePressureLeftRear, &lrPres, uuid()); + routingEngine->updateProperty(VehicleProperty::TirePressureRightRear, &rrPres, uuid()); + routingEngine->updateProperty(VehicleProperty::TireTemperatureLeftFront, &lfTemp, uuid()); + routingEngine->updateProperty(VehicleProperty::TireTemperatureRightFront, &rfTemp, uuid()); + routingEngine->updateProperty(VehicleProperty::TireTemperatureLeftRear, &lrTemp, uuid()); + routingEngine->updateProperty(VehicleProperty::TireTemperatureRightRear, &rrTemp, uuid()); return 0; } diff --git a/plugins/tpms/tpmsplugin.h b/plugins/tpms/tpmsplugin.h index 189c91b8..f0f26684 100644 --- a/plugins/tpms/tpmsplugin.h +++ b/plugins/tpms/tpmsplugin.h @@ -42,6 +42,8 @@ public: void supportedChanged(PropertyList) {} int readValues(); + + int supportedOperations() { return Get; } private: PropertyList mRequests; diff --git a/plugins/websocketsink/protocol b/plugins/websocketsink/protocol new file mode 100644 index 00000000..5b836a89 --- /dev/null +++ b/plugins/websocketsink/protocol @@ -0,0 +1,24 @@ +Example protocol messages + +Property changed event: +{"type":"valuechanged","name":"VehicleSpeed","data":"217","transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66", "timestamp":"1354521964.60253","sequence":"0"}1354521964.25081", "sequence": "0" } + +Get property request: +{"type":"method","name":"get","data":["VehicleSpeed"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"} + +Get property reply: +{"type":"methodReply","name":"get","data":[{"property":"VehicleSpeed","value":"17"}],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66", "timestamp" : "1354521964.24962", "sequence": "0" } + +Get supported request: +{"type":"method","name":"getSupportedEventTypes","data":[],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"} + +Get supported reply: +{"type":"methodReply","name":"getSupportedEventTypes","data":["running_status_speedometer","running_status_engine_speed","running_status_steering_wheel_angle","running_status_transmission_gear_status","EngineSpeed","VehicleSpeed","AccelerationX","TransmissionShiftPosition","SteeringWheelAngle","ThrottlePosition","EngineCoolantTemperature","VIN","WMI","BatteryVoltage","MachineGunTurretStatus"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"} + +Subscribe to data: +{"type":"method","name":"subscribe","data":["EngineSpeed"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"} + +Subscribe to data reply: +{"type":"methodReply","name":"subscribe","data":["EngineSpeed"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"} + + diff --git a/plugins/websocketsink/websocketsink.cpp b/plugins/websocketsink/websocketsink.cpp index 6f022942..bbcb4527 100644 --- a/plugins/websocketsink/websocketsink.cpp +++ b/plugins/websocketsink/websocketsink.cpp @@ -64,8 +64,9 @@ void WebSocketSink::propertyChanged(VehicleProperty::Property property, Abstract tmpstr = property; } + s.precision(15); - s << "{\"type\":\"valuechanged\",\"name\":\"" << tmpstr << "\",\"data\":\"" << value->toString() << "\",\"transactionid\":\"" << m_uuid << "\"}"; + s << "{\"type\":\"valuechanged\",\"name\":\"" << tmpstr << "\",\"data\":\"" << value->toString() << "\",\"transactionid\":\"" << m_uuid << "\", \"timestamp\":\""<<value->timestamp<<"\",\"sequence\":\""<<value->sequence<<"\"}"; string replystr = s.str(); //printf("Reply: %s\n",replystr.c_str()); diff --git a/plugins/websocketsink/websocketsinkmanager.cpp b/plugins/websocketsink/websocketsinkmanager.cpp index e189ae2c..dc8cc9e9 100644 --- a/plugins/websocketsink/websocketsinkmanager.cpp +++ b/plugins/websocketsink/websocketsinkmanager.cpp @@ -52,6 +52,10 @@ void WebSocketSinkManager::init() setConfiguration(configuration); } +list< VehicleProperty::Property > WebSocketSinkManager::getSupportedProperties() +{ + return m_engine->supported(); +} void WebSocketSinkManager::setConfiguration(map<string, string> config) { // //Config has been passed, let's start stuff up. @@ -119,15 +123,20 @@ void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProper printf("Got property:%s\n",reply->value->toString().c_str()); //uint16_t velocity = boost::any_cast<uint16_t>(reply->value); stringstream s; - + s.precision(15); //TODO: Dirty hack hardcoded stuff, jsut to make it work. string tmpstr = ""; tmpstr = property; - s << "{\"type\":\"methodReply\",\"name\":\"get\",\"data\":[{\"name\":\"" << tmpstr << "\",\"value\":\"" << reply->value->toString() << "\"}],\"transactionid\":\"" << id << "\"}"; + + /// TODO: timestamp and sequence need to be inside the "data" object: + + s << "{\"type\":\"methodReply\",\"name\":\"get\",\"data\":[{\"property\":\"" << tmpstr << "\",\"value\":\"" << reply->value->toString() + << "\"}],\"transactionid\":\"" << id << "\", \"timestamp\" : \""<<reply->value->timestamp<<"\", " + <<"\"sequence\": \""<<reply->value->sequence<<"\" }"; string replystr = s.str(); //printf("Reply: %s\n",replystr.c_str()); - DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n"; + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << endl; char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING]; new_response+=LWS_SEND_BUFFER_PRE_PADDING; @@ -147,8 +156,8 @@ void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, Vehicle { AsyncRangePropertyRequest rangedRequest; - rangedRequest.begin = start; - rangedRequest.end = end; + rangedRequest.timeBegin = start; + rangedRequest.timeEnd = end; if (property == "running_status_speedometer") { @@ -186,7 +195,7 @@ void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, Vehicle //TODO: Dirty hack hardcoded stuff, jsut to make it work. stringstream data ("["); - std::list<PropertyValueTime*> values = reply->values; + std::list<AbstractPropertyType*> values = reply->values; for(auto itr = values.begin(); itr != values.end(); itr++) { if(itr != values.begin()) @@ -194,7 +203,7 @@ void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, Vehicle data<<","; } - data << "{ \"value\" : " << "\"" << (*itr)->value->toString() << "\", \"time\" : \"" << (*itr)->timestamp << "\" }"; + data << "{ \"value\" : " << "\"" << (*itr)->toString() << "\", \"timestamp\" : \"" << (*itr)->timestamp << "\", \"sequence\" : \""<<(*itr)->sequence<<"\" }"; } data<<"]"; @@ -625,7 +634,8 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb { //Send what properties we support typessupported = "\"running_status_speedometer\",\"running_status_engine_speed\",\"running_status_steering_wheel_angle\",\"running_status_transmission_gear_status\""; - PropertyList foo = VehicleProperty::capabilities(); + + PropertyList foo = sinkManager->getSupportedProperties(); PropertyList::const_iterator i=foo.cbegin(); while (i != foo.cend()) { @@ -654,7 +664,7 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb } else { - PropertyList foo = VehicleProperty::capabilities(); + PropertyList foo = sinkManager->getSupportedProperties(); if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(data.front())) { //sinkManager->addSingleShotSink(wsi,data.front(),id); diff --git a/plugins/websocketsink/websocketsinkmanager.h b/plugins/websocketsink/websocketsinkmanager.h index 4a13caa5..f0107977 100644 --- a/plugins/websocketsink/websocketsinkmanager.h +++ b/plugins/websocketsink/websocketsinkmanager.h @@ -45,6 +45,7 @@ public: map<std::string, list<WebSocketSink*> > m_sinkMap; void setConfiguration(map<string, string> config); void setValue(string property,string value); + list<VehicleProperty::Property> getSupportedProperties(); private: map<int,GIOChannel*> m_ioChannelMap; map<int,guint> m_ioSourceMap; diff --git a/plugins/websocketsourceplugin/websocketsource.cpp b/plugins/websocketsourceplugin/websocketsource.cpp index faec6456..f8273ab9 100644 --- a/plugins/websocketsourceplugin/websocketsource.cpp +++ b/plugins/websocketsourceplugin/websocketsource.cpp @@ -25,6 +25,8 @@ #include <sstream> #include <json-glib/json-glib.h> #include <listplusplus.h> +#include <timestamp.h> + #include "debugout.h" #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1) libwebsocket_context *context; @@ -59,6 +61,7 @@ void WebSocketSource::checkSubscriptions() } activeRequests.push_back(prop); stringstream s; + ///TODO: fix transid here: s << "{\"type\":\"method\",\"name\":\"subscribe\",\"data\":[\"" << prop << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}"; string replystr = s.str(); @@ -159,14 +162,16 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket JsonParser* parser = json_parser_new(); if (!json_parser_load_from_data(parser,(char*)in,len,&error)) { - DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error loading JSON\n"; + DebugOut(0) << __SMALLFILE__ <<":"<< __LINE__ << "Error loading JSON"<<endl; + DebugOut(0) << (char*)in <<endl; + DebugOut(0) <<error->message<<endl; return 0; } JsonNode* node = json_parser_get_root(parser); if(node == nullptr) { - DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error getting root node of json\n"; + DebugOut(0) << __SMALLFILE__ <<":"<< __LINE__ << "Error getting root node of json"<<endl; //throw std::runtime_error("Unable to get JSON root object"); return 0; } @@ -174,13 +179,13 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket JsonReader* reader = json_reader_new(node); if(reader == nullptr) { - DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "json_reader is null!\n"; + DebugOut(0) << __SMALLFILE__ <<":"<< __LINE__ << "json_reader is null!"<<endl; //throw std::runtime_error("Unable to create JSON reader"); return 0; } - + DebugOut(5)<<"source received: "<<string((char*)in)<<endl; string type; @@ -194,27 +199,70 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket json_reader_end_member(reader); list<string> data; - json_reader_read_member(reader,"data"); - if (json_reader_is_array(reader)) + list<pair<string,string> > pairdata; + if (name == "get") { - for(int i=0; i < json_reader_count_elements(reader); i++) + json_reader_read_member(reader,"data"); + if (json_reader_is_array(reader)) { - json_reader_read_element(reader,i); - string path = json_reader_get_string_value(reader); - data.push_back(path); - json_reader_end_element(reader); + for(int i=0; i < json_reader_count_elements(reader); i++) + { + + pair<string,string> pair; + json_reader_read_element(reader,i); + + json_reader_read_member(reader,"property"); + pair.first = json_reader_get_string_value(reader); + json_reader_end_member(reader); + + json_reader_read_member(reader,"value"); + pair.second = json_reader_get_string_value(reader); + json_reader_end_member(reader); + + json_reader_end_element(reader); + + pairdata.push_back(pair); + } } + else + { + pair<string,string> pair; + + json_reader_read_member(reader,"property"); + pair.first = json_reader_get_string_value(reader); + json_reader_end_member(reader); + + json_reader_read_member(reader,"value"); + pair.second = json_reader_get_string_value(reader); + json_reader_end_member(reader); + + pairdata.push_back(pair); + } + json_reader_end_member(reader); } else { - string path = json_reader_get_string_value(reader); - if (path != "") + json_reader_read_member(reader,"data"); + if (json_reader_is_array(reader)) { - data.push_back(path); + for(int i=0; i < json_reader_count_elements(reader); i++) + { + json_reader_read_element(reader,i); + string path = json_reader_get_string_value(reader); + data.push_back(path); + json_reader_end_element(reader); + } } + else + { + string path = json_reader_get_string_value(reader); + if (path != "") + { + data.push_back(path); + } + } + json_reader_end_member(reader); } - json_reader_end_member(reader); - string id; json_reader_read_member(reader,"transactionid"); if (strcmp("gchararray",g_type_name(json_node_get_value_type(json_reader_get_value(reader)))) == 0) @@ -231,6 +279,32 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket } json_reader_end_member(reader); + double timestamp=amb::currentTime(); + json_reader_read_member(reader,"timestamp"); + if(const GError* err = json_reader_get_error(reader)) + { + DebugOut(0)<<"JSON Parsing error: no timestamp parameter: "<<err->message<<endl; + //g_error_free(err); + } + else + { + timestamp = atof(json_reader_get_string_value(reader)); + } + json_reader_end_member(reader); + + uint32_t sequence=0; + json_reader_read_member(reader,"sequence"); + if(const GError* err = json_reader_get_error(reader)) + { + DebugOut(0)<<"JSON Parsing error: no sequence parameter: "<<err->message<<endl; + //g_error_free(err); + } + else + { + sequence = atof(json_reader_get_string_value(reader)); + } + json_reader_end_member(reader); + ///TODO: this will probably explode: //mlc: I agree with Kevron here, it does explode. //if(error) g_error_free(error); @@ -249,7 +323,14 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket try { AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,data.front()); - m_re->updateProperty(name, type); + type->timestamp = timestamp; + type->sequence = sequence; + m_re->updateProperty(name, type, source->uuid()); + + double currenttime = amb::currentTime(); + + DebugOut(2)<<"websocket source latency: "<<(currenttime - timestamp)*1000<<"ms"<<endl; + delete type; } catch (exception ex) @@ -270,7 +351,7 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket if (name == "getSupportedEventTypes") { //printf("Got supported events!\n"); - DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupportedEventTypes request\n"; + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupportedEventTypes request"<<endl; PropertyList props; while (data.size() > 0) { @@ -282,6 +363,27 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket source->setSupported(props); //m_re->updateSupported(m_supportedProperties,PropertyList()); } + else if (name == "get") + { + + DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" event:" << pairdata.size()<<endl; + while (pairdata.size() > 0) + { + pair<string,string> pair = pairdata.front(); + pairdata.pop_front(); + if (source->propertyReplyMap.find(pair.first) != source->propertyReplyMap.end()) + { + AbstractPropertyType* v = VehicleProperty::getPropertyTypeForPropertyNameValue(source->propertyReplyMap[pair.first]->property,pair.second); + v->timestamp = timestamp; + source->propertyReplyMap[pair.first]->value = v; + source->propertyReplyMap[pair.first]->completed(source->propertyReplyMap[pair.first]); + source->propertyReplyMap.erase(pair.first); + delete v; + } + } + //data will contain a property/value map. + } + } break; } @@ -326,6 +428,12 @@ PropertyList WebSocketSource::supported() return m_supportedProperties; } +int WebSocketSource::supportedOperations() +{ + /// TODO: need to do this correctly based on what the host supports. + return Get | Set; +} + string WebSocketSource::uuid() { return "d293f670-f0b3-11e1-aff1-0800200c9a66"; @@ -355,6 +463,25 @@ void WebSocketSource::unsubscribeToPropertyChanges(VehicleProperty::Property pro void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply) { ///TODO: fill in + //s << "{\"type\":\"method\",\"name\":\"getSupportedEventTypes\",\"data\":[],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}"; + //m_re->getPropertyAsync(); + /*reply.value = 1; + reply->completed(reply); + reply->completed = [](AsyncPropertyReply* reply) { + DebugOut()<<"Velocity Async request completed: "<<reply->value->toString()<<endl; + delete reply; + };*/ + propertyReplyMap[reply->property] = reply; + stringstream s; + s << "{\"type\":\"method\",\"name\":\"get\",\"data\":[\"" << reply->property << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}"; + string replystr = s.str(); + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr <<endl; + //printf("Reply: %s\n",replystr.c_str()); + char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING]; + new_response+=LWS_SEND_BUFFER_PRE_PADDING; + strcpy(new_response,replystr.c_str()); + libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT); + delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING); } void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply) @@ -365,7 +492,20 @@ void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply) AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest request ) { ///TODO: fill in - return NULL; + AsyncPropertyReply* reply = new AsyncPropertyReply(request); + reply->success = true; + stringstream s; + s << "{\"type\":\"method\",\"name\":\"set\",\"data\":[\"property\" : \"" << request.property << "\",\"value\" : \"" << request.value << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}"; + string replystr = s.str(); + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n"; + //printf("Reply: %s\n",replystr.c_str()); + char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING]; + new_response+=LWS_SEND_BUFFER_PRE_PADDING; + strcpy(new_response,replystr.c_str()); + libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT); + delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING); + reply->completed(reply); + return reply; } extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config) diff --git a/plugins/websocketsourceplugin/websocketsource.h b/plugins/websocketsourceplugin/websocketsource.h index 7854c806..12950193 100644 --- a/plugins/websocketsourceplugin/websocketsource.h +++ b/plugins/websocketsourceplugin/websocketsource.h @@ -40,6 +40,9 @@ public: void subscribeToPropertyChanges(VehicleProperty::Property property); void unsubscribeToPropertyChanges(VehicleProperty::Property property); PropertyList supported(); + + int supportedOperations(); + libwebsocket *clientsocket; PropertyList queuedRequests; bool clientConnected; @@ -50,6 +53,7 @@ public: void propertyChanged(VehicleProperty::Property property, AbstractPropertyType* value, string uuid) {} void supportedChanged(PropertyList) {} void setConfiguration(map<string, string> config); + map<VehicleProperty::Property,AsyncPropertyReply*> propertyReplyMap; private: PropertyList m_supportedProperties; diff --git a/plugins/wheel/wheelplugin.cpp b/plugins/wheel/wheelplugin.cpp index 2e809061..d8d87c10 100644 --- a/plugins/wheel/wheelplugin.cpp +++ b/plugins/wheel/wheelplugin.cpp @@ -33,7 +33,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA using namespace std; #include "debugout.h" - +#include "timestamp.h" #define JSNAMELEN 128 #define LG27 "G27 Racing Wheel" @@ -100,6 +100,7 @@ private: bool oldClutch; bool brake; bool oldBrake; + AbstractSource* mParent; }; @@ -166,6 +167,11 @@ PropertyList WheelSourcePlugin::supported() return props; } +int WheelSourcePlugin::supportedOperations() +{ + return Get | Set; +} + void WheelSourcePlugin::unsubscribeToPropertyChanges(VehicleProperty::Property property) { mRequests.erase(property); @@ -189,7 +195,7 @@ WheelPrivate::WheelPrivate(WheelSourcePlugin *parent, AbstractRoutingEngine *rou :re(route), gis(nullptr), axis(nullptr), button(nullptr), oilPSI(10), coolantTemp(100), turnSignal(TurnSignals::Off), throttle(0), machineGuns(false), currentGear(Transmission::Neutral), steeringAngle(0), -clutch(false), oldClutch(false), brake(false), oldBrake(false) + clutch(false), oldClutch(false), brake(false), oldBrake(false), mParent(parent) { unsigned char numAxes = 0; @@ -448,7 +454,8 @@ void WheelPrivate::changeMachineGuns(bool val) { this->machineGuns = val; VehicleProperty::MachineGunTurretStatusType temp(this->machineGuns); - this->re->updateProperty(VehicleProperty::MachineGunTurretStatus, &temp); + temp.timestamp = amb::currentTime(); + this->re->updateProperty(VehicleProperty::MachineGunTurretStatus, &temp, mParent->uuid()); } void WheelPrivate::changeTurnSignal(TurnSignals::TurnSignalType dir, bool val) @@ -462,7 +469,8 @@ void WheelPrivate::changeTurnSignal(TurnSignals::TurnSignalType dir, bool val) } this->turnSignal = tsVal; VehicleProperty::TurnSignalType temp(this->turnSignal); - this->re->updateProperty(VehicleProperty::TurnSignal, &temp); + temp.timestamp = amb::currentTime(); + this->re->updateProperty(VehicleProperty::TurnSignal, &temp, mParent->uuid()); } void WheelPrivate::changeGear(int gear) @@ -470,20 +478,24 @@ void WheelPrivate::changeGear(int gear) this->currentGear = (Transmission::TransmissionPositions)gear; VehicleProperty::TransmissionShiftPositionType tempTrans(this->currentGear); VehicleProperty::VehicleSpeedType tempSpeed(this->calcCarSpeed()); - this->re->updateProperty(VehicleProperty::TransmissionShiftPosition, &tempTrans); - this->re->updateProperty(VehicleProperty::VehicleSpeed, &tempSpeed); + tempTrans.timestamp = amb::currentTime(); + tempSpeed.timestamp = amb::currentTime(); + this->re->updateProperty(VehicleProperty::TransmissionShiftPosition, &tempTrans, mParent->uuid()); + this->re->updateProperty(VehicleProperty::VehicleSpeed, &tempSpeed, mParent->uuid()); } void WheelPrivate::changeOilPressure(bool increase) { VehicleProperty::EngineOilPressureType temp(increase ? ++this->oilPSI : --this->oilPSI); - this->re->updateProperty(VehicleProperty::EngineOilPressure, &temp); + temp.timestamp = amb::currentTime(); + this->re->updateProperty(VehicleProperty::EngineOilPressure, &temp, mParent->uuid()); } void WheelPrivate::changeCoolantTemp(bool increase) { - VehicleProperty::EngineCoolantTemperatureType temp(increase ? ++this->coolantTemp : --this->coolantTemp); - this->re->updateProperty(VehicleProperty::EngineCoolantTemperature, &temp); + VehicleProperty::EngineCoolantTemperatureType temp(increase ? ++this->coolantTemp : --this->coolantTemp); + temp.timestamp = amb::currentTime(); + this->re->updateProperty(VehicleProperty::EngineCoolantTemperature, &temp, mParent->uuid()); } @@ -491,7 +503,8 @@ void WheelPrivate::changeSteeringAngle(int val) { this->steeringAngle = (((double)val/(double)32767.0) + (double)1.0) * (double)180.0; VehicleProperty::SteeringWheelAngleType temp(this->steeringAngle); - this->re->updateProperty(VehicleProperty::SteeringWheelAngle, &temp); + temp.timestamp = amb::currentTime(); + this->re->updateProperty(VehicleProperty::SteeringWheelAngle, &temp, mParent->uuid()); } void WheelPrivate::changeClutch(int val) @@ -501,7 +514,8 @@ void WheelPrivate::changeClutch(int val) if (this->oldClutch != this->clutch) { VehicleProperty::ClutchStatusType temp(this->clutch); - this->re->updateProperty(VehicleProperty::ClutchStatus, &temp); + temp.timestamp = amb::currentTime(); + this->re->updateProperty(VehicleProperty::ClutchStatus, &temp, mParent->uuid()); } } @@ -513,9 +527,9 @@ void WheelPrivate::changeThrottle(int val) VehicleProperty::EngineSpeedType tempRpm(this->calcRPM()); VehicleProperty::VehicleSpeedType tempSpeed(this->calcCarSpeed()); - this->re->updateProperty(VehicleProperty::ThrottlePosition, &tempThrottle); - this->re->updateProperty(VehicleProperty::EngineSpeed, &tempRpm); - this->re->updateProperty(VehicleProperty::VehicleSpeed, &tempSpeed); + this->re->updateProperty(VehicleProperty::ThrottlePosition, &tempThrottle, mParent->uuid()); + this->re->updateProperty(VehicleProperty::EngineSpeed, &tempRpm, mParent->uuid()); + this->re->updateProperty(VehicleProperty::VehicleSpeed, &tempSpeed, mParent->uuid()); } void WheelPrivate::changeBrake(int val) @@ -525,7 +539,7 @@ void WheelPrivate::changeBrake(int val) if (this->oldBrake != this->brake) { VehicleProperty::WheelBrakeType temp(this->brake); - this->re->updateProperty(VehicleProperty::WheelBrake, &temp); + this->re->updateProperty(VehicleProperty::WheelBrake, &temp, mParent->uuid()); } } @@ -550,22 +564,22 @@ void WheelPrivate::checkButtonEvents() if (this->button[11]) { // cout << "Inside button 11!" << endl; VehicleProperty::ButtonEventType tempButton(ButtonEvents::Preset1Button); - this->re->updateProperty(VehicleProperty::ButtonEvent, &tempButton); + this->re->updateProperty(VehicleProperty::ButtonEvent, &tempButton, mParent->uuid()); } if (this->button[8]) { // cout << "Inside button 8!" << endl; VehicleProperty::ButtonEventType tempButton(ButtonEvents::Preset2Button); - this->re->updateProperty(VehicleProperty::ButtonEvent, &tempButton); + this->re->updateProperty(VehicleProperty::ButtonEvent, &tempButton, mParent->uuid()); } if (this->button[9]) { // cout << "Inside button 9!" << endl; VehicleProperty::ButtonEventType tempButton(ButtonEvents::Preset3Button); - this->re->updateProperty(VehicleProperty::ButtonEvent, &tempButton); + this->re->updateProperty(VehicleProperty::ButtonEvent, &tempButton, mParent->uuid()); } if (this->button[10]) { // cout << "Inside button 10!" << endl; VehicleProperty::ButtonEventType tempButton(ButtonEvents::Preset4Button); - this->re->updateProperty(VehicleProperty::ButtonEvent, &tempButton); + this->re->updateProperty(VehicleProperty::ButtonEvent, &tempButton, mParent->uuid()); } } } diff --git a/plugins/wheel/wheelplugin.h b/plugins/wheel/wheelplugin.h index ab7e662c..30e7b870 100644 --- a/plugins/wheel/wheelplugin.h +++ b/plugins/wheel/wheelplugin.h @@ -40,6 +40,8 @@ public: void subscribeToPropertyChanges(VehicleProperty::Property property); void unsubscribeToPropertyChanges(VehicleProperty::Property property); PropertyList supported(); + + int supportedOperations(); void propertyChanged(VehicleProperty::Property property, AbstractPropertyType* value, string uuid) {} void supportedChanged(PropertyList) {} |