summaryrefslogtreecommitdiff
path: root/plugins/database/databasesink.h
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/database/databasesink.h')
-rw-r--r--plugins/database/databasesink.h182
1 files changed, 182 insertions, 0 deletions
diff --git a/plugins/database/databasesink.h b/plugins/database/databasesink.h
new file mode 100644
index 00000000..3b7efdbf
--- /dev/null
+++ b/plugins/database/databasesink.h
@@ -0,0 +1,182 @@
+/*
+ Copyright (C) 2012 Intel Corporation
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+
+#ifndef DATABASESINK_H
+#define DATABASESINK_H
+
+#include "abstractsink.h"
+#include "abstractsource.h"
+#include "basedb.hpp"
+#include <asyncqueue.hpp>
+#include "listplusplus.h"
+#include "ambpluginimpl.h"
+
+#include <glib.h>
+
+#include <functional>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+#include <unordered_map>
+
+const std::string DatabaseLogging = "DatabaseLogging";
+const std::string DatabasePlayback = "DatabasePlayback";
+const std::string DatabaseFile = "DatabaseFile";
+
+class DBObject {
+public:
+ DBObject(): zone(0), time(0), sequence(0), quit(false) {}
+ std::string key;
+ std::string value;
+ std::string source;
+ int32_t zone;
+ double time;
+ int32_t sequence;
+ std::string tripId;
+
+ bool quit;
+
+ bool operator == (const DBObject & other) const
+ {
+ return (key == other.key && source == other.source && zone == other.zone &&
+ value == other.value && sequence == other.sequence && time == other.time);
+ }
+
+ bool operator != (const DBObject & other)
+ {
+ return (*this == other) == false;
+ }
+};
+
+namespace amb
+{
+
+struct DBObjectCompare
+{
+ bool operator()(DBObject const & lhs, DBObject & rhs) const
+ {
+ if (lhs == rhs)
+ {
+ return true;
+ }
+
+ return false;
+ }
+
+};
+
+}
+
+namespace std {
+ template <> struct hash<DBObject>
+ {
+ size_t operator()(const DBObject & x) const
+ {
+ return x.key.length() * x.value.length() + x.time;
+ }
+ };
+}
+
+class Shared
+{
+public:
+ Shared()
+ :queue(true, true)
+ {
+ db = new BaseDB;
+ }
+ ~Shared()
+ {
+ delete db;
+ }
+
+ BaseDB * db;
+ amb::Queue<DBObject, amb::DBObjectCompare> queue;
+ std::string tripId;
+};
+
+class PlaybackShared
+{
+public:
+ PlaybackShared(AbstractRoutingEngine* re, std::string u, uint playbackMult)
+ :routingEngine(re),uuid(u),playBackMultiplier(playbackMult),stop(false) {}
+ ~PlaybackShared()
+ {
+ for(auto itr = playbackQueue.begin(); itr != playbackQueue.end(); itr++)
+ {
+ DBObject obj = *itr;
+ }
+
+ playbackQueue.clear();
+ }
+
+ AbstractRoutingEngine* routingEngine;
+ std::list<DBObject> playbackQueue;
+ uint playBackMultiplier;
+ std::string uuid;
+ bool stop;
+};
+
+PROPERTYTYPEBASIC(DatabaseLogging, bool)
+PROPERTYTYPEBASIC(DatabasePlayback, bool)
+PROPERTYTYPE(DatabaseFile, DatabaseFileType, StringPropertyType, std::string)
+
+class DatabaseSink : public AmbPluginImpl
+{
+
+public:
+ DatabaseSink(AbstractRoutingEngine* engine, map<string, string> config, AbstractSource &parent);
+ ~DatabaseSink();
+ virtual void supportedChanged(const PropertyList & supportedProperties);
+ virtual void propertyChanged(AbstractPropertyType *value);
+ const std::string uuid() const;
+
+ void init();
+
+ ///source role:
+ virtual void getRangePropertyAsync(AsyncRangePropertyReply *reply);
+ virtual AsyncPropertyReply * setProperty(const AsyncSetPropertyRequest & request);
+ virtual void subscribeToPropertyChanges(VehicleProperty::Property property);
+ virtual void unsubscribeToPropertyChanges(VehicleProperty::Property property);
+ int supportedOperations() const { return AbstractSource::GetRanged | AbstractSource::Get | AbstractSource::Set;}
+
+private: //methods:
+
+ void parseConfig();
+ void stopDb();
+ void startDb();
+ void startPlayback();
+ void initDb();
+ void updateForNewDbFilename();
+
+private:
+ PropertyList mSubscriptions;
+ Shared *shared;
+ std::unique_ptr<std::thread> thread;
+ std::string tablename;
+ std::string tablecreate;
+ PropertyList propertiesToSubscribeTo;
+ PlaybackShared* playbackShared;
+ uint playbackMultiplier;
+ std::shared_ptr<AbstractPropertyType> playback;
+ std::shared_ptr<AbstractPropertyType> databaseName;
+ std::shared_ptr<AbstractPropertyType> databaseLogging;
+};
+
+#endif // DATABASESINK_H