diff options
author | Magnus Feuer <mfeuer@jaguarlandrover.com> | 2015-02-12 15:28:20 -0800 |
---|---|---|
committer | Magnus Feuer <mfeuer@jaguarlandrover.com> | 2015-02-12 15:28:20 -0800 |
commit | 0dfe4c6ebb2e0ab11e7301254a8edfda821d54e8 (patch) | |
tree | f56162dabe94094aeccef3849d5ff583561923d4 | |
parent | b1dbd5f7ea2bdd3f4e55bdf6bb681ea4df589096 (diff) | |
download | rvi_core-0dfe4c6ebb2e0ab11e7301254a8edfda821d54e8.tar.gz |
Removed big_data_demo since it has been moved to rvi_bigdata repo
-rw-r--r-- | big_data_demo/README | 1 | ||||
-rw-r--r-- | big_data_demo/README.md | 107 | ||||
-rw-r--r-- | big_data_demo/amb_dbus.py | 189 | ||||
-rw-r--r-- | big_data_demo/big_data.service | 18 | ||||
-rw-r--r-- | big_data_demo/big_data_device.py | 251 | ||||
-rw-r--r-- | big_data_demo/big_data_server.py | 153 | ||||
-rw-r--r-- | big_data_demo/data_logger.py | 242 | ||||
-rw-r--r-- | big_data_demo/gps_collector.py | 100 | ||||
-rw-r--r-- | big_data_demo/rvi_json_rpc_server.py | 36 | ||||
-rw-r--r-- | packaging/big_data_demo.manifest | 5 | ||||
-rwxr-xr-x | packaging/big_data_demo.spec | 56 |
11 files changed, 0 insertions, 1158 deletions
diff --git a/big_data_demo/README b/big_data_demo/README deleted file mode 100644 index 6af629c..0000000 --- a/big_data_demo/README +++ /dev/null @@ -1 +0,0 @@ -Big data demo moved to its own repo in https://github.com/PDXostc/RVI_bigdata diff --git a/big_data_demo/README.md b/big_data_demo/README.md deleted file mode 100644 index 87f2fc3..0000000 --- a/big_data_demo/README.md +++ /dev/null @@ -1,107 +0,0 @@ -# BIG DATA DEMO # - - -# RVI COMMANDS # - - -## SUBSCRIBE ## - -Subscribe commands are sent from the backend server to a vehicle in order -to setup a regular reporting of specific data from vehicle to server. - -Reporting will be done through the ```report``` command. - -Multiple subscribe commands can be sent, where all indicated channels are -reported - - { - "jsonrpc": "2.0", - "id": 1, - "method": "message", - "params": { - "service": "jlr.com/vin/123456/logging/subscribe", - "channels":, ["location", "odometer", "speed"], - "reporting_interval": 5000 - } - } - -### PARAMETERS ### -+ channels<br> -Specifies the channels that we want reported back to the server. - -+ reporting_interval<br> -Specifies the number of milliseconds between each data sample that is -to be sent back end server. If the reporting interval is a negative -integer, the channel's value will be reported at the given (absolute -value) interval, or when the value changes, whatever happens first. - - -## UNSUBSCRIBE ## - -Unubscribe commands are sent from the backend server to a vehicle in -order to stop reporting of one or more data channels previously setup -through a ```subscribe``` command. - - { - "jsonrpc": "2.0", - "id": 2, - "method": "message", - "params": { - "service": "jlr.com/vin/123456/logging/unsubscribe", - "channels":, ["location", "odometer", "speed"] - } - } - -### PARAMETERS ### -+ channels<br> -Specifies the channels to stop reporting. If a channel has been -specified several times by multiple subcribe commands, all -subscriptions to the channel are removed. - -## REPORT ## - -Publish commands are sent from the device to the backend server to report -a batch of values for channels previously subscribed to by a ```subscribe``` command. -Multiple values for a single channel can be provided in a single report. - -Each channel is reported with its channel name, its value, and the UTC -msec timestamp when the value was sample. - - { - "jsonrpc": "2.0", - "id": 3, - "method": "message", - "params": { - "service": "jlr.com/backend/logging/report", - "vin": "1234", - "timestamp": 1415143459110, - "data":, [ - { "channel": "odometer", "value": 10022 }, - { "channel": "odometer", "value": 10023 }, - { "channel": "speed", "value": 113 }, - { "channel": "location", - "value": { "lat": 39.0319, "lon": 125.7538, "alt": 222.3 } } - ] - } - } - - -### PARAMETERS ### -+ data<br> -Contains an array of all reported data points. - -+ channel<br> -Contains the name of the channel a data point is reporetd for. Matches -the channel name in a previously issued ```subscribe``` command. - -+ value<br> -Specifies the value of the given channel at the given time. The actual -value can be a string, a double, or a JSON object, and is implicitly -defined by the channel name. - -+ timestamp<br> -Specifies the timestamp in millisecond UTC when the value was sampled -from the vehicle. - - -# STARTING THE DEVICE SIDE # diff --git a/big_data_demo/amb_dbus.py b/big_data_demo/amb_dbus.py deleted file mode 100644 index 3a4937c..0000000 --- a/big_data_demo/amb_dbus.py +++ /dev/null @@ -1,189 +0,0 @@ -#!/usr/bin/python - -# -# Copyright (C) 2014, Jaguar Land Rover -# -# This program is licensed under the terms and conditions of the -# Mozilla Public License, version 2.0. The full text of the -# Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ -# - - -# -# AMB Dbus monitor that reports to the provided logger object -# -import sys -import time -import threading -import dbus -import json - -class DBUSMonitor(threading.Thread): - """ - Retrieves data from databus monitor and sends it to - """ - - def __init__(self, logger): - threading.Thread.__init__(self) - self.sysbus = dbus.SystemBus() - self.mgr_broker = self.sysbus.get_object("org.automotive.message.broker", "/") - self.mgr_if = dbus.Interface(self.mgr_broker, "org.automotive.Manager") - self.logger = logger - - # Maps object name (VehicleSpeed) to the corresponding - # DBUS object path - self.name_to_path = {} - - # Reverse mapping - self.path_to_name = {} - - - # Maps object path to the corresponding sample interval (in - # seconds) - self.path_to_interval = {} - - - # Stores a dictionary with element format (next_timeout_utc, - # object_path_array). - # - # Dicitonary is sorted on next_timeout by schedule(), allowing - # for the run() scheduler to quickly determine when the next - # timeout occurrs. The dictionary will only have a given - # object path stored once. A single timeout may store multiple - # object paths that are to be sampled at that given time, - # hence the array of object paths - self.schedule = {} - - available_objects = self.mgr_broker.List("", dbus_interface='org.automotive.Manager') - - for name in available_objects: - # Get object path for name - obj_path = self.mgr_if.FindObject(name)[0] - # obj_path = self.sysbus.get_object("org.automotive.message.broker", name) - # print "add_object({}) -> {}".format(name, obj_path) - self.name_to_path[name] = obj_path - self.path_to_name[obj_path] = name - self.path_to_interval[obj_path] = 0 # Not subscribed to - - - def unschedule_sample(self, obj_path): - # Iterate over all time slots in schedule - for ts in self.schedule: - - # Each slot in schedule is an array of dbus object paths - # to be sampled at the given time. - print "unschedule({}): checking ({}, {})".format(obj_path, ts, self.schedule[ts]) - - # Delete obj_path from the given timeslot in schedule. - # May throw an exception on index(obj_path) if it does not - # exist. In that case, we simply move on to the next time stamp - # in the schedule - try: - del self.schedule[ts][self.schedule[ts].index(obj_path)] - except KeyError: - #print "unschedule({}):1 Not found in ts {}".format(obj_path, ts) - continue - - except ValueError: - # print "unschedule({}):2 Not found in ts {}".format(obj_path, ts) - continue - - # Deletion was successful - print "unschedule({}): after delete: {}".format( obj_path, self.schedule[ts]) - return True - - # We got out of loop with nothing found - return False - - # - # Subscribe to a given object, sampling it at every given number - # of seconds and send up the result to the logger provided to the - # constructor. - # - def schedule_sample(self, obj_path, timestamp): - # print "schedule_sample({}, {})".format(obj_path, timestamp) - - # Delete any old entries we have for the given object path - self.unschedule_sample(obj_path) - - # Check if we have already have a slot for the given timestamp. - # If so, append our object path to the existing array - # of objects triggered at the given time. - # If not, initialize a new time slot with our object path. - if timestamp in self.schedule: - self.schedule[timestamp].append(obj_path) - else: - self.schedule[timestamp] = [obj_path] - # Re-sort schedule - sorted(self.schedule) - - def subscribe(self, name, sample_interval): - try: - obj_path = self.name_to_path[name] - self.path_to_interval[obj_path] = sample_interval - - except KeyError: - print "subscribe_object({}): Not found".format(name) - return False - - print "amb_dbus:subscribe({}, {}): called".format(name, sample_interval) - # Schedule the object to be sampled sample_interval seconds from now - self.schedule_sample(obj_path, int(time.time()) + sample_interval) - - - def sample_and_report(self, obj_path): - obj_name = self.path_to_name[obj_path] - prop_if = dbus.Interface(self.sysbus.get_object("org.automotive.message.broker", obj_path), - "org.freedesktop.DBus.Properties") - - # Ugly conversion from dbus types to json back to native python types. - tmp = eval(json.dumps(prop_if.GetAll("org.automotive."+obj_name))) - res = {} - print tmp - for entry in tmp: - print entry - val = tmp[entry] - # For some reason dbus.Double survives ths conversion above. - if type(val) is dbus.Double: - val = float(val) - - # Should really be recursive into dictionaries and arrays, - res[entry] = val - - print "dumping:", res - self.logger.add_sample([(obj_name, res),]) - - def run(self): - while True: - # - # Stupid way of doing it. - # We should sleep either until the first element - # in the schedule queue is due, or - # we get a wakeup signal from schedule() - # - time.sleep(1.0) - - # Retrieve a sorted list of all timestamps in self.schedule - # FIXME: Some sort of ordered dictionary would probably be - # smart here. - sorted_ts = sorted(self.schedule) - ct = int(time.time()) - - # Go through all timestamps that are due for execution - while len(sorted_ts) > 0 and sorted_ts[0] <= ct: - # Process the first element of the sorted list - obj_path_arr = self.schedule[sorted_ts[0]] - - # Delete the time slot from schedule - del self.schedule[sorted_ts[0]] - - # Delete the time slot from sorted time slots - del sorted_ts[0] - - # Go through all retrieved object paths, sample and report them - for obj_path in obj_path_arr: - self.sample_and_report(obj_path) - # Reschedule self to the next interval time slot - self.schedule_sample(obj_path, int(time.time()) +self.path_to_interval[obj_path]) - - diff --git a/big_data_demo/big_data.service b/big_data_demo/big_data.service deleted file mode 100644 index 3e85b3a..0000000 --- a/big_data_demo/big_data.service +++ /dev/null @@ -1,18 +0,0 @@ -# systemd(8) setup usde by Tizen and others. -[Unit] -Description=Software Over The Air Service -Wants=network-online.target rvi.service - -[Service] -Environment="HOME=/opt/big_data_demo-0.3.0" -Environment="PYTHONPATH=/opt/big_data_demo-0.3.0/mod/lib/python" -Environment="HOME=/opt/big_data_demo-0.3.0 PYTHONPATH=/opt/big_data_demo-0.3.0/mod/lib/python " -WorkingDirectory=/opt/big_data_demo-0.3.0 -Type=simple -StandardOutput=journal -StandardError=journal -ExecStart=/bin/sh -c "sleep 10;/usr/bin/python big_data_device.py http://localhost:8811" -GuessMainPID=yes - -[Install] -WantedBy=graphical.target multi-user.target diff --git a/big_data_demo/big_data_device.py b/big_data_demo/big_data_device.py deleted file mode 100644 index fcd6c24..0000000 --- a/big_data_demo/big_data_device.py +++ /dev/null @@ -1,251 +0,0 @@ -#!/usr/bin/python - -# -# Copyright (C) 2014, Jaguar Land Rover -# -# This program is licensed under the terms and conditions of the -# Mozilla Public License, version 2.0. The full text of the -# Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ -# - -# -# GPS Data Collector -# A sample device implementation that collects GPS position data from gpsd -# and sends it to an RVI backend. -# - -import sys -import getopt -import os -import time -from datetime import tzinfo, timedelta, datetime -import threading -import random -from signal import * -import json -from rvi_json_rpc_server import RVIJSONRPCServer -import jsonrpclib -from urlparse import urlparse -import amb_dbus -import data_logger -import gps_collector -import traceback -MY_NAME = "Big Data Demo" - -class RVICallbackServer(threading.Thread): - """ - RPC server thread responding to incoming callbacks from the RVI framework - """ - - def __init__(self, amb, logger, service_edge, callback_url): - threading.Thread.__init__(self) - self.logger = logger - self.service_edge = service_edge - self.callback_url = callback_url - url = urlparse(self.callback_url) - self.localServer = RVIJSONRPCServer(addr=((url.hostname, url.port)), logRequests=False) - self.register_services() - self.amb = amb - - def subscribe(self, channels, reporting_interval): - for channel in channels: - self.amb.subscribe(channel, int(reporting_interval)) - self.logger.add_subscription(channel, int(reporting_interval)) - - self.logger.dump_db() - return {u'status': 0} - - - def unsubscribe(self, channels): - print "unsubscribe(): channels:", channels - for channel in channels: - self.logger.delete_subscription(channel) - - return {u'status': 0} - - def register_services(self): - # register callback functions with RPC server - self.localServer.register_function(self.subscribe, "/logging/subscribe" ) - self.localServer.register_function(self.unsubscribe, "/logging/unsubscribe" ) - - # register services with RVI framework - rvi_dead = True - services = [] - while rvi_dead: - try: - res = self.service_edge.register_service(service = "/logging/subscribe", - network_address = self.callback_url) - - services.append(res['service']) - res = self.service_edge.register_service(service = "/logging/unsubscribe", - network_address = self.callback_url) - services.append(res['service']) - rvi_dead = False - except Exception, err: - print "No RVI. Wait and retry..." - time.sleep(2.0) - - print 'Service registration successful. Services: ', services - # Retrieve the Vin number from the returned service name - vin = services[0] - vin = vin[0:len(vin)-18] - vin = vin[vin.rindex('/')+1:] - print "Retrieved VIN:", vin - self.vin = vin - - def run(self): - self.localServer.serve_forever() - - def shutdown(self): - self.localServer.shutdown() - - - -class UTC(tzinfo): - """UTC""" - def utcoffset(self, dt): - return timedelta(0) - - def tzname(self, dt): - return "UTC" - - def dst(self, dt): - return timedelta(0) - -class DataSender(threading.Thread): - """ - Sends data from the database to RVI - """ - - def __init__(self, destination, vin, rvi_server, logger, send_interval): - threading.Thread.__init__(self) - self.destination = destination - self.vin = vin - self.rvi_server = rvi_server - self.logger = logger - self.send_interval = send_interval - self.transaction_id = 1 - - def shutdown(self): - self._Thread__stop() - - def run(self): - - utc = UTC() - while True: - sample = logger.retrieve_next_sample() - - # If no samples are to be had, sleep on it and try again. - if sample == False: - print "No samples" - time.sleep(self.send_interval) - continue - - ( timestamp, datapoints ) = sample - data_arg = [] - for dp in datapoints: - (channel, value) = dp - data_arg.append({ u'channel': channel, u'value': value }) - - utc_ts = datetime.fromtimestamp(timestamp, utc).isoformat() - param = [{ - u'vin': self.vin, - u'timestamp': utc_ts, - u'data': data_arg - }] - - print "Sending: {}".format(param) - rvi_server.message(calling_service = "/big_data", - service_name = self.destination + "/logging/report", - transaction_id = self.transaction_id, - timeout = int(time.time())+60, - parameters = param) - - # Wipe sample now that we have sent it - logger.delete_sample(timestamp) - self.transaction_id += 1 - - - -def cleanup(*args): - print "Caught signal:", args[0], "Shutting down..." - if gps_collector: - gps_collector.shutdown() - if rvi_callback: - rvi_callback.shutdown() - if logger: - logger.shutdown() - if data_sender: - data_sender.shutdown() - sys.exit(0) - - -def usage(): - print "Usage: %s RVI-URL" % sys.argv[0] - sys.exit(255) - - -if __name__ == "__main__": - # - # Setup a localhost URL, using a random port, that we will listen to - # incoming JSON-RPC publish calls on, delivered by our RVI service - # edge (specified by rvi_url). - # - service_host = 'localhost' - service_port = random.randint(20001, 59999) - service_url = 'http://'+service_host + ':' + str(service_port) - - # - # Check that we have the correct arguments - # - if len(sys.argv) != 2: - usage() - - # Grab the URL to use - [ progname, rvi_url ] = sys.argv - - # Welcome message - print "RVI Big Data Device" - print "Outbound URL to RVI: ", rvi_url - print "Inbound URL from RVI: ", service_url - - - # Setip the logger. - logger = data_logger.Logger() - logger.start() - - - # Setup outbound JSON-RPC connection to the RVI Service Edge - rvi_server = jsonrpclib.Server(rvi_url) - print "SERVER:", rvi_server - - # Setup AMB DBUS integartion - amb = amb_dbus.DBUSMonitor(logger) - amb.start() - - # Setup inbound JSON-RPC server - rvi_callback = RVICallbackServer(amb, logger, rvi_server, service_url) - rvi_callback.start() - - # Setup data sender - data_sender = DataSender("jlr.com/backend", rvi_callback.vin, rvi_url, logger, 3) - data_sender.start() - - # Retrieve (persistent) subscriptions from - # the logger and prime the amb dbus - # connection with it. - for (channel, interval) in logger.get_subscriptions(): - amb.subscribe(channel, interval) - - # Start GPS data collection - interval = 1 - gps_collector = gps_collector.GPSCollector(logger) - - # catch signals for proper shutdown - for sig in (SIGABRT, SIGTERM, SIGINT): - signal(sig, cleanup) - - # Let the main thread run the gps collector - gps_collector.run() - - print "gps_collector.run() exited." diff --git a/big_data_demo/big_data_server.py b/big_data_demo/big_data_server.py deleted file mode 100644 index 90bfdba..0000000 --- a/big_data_demo/big_data_server.py +++ /dev/null @@ -1,153 +0,0 @@ -#!/usr/bin/python - -# -# Copyright (C) 2014, Jaguar Land Rover -# -# This program is licensed under the terms and conditions of the -# Mozilla Public License, version 2.0. The full text of the -# Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ -# - -# -# Simple SOTA SERVER -# -import sys -from rvi_json_rpc_server import RVIJSONRPCServer -import jsonrpclib -import random -import time -import threading -import os -import base64 -import struct -import SocketServer -from base64 import b64encode -from hashlib import sha1 -from mimetools import Message -from StringIO import StringIO -import json -import Queue - -transaction_id = 0 -package_queue = Queue.Queue() - -def usage(): - print "Usage:", sys.argv[0], "<rvi_url>" - print " <rvi_url> URL of Service Edge on a local RVI node" - print - print "The RVI Service Edge URL can be found in" - print "[backend,vehicle].config as" - print "env -> rvi -> components -> service_edge -> url" - print - print "The Service Edge URL is also logged as a notice when the" - print "RVI node is started." - sys.exit(255) - - -def report(vin, timestamp, data): - print "Got report from", vin, " timestamp", timestamp - print data - return {u'status': 0} - -# -# Setup a localhost URL, using a random port, that we will listen to -# incoming JSON-RPC publish calls on, delivered by our RVI service -# edge (specified by rvi_url). -# -emulator_service_host = 'localhost' -emulator_service_port = random.randint(20001, 59999) -emulator_service_url = 'http://'+emulator_service_host + ':' + str(emulator_service_port) - -# -# Check that we have the correct arguments -# -if len(sys.argv) != 2: - usage() - -# Grab the URL to use -[ progname, rvi_url ] = sys.argv - - - -# Setup an outbound JSON-RPC connection to the RVI Service Edge. -rvi_server = jsonrpclib.Server(rvi_url) - -emulator_service = RVIJSONRPCServer(addr=((emulator_service_host, emulator_service_port)), - logRequests=False) - - -# -# Regsiter callbacks for incoming JSON-RPC calls delivered to -# the SOTA server from the vehicle RVI node's Service Edge. -# -emulator_service.register_function(report, "/logging/report" ) - - -# Create a thread to handle incoming stuff so that we can do input -# in order to get new values -thr = threading.Thread(target=emulator_service.serve_forever) -thr.start() - -# We may see traffic immediately from the RVI node when -# we register. Let's sleep for a bit to allow the emulator service -# thread to get up to speed. -time.sleep(0.5) - -# -# Register our HVAC emulator service with the vehicle RVI node's Service Edge. -# We register both services using our own URL as a callback. -# -res = rvi_server.register_service(service = "/logging/report", network_address = emulator_service_url) -full_report_name = res['service'] - - -print "Big Data Emulator." -print "Vehicle RVI node URL: ", rvi_url -print "Emulator URL: ", emulator_service_url -print "Full report service name: ", full_report_name - - -while True: - transaction_id += 1 - line = raw_input('Enter <subscribe|unsubscribe> <vin> <channel> [subscribe-interval] or "q" for quit: ') - if line == 'q': - emulator_service.shutdown() - sys.exit(0) - - - - # Read a line and split it into a key val pair - lst = line.split(' ') - if len(lst) == 3: - [cmd, vin, channel] = line.split(' ') - interval = 0 - elif len(lst) == 4: - [cmd, vin, channel, interval] = line.split(' ') - else: - print "Nope", len(lst), lst - continue - - dst = 'jlr.com/vin/'+vin+'/logging/' - - if cmd[0] == 's': - command = 'subscribe' - if interval == 0: - print 'Please specify interval parameter for subscribe commands.' - continue - - params = [{ u'channels': [ channel ], u'interval': interval}] - else: - if interval != 0: - print 'Please do not specify interval parameter for unsubscribe commands.' - continue - - command = 'unsubscribe' - params = [{ u'channels': [ channel ]}] - - rvi_server.message(calling_service = "/big_data", - service_name = dst + command, - transaction_id = str(transaction_id), - timeout = int(time.time())+60, - parameters = params) - - print('{}({}) sent to {}'.format(command, channel, vin)) diff --git a/big_data_demo/data_logger.py b/big_data_demo/data_logger.py deleted file mode 100644 index 53a469d..0000000 --- a/big_data_demo/data_logger.py +++ /dev/null @@ -1,242 +0,0 @@ -#!/usr/bin/python - -# -# Copyright (C) 2014, Jaguar Land Rover -# -# This program is licensed under the terms and conditions of the -# Mozilla Public License, version 2.0. The full text of the -# Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ -# - -# -# A generic logger / reporter -# -import sqlite3 -import Queue -import threading -import time -import dbus -CMD_ADD_SUBSCRIPTION = 1 -CMD_DELETE_SUBSCRIPTION = 2 -CMD_ADD_SUBSCRIPTION = 3 -CMD_ADD_SAMPLE = 4 -CMD_RETRIEVE_NEXT_SAMPLE = 5 -CMD_DELETE_SAMPLE = 6 -CMD_DELETE_ALL_SAMPLES = 7 -CMD_DUMP_DATABASE = 8 -CMD_SHUTDOWN = 9 - -class Logger(threading.Thread): - def __init__(self, db_file = '/var/tmp/big_data_demo.sql'): - threading.Thread.__init__(self) - self.db_file = db_file - self.queue = Queue.Queue() - self.subscriptions_loaded = False - self.subscriptions = {} - - # Sqlite commands can only be used from the same thread that - # created the database connection to begin with. - # Hence the stupid thread solution - def run(self): - self.dbc = sqlite3.connect(self.db_file) - - print "Starting logger at {}".format(self.db_file) - - # Create the table that stores log data and index it on its timestamps - self.dbc.execute('''CREATE TABLE IF NOT EXISTS log (timestamp, channel, value)''') - self.dbc.execute('''CREATE INDEX IF NOT EXISTS ts_index on log (timestamp ASC)''') - - # Create a table to store all our subscriptions so that they survive a - # system restert. - self.dbc.execute('''CREATE TABLE IF NOT EXISTS subscriptions (channel, interval)''') - - # Retrieve all our subscriptions so that they are easily accessible - for subscription in self.dbc.execute('''SELECT channel, interval FROM subscriptions'''): - (channel, interval) = subscription - # Interval is the sample interval in sec. - # 0 is when the UTC of when last sample was made. - print "Adding subscription {}. Interval {}".format(channel, interval) - self.subscriptions[channel] = ( interval, 0 ) - - self.subscriptions_loaded = True - while True: - # Try to get a command sent from a member function - # call invoked by another thread. - - elem = self.queue.get() - ( command, arg ) = elem - - - if command == CMD_ADD_SUBSCRIPTION: - (channel, sample_interval) = arg - self.__add_subscription(channel, sample_interval) - - elif command == CMD_DELETE_SUBSCRIPTION: - self.__delete_subscription(arg) - - elif command == CMD_ADD_SAMPLE: - self.__add_sample(arg) - - elif command == CMD_RETRIEVE_NEXT_SAMPLE: - # Arg is a queue object to send back the result over - self.__retrieve_next_sample(arg) - - elif command == CMD_DELETE_SAMPLE: - # Arg is timestamp to delete - self.__delete_sample(arg) - - elif command == CMD_DELETE_ALL_SAMPLES: - self.__delete_all_sample() - - elif command == CMD_DUMP_DATABASE: - self.__dump_db() - - elif command == CMD_SHUTDOWN: - print "Logger:run(): Exiting thread" - return True - - else: - print "Logger.run(): Unknown command: {} ignored".format(command) - - - def shutdown(self): - self.queue.put((CMD_SHUTDOWN, True)) - self.join() - - def get_subscriptions(self): - while self.subscriptions_loaded == False: - sleep (0.1) - - res = [] - for channel in self.subscriptions: - (interval, tmp) = self.subscriptions[channel] - res.append((channel, interval)) - - return res - - - def add_subscription(self, channel, sample_interval): - self.queue.put((CMD_ADD_SUBSCRIPTION, (channel, sample_interval))) - - def __add_subscription(self, channel, sample_interval): - if channel in self.subscriptions: - print "Called {} already in subscriptions. Ignored".format(channel) - return False - - print "Adding {} to subscriptions. Interval {}".format(channel, sample_interval) - # Setup a new channel in the dictionary - self.subscriptions[channel] = (sample_interval, 0) - try: - self.dbc.execute('''INSERT INTO subscriptions VALUES (?, ?)''', (channel, sample_interval)) - self.dbc.commit() - except sqlite3.Error as e: - print "An error occurred:", e.args[0] - - print "3" - return True - - def delete_subscription(self, channel): - self.queue.put((CMD_DELETE_SUBSCRIPTION, channel)) - - def __delete_subscription(self, channel): - if not channel in self.subscriptions: - print "unsubscribe(): Channel {} not in subscriptions. Ignored".format(channel) - return False - - # Remove from subscriptions - del self.subscriptions[channel] - self.dbc.execute('''DELETE FROM subscriptions WHERE channel=?''', (channel,)) - return True - - def add_sample(self, values): - self.queue.put((CMD_ADD_SAMPLE, values)) - - def __add_sample(self, values): - # If the channel is not among our subscriptions, then ignore. - # [ind for ind, elem in enumerate(self.subscriptions) if v[0] == 53] - - # If it is not time for us to sample the given channel yet, then ignore - c_time = int(time.time()) - - print "add_sample({})".format(values) - for (channel, value) in values: - if not channel in self.subscriptions: - # print "add_sample({}): Not subscribed to. Ignored".format(channel) - continue - - ( sample_interval, last_sample_ts ) = self.subscriptions[channel] - - # Skip if we have previously received a sample and - # the interval to the next sample has yet to elapse. - if last_sample_ts > 0 and c_time < last_sample_ts + sample_interval: - # print "add_sample({}): c_time < last_sample_ts={} + sample_interval={}. Skipped".format(c_time, last_sample_ts, sample_interval) - continue - - print "add_sample({}): {}".format(channel, value) - # Store the sample - # Convert the value dictionary to a string. - self.dbc.execute('''INSERT INTO log VALUES (?, ?, ?)''', (c_time, channel, str(value))) - self.dbc.commit() - - # Update the last sample timestamp - # print "Updating subscriptions[{}] with ({}, {})".format(channel, sample_interval, c_time) - self.subscriptions[channel] = ( sample_interval, c_time) - - return True - - # Retrieve all samples for the oldest time stamp in the database - # Return: - # False - no samples - # (timestamp, [ ( channel, value), ... ]) - Samples for the given timestamp - # - def retrieve_next_sample(self): - q = Queue.Queue() - self.queue.put((CMD_RETRIEVE_NEXT_SAMPLE, q)) - # Wait for a reply to come back and return whatever it was - return q.get() - - def __retrieve_next_sample(self, queue): - # Get the oldest timestamp that we have stored. - (ts, ) = self.dbc.execute('''SELECT min(timestamp) FROM log''').fetchone() - - # If no timestamp, then we have no data in db. - if ts == None: - queue.put(False) - return False - - res = [] - # Retrieve all rows with a matching timestamp[ - for row in self.dbc.execute('''SELECT channel, value FROM log where timestamp=?''', (ts,)): - # Convert value from string back to dict - res.append((row[0], eval(row[1]))) - - queue.put((ts, res)) - return True - - - def delete_sample(self, timestamp): - self.queue.put((CMD_DELETE_SAMPLE, timestamp)) - - # Delete samples older than the given time stamp. - def __delete_sample(self, timestamp): - self.dbc.execute('''DELETE FROM log WHERE timestamp <= ?''', (timestamp,)) - - def delete_all_samples(self): - self.queue.put((CMD_DELETE_SAMPLE, True)) - - # Delete allsamples with the given timestamp. - def __delete_all_samples(self): - self.dbc.execute('''DELETE FROM log''') - - def dump_db(self): - self.queue.put((CMD_DUMP_DATABASE, True)) - - def __dump_db(self): - print "LOG dump:" - for row in self.dbc.execute('''SELECT timestamp, channel, value FROM log'''): - print row - print "---" - print "Subscription dump:" - for row in self.dbc.execute('''SELECT * FROM subscriptions'''): - print row - print "---" diff --git a/big_data_demo/gps_collector.py b/big_data_demo/gps_collector.py deleted file mode 100644 index 610d5e4..0000000 --- a/big_data_demo/gps_collector.py +++ /dev/null @@ -1,100 +0,0 @@ -#!/usr/bin/python - -# -# Copyright (C) 2014, Jaguar Land Rover -# -# This program is licensed under the terms and conditions of the -# Mozilla Public License, version 2.0. The full text of the -# Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ -# - -# -# GPS Data Collector -# A sample device implementation that collects GPS position data from gpsd -# and sends it to a logger object -# - -import threading -from gps import * -import time - -class GPSPoller(threading.Thread): - """ - Polls GPS devices via gpsd - """ - - def __init__(self): - threading.Thread.__init__(self) - self.session = gps(mode=WATCH_ENABLE) - - def shutdown(self): - self._Thread__stop() - - def run(self): - while True: - self.session.next() - -class GPSCollector: - """ - Collect GPS data - """ - - def __init__(self, logger): - self.last_speed = 1.0 - self.gps_poller = GPSPoller() - self.logger = logger - - - def run(self): - # start GPS polling thread - self.gps_poller.start() - - # main execution loop - while True: - try: - time.sleep(1) - - # process GPS data - session = self.gps_poller.session - if session.fix.mode == MODE_NO_FIX: - print "Waiting for GPS to fix..." - continue - - if isnan(session.fix.time): - print "Invalid location:", session - continue - -# if (session.fix.speed < 0.1) and (self.last_speed < 0.1): -# print "Waiting for speed..." -# continue - - self.last_speed = session.fix.speed - - print "Got data tc", session.utc - - self.logger.add_sample([('location', { - u'lat': session.fix.latitude, - u'lon': session.fix.longitude, - u'alt': session.fix.altitude - }), ('speed', session.fix.speed)]) - - # time = session.utc - # location.loc_latitude = session.fix.latitude - # location.loc_longitude = session.fix.longitude - # location.loc_altitude = session.fix.altitude - # location.loc_speed = session.fix.speed - # location.loc_climb = session.fix.climb - # location.loc_track = session.fix.track - - # if the time is valid the data record is valid - - - except KeyboardInterrupt: - print ('\n') - break - - - def shutdown(self): - if self.gps_poller: - self.gps_poller.shutdown() - diff --git a/big_data_demo/rvi_json_rpc_server.py b/big_data_demo/rvi_json_rpc_server.py deleted file mode 100644 index ec2932c..0000000 --- a/big_data_demo/rvi_json_rpc_server.py +++ /dev/null @@ -1,36 +0,0 @@ -# -# Copyright (C) 2014, Jaguar Land Rover -# -# This program is licensed under the terms and conditions of the -# Mozilla Public License, version 2.0. The full text of the -# Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ -# -from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer -import jsonrpclib - -class RVIJSONRPCServer(SimpleJSONRPCServer): - # Check if method is 'message', if so dispatch on - # name 'service_name' instead. - def _dispatch(self, method, params): - if method == 'message': - # print "Will dispatch message to: " + params['service_name'] - dict_param = {} - # Extract the 'parameters' element from the top level JSON-RPC - # 'param'. - # Convert 'parameters' from [{'vin': 1234}, {hello: 'world'}] to - # a regular dictionary: {'vin': 1234, hello: 'world'} - - print "Parameters:", params['parameters'] - msg_params = params['parameters'] - for i in range(0, len(msg_params)): - for j in range(0, len(msg_params[i].keys())): - # print "params", msg_params[i].keys()[j], "=", msg_params[i].values()[j] - dict_param[msg_params[i].keys()[j]] = msg_params[i].values()[j] - - # print "Parameter disctionary: ", dict_param - # print - # Ship the processed dispatch info upward. - return SimpleJSONRPCServer._dispatch(self, params['service_name'], dict_param) - - - return SimpleJSONRPCServer._dispatch(self,method, params) diff --git a/packaging/big_data_demo.manifest b/packaging/big_data_demo.manifest deleted file mode 100644 index 97e8c31..0000000 --- a/packaging/big_data_demo.manifest +++ /dev/null @@ -1,5 +0,0 @@ -<manifest> - <request> - <domain name="_"/> - </request> -</manifest> diff --git a/packaging/big_data_demo.spec b/packaging/big_data_demo.spec deleted file mode 100755 index feecbcc..0000000 --- a/packaging/big_data_demo.spec +++ /dev/null @@ -1,56 +0,0 @@ -Summary: Remote Vehicle Interaction - Big Data Demo packaging -Name: big_data_demo -Version: 0.3.0 -Release: 1 -Group: App Framework/Application Communication -License: Mozilla Public License 2.0 -Source: http://content.linuxfoundation.org/auto/downloads/big_data_demo/big_data_demo-0.3.0.tgz - -BuildRequires: make -BuildRequires: python -BuildRequires: rpm -# BuildRequires: git - -%description -Big Data Demo running on top of RVI - - -%prep -%setup -c big_data_demo-$RPM_PACKAGE_VERSION - -%build - -%install -# Install the code. - -rm -fr $RPM_BUILD_ROOT/opt/big_data_demo-$RPM_PACKAGE_VERSION -mkdir -p $RPM_BUILD_ROOT/opt/big_data_demo-$RPM_PACKAGE_VERSION - -cp ./big_data_demo/rvi_json_rpc_server.py $RPM_BUILD_ROOT/opt/big_data_demo-$RPM_PACKAGE_VERSION -cp ./big_data_demo/amb_dbus.py $RPM_BUILD_ROOT/opt/big_data_demo-$RPM_PACKAGE_VERSION -cp ./big_data_demo/data_logger.py $RPM_BUILD_ROOT/opt/big_data_demo-$RPM_PACKAGE_VERSION -cp ./big_data_demo/gps_collector.py $RPM_BUILD_ROOT/opt/big_data_demo-$RPM_PACKAGE_VERSION -cp ./big_data_demo/big_data_device.py $RPM_BUILD_ROOT/opt/big_data_demo-$RPM_PACKAGE_VERSION -cp -r ./big_data_demo/mod $RPM_BUILD_ROOT/opt/big_data_demo-$RPM_PACKAGE_VERSION - -# Setup systemd -mkdir -p $RPM_BUILD_ROOT/usr/lib/systemd/system/ -mkdir -p $RPM_BUILD_ROOT/etc/systemd/system/multi-user.target.wants/ -install ./big_data_demo/big_data.service $RPM_BUILD_ROOT/usr/lib/systemd/system/big_data.service -ln -fsr $RPM_BUILD_ROOT/usr/lib/systemd/system/big_data.service $RPM_BUILD_ROOT/etc/systemd/system/multi-user.target.wants/big_data.service -################### - -%post -/usr/bin/systemctl daemon-reload - -%postun - -%clean -rm -rf $RPM_BUILD_ROOT - -%files -%manifest packaging/big_data_demo.manifest -%defattr(-,root,root) -/usr/lib/systemd/system/big_data.service -/etc/systemd/system/multi-user.target.wants/big_data.service -/opt/big_data_demo-0.3.0 |