author    Magnus Feuer <mfeuer@jaguarlandrover.com>    2015-02-12 15:28:20 -0800
committer Magnus Feuer <mfeuer@jaguarlandrover.com>    2015-02-12 15:28:20 -0800
commit    0dfe4c6ebb2e0ab11e7301254a8edfda821d54e8 (patch)
tree      f56162dabe94094aeccef3849d5ff583561923d4
parent    b1dbd5f7ea2bdd3f4e55bdf6bb681ea4df589096 (diff)
download  rvi_core-0dfe4c6ebb2e0ab11e7301254a8edfda821d54e8.tar.gz
Removed big_data_demo since it has been moved to rvi_bigdata repo
-rw-r--r--  big_data_demo/README                    1
-rw-r--r--  big_data_demo/README.md               107
-rw-r--r--  big_data_demo/amb_dbus.py             189
-rw-r--r--  big_data_demo/big_data.service         18
-rw-r--r--  big_data_demo/big_data_device.py      251
-rw-r--r--  big_data_demo/big_data_server.py      153
-rw-r--r--  big_data_demo/data_logger.py          242
-rw-r--r--  big_data_demo/gps_collector.py        100
-rw-r--r--  big_data_demo/rvi_json_rpc_server.py   36
-rw-r--r--  packaging/big_data_demo.manifest        5
-rwxr-xr-x  packaging/big_data_demo.spec           56
11 files changed, 0 insertions, 1158 deletions
diff --git a/big_data_demo/README b/big_data_demo/README
deleted file mode 100644
index 6af629c..0000000
--- a/big_data_demo/README
+++ /dev/null
@@ -1 +0,0 @@
-Big data demo moved to its own repo at https://github.com/PDXostc/RVI_bigdata
diff --git a/big_data_demo/README.md b/big_data_demo/README.md
deleted file mode 100644
index 87f2fc3..0000000
--- a/big_data_demo/README.md
+++ /dev/null
@@ -1,107 +0,0 @@
-# BIG DATA DEMO #
-
-
-# RVI COMMANDS #
-
-
-## SUBSCRIBE ##
-
-Subscribe commands are sent from the backend server to a vehicle in order
-to set up regular reporting of specific data from the vehicle to the server.
-
-Reporting will be done through the ```report``` command.
-
-Multiple subscribe commands can be sent; all indicated channels will be
-reported.
-
- {
- "jsonrpc": "2.0",
- "id": 1,
- "method": "message",
- "params": {
- "service": "jlr.com/vin/123456/logging/subscribe",
- "channels":, ["location", "odometer", "speed"],
- "reporting_interval": 5000
- }
- }
-
-### PARAMETERS ###
-+ channels<br>
-Specifies the channels that we want reported back to the server.
-
-+ reporting_interval<br>
-Specifies the number of milliseconds between each data sample that is
-to be sent back to the backend server. If the reporting interval is a
-negative integer, the channel's value will be reported at the given
-(absolute value) interval, or when the value changes, whichever happens first.
-
-
-## UNSUBSCRIBE ##
-
-Unsubscribe commands are sent from the backend server to a vehicle in
-order to stop reporting of one or more data channels previously set up
-through a ```subscribe``` command.
-
- {
- "jsonrpc": "2.0",
- "id": 2,
- "method": "message",
- "params": {
- "service": "jlr.com/vin/123456/logging/unsubscribe",
- "channels":, ["location", "odometer", "speed"]
- }
- }
-
-### PARAMETERS ###
-+ channels<br>
-Specifies the channels to stop reporting. If a channel has been
-specified several times by multiple subscribe commands, all
-subscriptions to the channel are removed.
-
-## REPORT ##
-
-Report commands are sent from the device to the backend server to report
-a batch of values for channels previously subscribed to through a ```subscribe``` command.
-Multiple values for a single channel can be provided in a single report.
-
-Each channel is reported with its channel name, its value, and the UTC
-msec timestamp when the value was sampled.
-
- {
- "jsonrpc": "2.0",
- "id": 3,
- "method": "message",
- "params": {
- "service": "jlr.com/backend/logging/report",
- "vin": "1234",
- "timestamp": 1415143459110,
- "data":, [
- { "channel": "odometer", "value": 10022 },
- { "channel": "odometer", "value": 10023 },
- { "channel": "speed", "value": 113 },
- { "channel": "location",
- "value": { "lat": 39.0319, "lon": 125.7538, "alt": 222.3 } }
- ]
- }
- }
-
-
-### PARAMETERS ###
-+ data<br>
-Contains an array of all reported data points.
-
-+ channel<br>
-Contains the name of the channel a data point is reported for. Matches
-the channel name in a previously issued ```subscribe``` command.
-
-+ value<br>
-Specifies the value of the given channel at the given time. The actual
-value can be a string, a double, or a JSON object, and is implicitly
-defined by the channel name.
-
-+ timestamp<br>
-Specifies the timestamp, in milliseconds UTC, when the value was sampled
-from the vehicle.
-
-
-# STARTING THE DEVICE SIDE #
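
For reference, the subscribe payload documented in the removed README above can be
composed with a few lines of Python. This is an illustrative sketch, not part of the
demo code; the VIN, channels, and interval are the placeholder values used in the
README examples.

    import json

    def make_subscribe(vin, channels, reporting_interval_ms, request_id=1):
        # Build the JSON-RPC "message" envelope described in the README above.
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "method": "message",
            "params": {
                "service": "jlr.com/vin/{}/logging/subscribe".format(vin),
                "channels": channels,
                "reporting_interval": reporting_interval_ms,
            },
        }

    print(json.dumps(make_subscribe("123456", ["location", "odometer", "speed"], 5000), indent=4))
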
diff --git a/big_data_demo/amb_dbus.py b/big_data_demo/amb_dbus.py
deleted file mode 100644
index 3a4937c..0000000
--- a/big_data_demo/amb_dbus.py
+++ /dev/null
@@ -1,189 +0,0 @@
-#!/usr/bin/python
-
-#
-# Copyright (C) 2014, Jaguar Land Rover
-#
-# This program is licensed under the terms and conditions of the
-# Mozilla Public License, version 2.0. The full text of the
-# Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
-#
-
-
-#
-# AMB Dbus monitor that reports to the provided logger object
-#
-import sys
-import time
-import threading
-import dbus
-import json
-
-class DBUSMonitor(threading.Thread):
- """
-    Retrieves data from the AMB D-Bus message broker and sends it to the provided logger object.
- """
-
- def __init__(self, logger):
- threading.Thread.__init__(self)
- self.sysbus = dbus.SystemBus()
- self.mgr_broker = self.sysbus.get_object("org.automotive.message.broker", "/")
- self.mgr_if = dbus.Interface(self.mgr_broker, "org.automotive.Manager")
- self.logger = logger
-
- # Maps object name (VehicleSpeed) to the corresponding
- # DBUS object path
- self.name_to_path = {}
-
- # Reverse mapping
- self.path_to_name = {}
-
-
- # Maps object path to the corresponding sample interval (in
- # seconds)
- self.path_to_interval = {}
-
-
- # Stores a dictionary with element format (next_timeout_utc,
- # object_path_array).
- #
-        # The dictionary is sorted on next_timeout by run(), allowing
-        # the scheduler to quickly determine when the next
-        # timeout occurs. The dictionary will only have a given
- # object path stored once. A single timeout may store multiple
- # object paths that are to be sampled at that given time,
- # hence the array of object paths
- self.schedule = {}
-
- available_objects = self.mgr_broker.List("", dbus_interface='org.automotive.Manager')
-
- for name in available_objects:
- # Get object path for name
- obj_path = self.mgr_if.FindObject(name)[0]
- # obj_path = self.sysbus.get_object("org.automotive.message.broker", name)
- # print "add_object({}) -> {}".format(name, obj_path)
- self.name_to_path[name] = obj_path
- self.path_to_name[obj_path] = name
- self.path_to_interval[obj_path] = 0 # Not subscribed to
-
-
- def unschedule_sample(self, obj_path):
- # Iterate over all time slots in schedule
- for ts in self.schedule:
-
- # Each slot in schedule is an array of dbus object paths
- # to be sampled at the given time.
- print "unschedule({}): checking ({}, {})".format(obj_path, ts, self.schedule[ts])
-
- # Delete obj_path from the given timeslot in schedule.
- # May throw an exception on index(obj_path) if it does not
- # exist. In that case, we simply move on to the next time stamp
- # in the schedule
- try:
- del self.schedule[ts][self.schedule[ts].index(obj_path)]
- except KeyError:
- #print "unschedule({}):1 Not found in ts {}".format(obj_path, ts)
- continue
-
- except ValueError:
- # print "unschedule({}):2 Not found in ts {}".format(obj_path, ts)
- continue
-
- # Deletion was successful
- print "unschedule({}): after delete: {}".format( obj_path, self.schedule[ts])
- return True
-
- # We got out of loop with nothing found
- return False
-
- #
- # Subscribe to a given object, sampling it at every given number
- # of seconds and send up the result to the logger provided to the
- # constructor.
- #
- def schedule_sample(self, obj_path, timestamp):
- # print "schedule_sample({}, {})".format(obj_path, timestamp)
-
- # Delete any old entries we have for the given object path
- self.unschedule_sample(obj_path)
-
- # Check if we have already have a slot for the given timestamp.
- # If so, append our object path to the existing array
- # of objects triggered at the given time.
- # If not, initialize a new time slot with our object path.
- if timestamp in self.schedule:
- self.schedule[timestamp].append(obj_path)
- else:
- self.schedule[timestamp] = [obj_path]
-        # No explicit re-sort is needed here; run() sorts the
-        # schedule timestamps on each pass.
-
- def subscribe(self, name, sample_interval):
- try:
- obj_path = self.name_to_path[name]
- self.path_to_interval[obj_path] = sample_interval
-
- except KeyError:
- print "subscribe_object({}): Not found".format(name)
- return False
-
- print "amb_dbus:subscribe({}, {}): called".format(name, sample_interval)
- # Schedule the object to be sampled sample_interval seconds from now
- self.schedule_sample(obj_path, int(time.time()) + sample_interval)
-
-
- def sample_and_report(self, obj_path):
- obj_name = self.path_to_name[obj_path]
- prop_if = dbus.Interface(self.sysbus.get_object("org.automotive.message.broker", obj_path),
- "org.freedesktop.DBus.Properties")
-
- # Ugly conversion from dbus types to json back to native python types.
- tmp = eval(json.dumps(prop_if.GetAll("org.automotive."+obj_name)))
- res = {}
- print tmp
- for entry in tmp:
- print entry
- val = tmp[entry]
-            # For some reason dbus.Double survives this conversion above.
- if type(val) is dbus.Double:
- val = float(val)
-
- # Should really be recursive into dictionaries and arrays,
- res[entry] = val
-
- print "dumping:", res
- self.logger.add_sample([(obj_name, res),])
-
- def run(self):
- while True:
- #
- # Stupid way of doing it.
- # We should sleep either until the first element
- # in the schedule queue is due, or
- # we get a wakeup signal from schedule()
- #
- time.sleep(1.0)
-
- # Retrieve a sorted list of all timestamps in self.schedule
- # FIXME: Some sort of ordered dictionary would probably be
- # smart here.
- sorted_ts = sorted(self.schedule)
- ct = int(time.time())
-
- # Go through all timestamps that are due for execution
- while len(sorted_ts) > 0 and sorted_ts[0] <= ct:
- # Process the first element of the sorted list
- obj_path_arr = self.schedule[sorted_ts[0]]
-
- # Delete the time slot from schedule
- del self.schedule[sorted_ts[0]]
-
- # Delete the time slot from sorted time slots
- del sorted_ts[0]
-
- # Go through all retrieved object paths, sample and report them
- for obj_path in obj_path_arr:
- self.sample_and_report(obj_path)
- # Reschedule self to the next interval time slot
- self.schedule_sample(obj_path, int(time.time()) +self.path_to_interval[obj_path])
-
-
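
The scheduling approach used by DBUSMonitor above boils down to a dictionary that maps
a UTC-second timestamp to the object paths due to be sampled at that time, drained in
timestamp order. A minimal, self-contained Python 3 sketch of that idea (names are
illustrative, not the demo's):

    import time

    schedule = {}   # timestamp (UTC seconds) -> [object_path, ...]

    def schedule_sample(obj_path, when):
        # Add obj_path to the list of paths due at time 'when'.
        schedule.setdefault(when, []).append(obj_path)

    def pop_due_paths(now=None):
        # Yield and remove every object path whose timestamp has elapsed.
        now = int(time.time()) if now is None else int(now)
        for ts in sorted(ts for ts in list(schedule) if ts <= now):
            for path in schedule.pop(ts):
                yield path

    schedule_sample("/VehicleSpeed", int(time.time()))
    print(list(pop_due_paths()))   # -> ['/VehicleSpeed']
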
diff --git a/big_data_demo/big_data.service b/big_data_demo/big_data.service
deleted file mode 100644
index 3e85b3a..0000000
--- a/big_data_demo/big_data.service
+++ /dev/null
@@ -1,18 +0,0 @@
-# systemd(8) setup used by Tizen and others.
-[Unit]
-Description=RVI Big Data Demo Service
-Wants=network-online.target rvi.service
-
-[Service]
-Environment="HOME=/opt/big_data_demo-0.3.0"
-Environment="PYTHONPATH=/opt/big_data_demo-0.3.0/mod/lib/python"
-WorkingDirectory=/opt/big_data_demo-0.3.0
-Type=simple
-StandardOutput=journal
-StandardError=journal
-ExecStart=/bin/sh -c "sleep 10;/usr/bin/python big_data_device.py http://localhost:8811"
-GuessMainPID=yes
-
-[Install]
-WantedBy=graphical.target multi-user.target
diff --git a/big_data_demo/big_data_device.py b/big_data_demo/big_data_device.py
deleted file mode 100644
index fcd6c24..0000000
--- a/big_data_demo/big_data_device.py
+++ /dev/null
@@ -1,251 +0,0 @@
-#!/usr/bin/python
-
-#
-# Copyright (C) 2014, Jaguar Land Rover
-#
-# This program is licensed under the terms and conditions of the
-# Mozilla Public License, version 2.0. The full text of the
-# Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
-#
-
-#
-# Big Data Demo device
-# Collects GPS position data (via gpsd) and AMB vehicle data, logs it,
-# and reports it to an RVI backend.
-#
-
-import sys
-import getopt
-import os
-import time
-from datetime import tzinfo, timedelta, datetime
-import threading
-import random
-from signal import *
-import json
-from rvi_json_rpc_server import RVIJSONRPCServer
-import jsonrpclib
-from urlparse import urlparse
-import amb_dbus
-import data_logger
-import gps_collector
-import traceback
-MY_NAME = "Big Data Demo"
-
-class RVICallbackServer(threading.Thread):
- """
- RPC server thread responding to incoming callbacks from the RVI framework
- """
-
- def __init__(self, amb, logger, service_edge, callback_url):
- threading.Thread.__init__(self)
- self.logger = logger
- self.service_edge = service_edge
- self.callback_url = callback_url
- url = urlparse(self.callback_url)
- self.localServer = RVIJSONRPCServer(addr=((url.hostname, url.port)), logRequests=False)
- self.register_services()
- self.amb = amb
-
- def subscribe(self, channels, reporting_interval):
- for channel in channels:
- self.amb.subscribe(channel, int(reporting_interval))
- self.logger.add_subscription(channel, int(reporting_interval))
-
- self.logger.dump_db()
- return {u'status': 0}
-
-
- def unsubscribe(self, channels):
- print "unsubscribe(): channels:", channels
- for channel in channels:
- self.logger.delete_subscription(channel)
-
- return {u'status': 0}
-
- def register_services(self):
- # register callback functions with RPC server
- self.localServer.register_function(self.subscribe, "/logging/subscribe" )
- self.localServer.register_function(self.unsubscribe, "/logging/unsubscribe" )
-
- # register services with RVI framework
- rvi_dead = True
- services = []
- while rvi_dead:
- try:
- res = self.service_edge.register_service(service = "/logging/subscribe",
- network_address = self.callback_url)
-
- services.append(res['service'])
- res = self.service_edge.register_service(service = "/logging/unsubscribe",
- network_address = self.callback_url)
- services.append(res['service'])
- rvi_dead = False
- except Exception, err:
- print "No RVI. Wait and retry..."
- time.sleep(2.0)
-
- print 'Service registration successful. Services: ', services
- # Retrieve the Vin number from the returned service name
- vin = services[0]
- vin = vin[0:len(vin)-18]
- vin = vin[vin.rindex('/')+1:]
- print "Retrieved VIN:", vin
- self.vin = vin
-
- def run(self):
- self.localServer.serve_forever()
-
- def shutdown(self):
- self.localServer.shutdown()
-
-
-
-class UTC(tzinfo):
- """UTC"""
- def utcoffset(self, dt):
- return timedelta(0)
-
- def tzname(self, dt):
- return "UTC"
-
- def dst(self, dt):
- return timedelta(0)
-
-class DataSender(threading.Thread):
- """
- Sends data from the database to RVI
- """
-
- def __init__(self, destination, vin, rvi_server, logger, send_interval):
- threading.Thread.__init__(self)
- self.destination = destination
- self.vin = vin
- self.rvi_server = rvi_server
- self.logger = logger
- self.send_interval = send_interval
- self.transaction_id = 1
-
- def shutdown(self):
- self._Thread__stop()
-
- def run(self):
-
- utc = UTC()
- while True:
-            sample = self.logger.retrieve_next_sample()
-
- # If no samples are to be had, sleep on it and try again.
- if sample == False:
- print "No samples"
- time.sleep(self.send_interval)
- continue
-
- ( timestamp, datapoints ) = sample
- data_arg = []
- for dp in datapoints:
- (channel, value) = dp
- data_arg.append({ u'channel': channel, u'value': value })
-
- utc_ts = datetime.fromtimestamp(timestamp, utc).isoformat()
- param = [{
- u'vin': self.vin,
- u'timestamp': utc_ts,
- u'data': data_arg
- }]
-
- print "Sending: {}".format(param)
-            self.rvi_server.message(calling_service = "/big_data",
-                                    service_name = self.destination + "/logging/report",
-                                    transaction_id = self.transaction_id,
-                                    timeout = int(time.time())+60,
-                                    parameters = param)
-
- # Wipe sample now that we have sent it
-            self.logger.delete_sample(timestamp)
- self.transaction_id += 1
-
-
-
-def cleanup(*args):
- print "Caught signal:", args[0], "Shutting down..."
- if gps_collector:
- gps_collector.shutdown()
- if rvi_callback:
- rvi_callback.shutdown()
- if logger:
- logger.shutdown()
- if data_sender:
- data_sender.shutdown()
- sys.exit(0)
-
-
-def usage():
- print "Usage: %s RVI-URL" % sys.argv[0]
- sys.exit(255)
-
-
-if __name__ == "__main__":
- #
- # Setup a localhost URL, using a random port, that we will listen to
- # incoming JSON-RPC publish calls on, delivered by our RVI service
- # edge (specified by rvi_url).
- #
- service_host = 'localhost'
- service_port = random.randint(20001, 59999)
- service_url = 'http://'+service_host + ':' + str(service_port)
-
- #
- # Check that we have the correct arguments
- #
- if len(sys.argv) != 2:
- usage()
-
- # Grab the URL to use
- [ progname, rvi_url ] = sys.argv
-
- # Welcome message
- print "RVI Big Data Device"
- print "Outbound URL to RVI: ", rvi_url
- print "Inbound URL from RVI: ", service_url
-
-
-    # Set up the logger.
- logger = data_logger.Logger()
- logger.start()
-
-
- # Setup outbound JSON-RPC connection to the RVI Service Edge
- rvi_server = jsonrpclib.Server(rvi_url)
- print "SERVER:", rvi_server
-
-    # Set up AMB D-Bus integration
- amb = amb_dbus.DBUSMonitor(logger)
- amb.start()
-
- # Setup inbound JSON-RPC server
- rvi_callback = RVICallbackServer(amb, logger, rvi_server, service_url)
- rvi_callback.start()
-
- # Setup data sender
-    data_sender = DataSender("jlr.com/backend", rvi_callback.vin, rvi_server, logger, 3)
- data_sender.start()
-
- # Retrieve (persistent) subscriptions from
- # the logger and prime the amb dbus
- # connection with it.
- for (channel, interval) in logger.get_subscriptions():
- amb.subscribe(channel, interval)
-
- # Start GPS data collection
- interval = 1
- gps_collector = gps_collector.GPSCollector(logger)
-
- # catch signals for proper shutdown
- for sig in (SIGABRT, SIGTERM, SIGINT):
- signal(sig, cleanup)
-
- # Let the main thread run the gps collector
- gps_collector.run()
-
- print "gps_collector.run() exited."
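
The VIN extraction in register_services() above strips the trailing 18 characters
("/logging/subscribe") from the fully qualified service name returned by RVI and keeps
whatever follows the last '/'. A worked example with a placeholder VIN:

    # Same slicing as register_services(), written out step by step.
    service = "jlr.com/vin/123456/logging/subscribe"
    prefix = service[:len(service) - len("/logging/subscribe")]   # "jlr.com/vin/123456"
    vin = prefix[prefix.rindex('/') + 1:]                         # "123456"
    print(vin)
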
diff --git a/big_data_demo/big_data_server.py b/big_data_demo/big_data_server.py
deleted file mode 100644
index 90bfdba..0000000
--- a/big_data_demo/big_data_server.py
+++ /dev/null
@@ -1,153 +0,0 @@
-#!/usr/bin/python
-
-#
-# Copyright (C) 2014, Jaguar Land Rover
-#
-# This program is licensed under the terms and conditions of the
-# Mozilla Public License, version 2.0. The full text of the
-# Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
-#
-
-#
-# Simple big data backend server
-#
-import sys
-from rvi_json_rpc_server import RVIJSONRPCServer
-import jsonrpclib
-import random
-import time
-import threading
-import os
-import base64
-import struct
-import SocketServer
-from base64 import b64encode
-from hashlib import sha1
-from mimetools import Message
-from StringIO import StringIO
-import json
-import Queue
-
-transaction_id = 0
-package_queue = Queue.Queue()
-
-def usage():
- print "Usage:", sys.argv[0], "<rvi_url>"
- print " <rvi_url> URL of Service Edge on a local RVI node"
- print
- print "The RVI Service Edge URL can be found in"
- print "[backend,vehicle].config as"
- print "env -> rvi -> components -> service_edge -> url"
- print
- print "The Service Edge URL is also logged as a notice when the"
- print "RVI node is started."
- sys.exit(255)
-
-
-def report(vin, timestamp, data):
- print "Got report from", vin, " timestamp", timestamp
- print data
- return {u'status': 0}
-
-#
-# Setup a localhost URL, using a random port, that we will listen to
-# incoming JSON-RPC publish calls on, delivered by our RVI service
-# edge (specified by rvi_url).
-#
-emulator_service_host = 'localhost'
-emulator_service_port = random.randint(20001, 59999)
-emulator_service_url = 'http://'+emulator_service_host + ':' + str(emulator_service_port)
-
-#
-# Check that we have the correct arguments
-#
-if len(sys.argv) != 2:
- usage()
-
-# Grab the URL to use
-[ progname, rvi_url ] = sys.argv
-
-
-
-# Setup an outbound JSON-RPC connection to the RVI Service Edge.
-rvi_server = jsonrpclib.Server(rvi_url)
-
-emulator_service = RVIJSONRPCServer(addr=((emulator_service_host, emulator_service_port)),
- logRequests=False)
-
-
-#
-# Register callbacks for incoming JSON-RPC calls delivered to
-# the big data server from the vehicle RVI node's Service Edge.
-#
-emulator_service.register_function(report, "/logging/report" )
-
-
-# Create a thread to handle incoming requests so that the main thread
-# can read console input for new commands.
-thr = threading.Thread(target=emulator_service.serve_forever)
-thr.start()
-
-# We may see traffic immediately from the RVI node when
-# we register. Let's sleep for a bit to allow the emulator service
-# thread to get up to speed.
-time.sleep(0.5)
-
-#
-# Register our big data emulator service with the vehicle RVI node's Service Edge.
-# We register the report service using our own URL as a callback.
-#
-res = rvi_server.register_service(service = "/logging/report", network_address = emulator_service_url)
-full_report_name = res['service']
-
-
-print "Big Data Emulator."
-print "Vehicle RVI node URL: ", rvi_url
-print "Emulator URL: ", emulator_service_url
-print "Full report service name: ", full_report_name
-
-
-while True:
- transaction_id += 1
- line = raw_input('Enter <subscribe|unsubscribe> <vin> <channel> [subscribe-interval] or "q" for quit: ')
- if line == 'q':
- emulator_service.shutdown()
- sys.exit(0)
-
-
-
- # Read a line and split it into a key val pair
- lst = line.split(' ')
- if len(lst) == 3:
- [cmd, vin, channel] = line.split(' ')
- interval = 0
- elif len(lst) == 4:
- [cmd, vin, channel, interval] = line.split(' ')
- else:
- print "Nope", len(lst), lst
- continue
-
- dst = 'jlr.com/vin/'+vin+'/logging/'
-
- if cmd[0] == 's':
- command = 'subscribe'
- if interval == 0:
- print 'Please specify interval parameter for subscribe commands.'
- continue
-
-        params = [{ u'channels': [ channel ], u'reporting_interval': interval}]
- else:
- if interval != 0:
- print 'Please do not specify interval parameter for unsubscribe commands.'
- continue
-
- command = 'unsubscribe'
- params = [{ u'channels': [ channel ]}]
-
- rvi_server.message(calling_service = "/big_data",
- service_name = dst + command,
- transaction_id = str(transaction_id),
- timeout = int(time.time())+60,
- parameters = params)
-
- print('{}({}) sent to {}'.format(command, channel, vin))
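
To make the console protocol above concrete: an input line of the form
"<subscribe|unsubscribe> <vin> <channel> [interval]" is turned into a destination
service name plus a parameter list. A small illustrative sketch (placeholder VIN and
channel, not part of the removed server):

    line = "subscribe 123456 speed 5000"
    cmd, vin, channel, interval = line.split(' ')
    command = 'subscribe' if cmd[0] == 's' else 'unsubscribe'
    dst = 'jlr.com/vin/' + vin + '/logging/' + command
    print(dst)   # jlr.com/vin/123456/logging/subscribe
    print([{'channels': [channel], 'reporting_interval': int(interval)}])
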
diff --git a/big_data_demo/data_logger.py b/big_data_demo/data_logger.py
deleted file mode 100644
index 53a469d..0000000
--- a/big_data_demo/data_logger.py
+++ /dev/null
@@ -1,242 +0,0 @@
-#!/usr/bin/python
-
-#
-# Copyright (C) 2014, Jaguar Land Rover
-#
-# This program is licensed under the terms and conditions of the
-# Mozilla Public License, version 2.0. The full text of the
-# Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
-#
-
-#
-# A generic logger / reporter
-#
-import sqlite3
-import Queue
-import threading
-import time
-import dbus
-CMD_ADD_SUBSCRIPTION = 1
-CMD_DELETE_SUBSCRIPTION = 2
-CMD_ADD_SAMPLE = 4
-CMD_RETRIEVE_NEXT_SAMPLE = 5
-CMD_DELETE_SAMPLE = 6
-CMD_DELETE_ALL_SAMPLES = 7
-CMD_DUMP_DATABASE = 8
-CMD_SHUTDOWN = 9
-
-class Logger(threading.Thread):
- def __init__(self, db_file = '/var/tmp/big_data_demo.sql'):
- threading.Thread.__init__(self)
- self.db_file = db_file
- self.queue = Queue.Queue()
- self.subscriptions_loaded = False
- self.subscriptions = {}
-
- # Sqlite commands can only be used from the same thread that
- # created the database connection to begin with.
- # Hence the stupid thread solution
- def run(self):
- self.dbc = sqlite3.connect(self.db_file)
-
- print "Starting logger at {}".format(self.db_file)
-
- # Create the table that stores log data and index it on its timestamps
- self.dbc.execute('''CREATE TABLE IF NOT EXISTS log (timestamp, channel, value)''')
- self.dbc.execute('''CREATE INDEX IF NOT EXISTS ts_index on log (timestamp ASC)''')
-
- # Create a table to store all our subscriptions so that they survive a
-        # system restart.
- self.dbc.execute('''CREATE TABLE IF NOT EXISTS subscriptions (channel, interval)''')
-
- # Retrieve all our subscriptions so that they are easily accessible
- for subscription in self.dbc.execute('''SELECT channel, interval FROM subscriptions'''):
- (channel, interval) = subscription
- # Interval is the sample interval in sec.
-            # The second tuple element is the UTC time of the last sample; 0 means not yet sampled.
- print "Adding subscription {}. Interval {}".format(channel, interval)
- self.subscriptions[channel] = ( interval, 0 )
-
- self.subscriptions_loaded = True
- while True:
- # Try to get a command sent from a member function
- # call invoked by another thread.
-
- elem = self.queue.get()
- ( command, arg ) = elem
-
-
- if command == CMD_ADD_SUBSCRIPTION:
- (channel, sample_interval) = arg
- self.__add_subscription(channel, sample_interval)
-
- elif command == CMD_DELETE_SUBSCRIPTION:
- self.__delete_subscription(arg)
-
- elif command == CMD_ADD_SAMPLE:
- self.__add_sample(arg)
-
- elif command == CMD_RETRIEVE_NEXT_SAMPLE:
- # Arg is a queue object to send back the result over
- self.__retrieve_next_sample(arg)
-
- elif command == CMD_DELETE_SAMPLE:
- # Arg is timestamp to delete
- self.__delete_sample(arg)
-
- elif command == CMD_DELETE_ALL_SAMPLES:
-                self.__delete_all_samples()
-
- elif command == CMD_DUMP_DATABASE:
- self.__dump_db()
-
- elif command == CMD_SHUTDOWN:
- print "Logger:run(): Exiting thread"
- return True
-
- else:
- print "Logger.run(): Unknown command: {} ignored".format(command)
-
-
- def shutdown(self):
- self.queue.put((CMD_SHUTDOWN, True))
- self.join()
-
- def get_subscriptions(self):
- while self.subscriptions_loaded == False:
-            time.sleep(0.1)
-
- res = []
- for channel in self.subscriptions:
- (interval, tmp) = self.subscriptions[channel]
- res.append((channel, interval))
-
- return res
-
-
- def add_subscription(self, channel, sample_interval):
- self.queue.put((CMD_ADD_SUBSCRIPTION, (channel, sample_interval)))
-
- def __add_subscription(self, channel, sample_interval):
- if channel in self.subscriptions:
-            print "Channel {} already in subscriptions. Ignored".format(channel)
- return False
-
- print "Adding {} to subscriptions. Interval {}".format(channel, sample_interval)
- # Setup a new channel in the dictionary
- self.subscriptions[channel] = (sample_interval, 0)
- try:
- self.dbc.execute('''INSERT INTO subscriptions VALUES (?, ?)''', (channel, sample_interval))
- self.dbc.commit()
- except sqlite3.Error as e:
- print "An error occurred:", e.args[0]
-
- return True
-
- def delete_subscription(self, channel):
- self.queue.put((CMD_DELETE_SUBSCRIPTION, channel))
-
- def __delete_subscription(self, channel):
- if not channel in self.subscriptions:
- print "unsubscribe(): Channel {} not in subscriptions. Ignored".format(channel)
- return False
-
- # Remove from subscriptions
- del self.subscriptions[channel]
- self.dbc.execute('''DELETE FROM subscriptions WHERE channel=?''', (channel,))
- return True
-
- def add_sample(self, values):
- self.queue.put((CMD_ADD_SAMPLE, values))
-
- def __add_sample(self, values):
- # If the channel is not among our subscriptions, then ignore.
- # [ind for ind, elem in enumerate(self.subscriptions) if v[0] == 53]
-
- # If it is not time for us to sample the given channel yet, then ignore
- c_time = int(time.time())
-
- print "add_sample({})".format(values)
- for (channel, value) in values:
- if not channel in self.subscriptions:
- # print "add_sample({}): Not subscribed to. Ignored".format(channel)
- continue
-
- ( sample_interval, last_sample_ts ) = self.subscriptions[channel]
-
- # Skip if we have previously received a sample and
- # the interval to the next sample has yet to elapse.
- if last_sample_ts > 0 and c_time < last_sample_ts + sample_interval:
- # print "add_sample({}): c_time < last_sample_ts={} + sample_interval={}. Skipped".format(c_time, last_sample_ts, sample_interval)
- continue
-
- print "add_sample({}): {}".format(channel, value)
- # Store the sample
- # Convert the value dictionary to a string.
- self.dbc.execute('''INSERT INTO log VALUES (?, ?, ?)''', (c_time, channel, str(value)))
- self.dbc.commit()
-
- # Update the last sample timestamp
- # print "Updating subscriptions[{}] with ({}, {})".format(channel, sample_interval, c_time)
- self.subscriptions[channel] = ( sample_interval, c_time)
-
- return True
-
- # Retrieve all samples for the oldest time stamp in the database
- # Return:
- # False - no samples
- # (timestamp, [ ( channel, value), ... ]) - Samples for the given timestamp
- #
- def retrieve_next_sample(self):
- q = Queue.Queue()
- self.queue.put((CMD_RETRIEVE_NEXT_SAMPLE, q))
- # Wait for a reply to come back and return whatever it was
- return q.get()
-
- def __retrieve_next_sample(self, queue):
- # Get the oldest timestamp that we have stored.
- (ts, ) = self.dbc.execute('''SELECT min(timestamp) FROM log''').fetchone()
-
- # If no timestamp, then we have no data in db.
- if ts == None:
- queue.put(False)
- return False
-
- res = []
- # Retrieve all rows with a matching timestamp[
- for row in self.dbc.execute('''SELECT channel, value FROM log where timestamp=?''', (ts,)):
- # Convert value from string back to dict
- res.append((row[0], eval(row[1])))
-
- queue.put((ts, res))
- return True
-
-
- def delete_sample(self, timestamp):
- self.queue.put((CMD_DELETE_SAMPLE, timestamp))
-
-    # Delete samples at or older than the given timestamp.
- def __delete_sample(self, timestamp):
- self.dbc.execute('''DELETE FROM log WHERE timestamp <= ?''', (timestamp,))
-
- def delete_all_samples(self):
-        self.queue.put((CMD_DELETE_ALL_SAMPLES, True))
-
-    # Delete all samples from the log.
- def __delete_all_samples(self):
- self.dbc.execute('''DELETE FROM log''')
-
- def dump_db(self):
- self.queue.put((CMD_DUMP_DATABASE, True))
-
- def __dump_db(self):
- print "LOG dump:"
- for row in self.dbc.execute('''SELECT timestamp, channel, value FROM log'''):
- print row
- print "---"
- print "Subscription dump:"
- for row in self.dbc.execute('''SELECT * FROM subscriptions'''):
- print row
- print "---"
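
The Logger class above funnels every database call through a queue because an sqlite3
connection may only be used from the thread that created it. A minimal Python 3 sketch
of that worker-thread pattern (illustrative class and table names, in-memory database,
not the demo's code):

    import queue
    import sqlite3
    import threading

    class DBWorker(threading.Thread):
        """All SQLite work happens on this one thread; other threads just enqueue."""

        def __init__(self, path=":memory:"):
            threading.Thread.__init__(self)
            self.path = path
            self.q = queue.Queue()

        def run(self):
            dbc = sqlite3.connect(self.path)   # connection created and used on this thread only
            dbc.execute("CREATE TABLE IF NOT EXISTS log (timestamp, channel, value)")
            while True:
                item = self.q.get()
                if item is None:               # shutdown sentinel
                    dbc.close()
                    return
                dbc.execute("INSERT INTO log VALUES (?, ?, ?)", item)
                dbc.commit()

        def add_sample(self, ts, channel, value):   # safe to call from any thread
            self.q.put((ts, channel, str(value)))

        def shutdown(self):
            self.q.put(None)
            self.join()

    w = DBWorker()
    w.start()
    w.add_sample(1415143459, "speed", 113)
    w.shutdown()
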
diff --git a/big_data_demo/gps_collector.py b/big_data_demo/gps_collector.py
deleted file mode 100644
index 610d5e4..0000000
--- a/big_data_demo/gps_collector.py
+++ /dev/null
@@ -1,100 +0,0 @@
-#!/usr/bin/python
-
-#
-# Copyright (C) 2014, Jaguar Land Rover
-#
-# This program is licensed under the terms and conditions of the
-# Mozilla Public License, version 2.0. The full text of the
-# Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
-#
-
-#
-# GPS Data Collector
-# A sample device implementation that collects GPS position data from gpsd
-# and sends it to a logger object
-#
-
-import threading
-from gps import *
-import time
-
-class GPSPoller(threading.Thread):
- """
- Polls GPS devices via gpsd
- """
-
- def __init__(self):
- threading.Thread.__init__(self)
- self.session = gps(mode=WATCH_ENABLE)
-
- def shutdown(self):
- self._Thread__stop()
-
- def run(self):
- while True:
- self.session.next()
-
-class GPSCollector:
- """
- Collect GPS data
- """
-
- def __init__(self, logger):
- self.last_speed = 1.0
- self.gps_poller = GPSPoller()
- self.logger = logger
-
-
- def run(self):
- # start GPS polling thread
- self.gps_poller.start()
-
- # main execution loop
- while True:
- try:
- time.sleep(1)
-
- # process GPS data
- session = self.gps_poller.session
- if session.fix.mode == MODE_NO_FIX:
- print "Waiting for GPS to fix..."
- continue
-
- if isnan(session.fix.time):
- print "Invalid location:", session
- continue
-
-# if (session.fix.speed < 0.1) and (self.last_speed < 0.1):
-# print "Waiting for speed..."
-# continue
-
- self.last_speed = session.fix.speed
-
-                print "Got data, utc:", session.utc
-
- self.logger.add_sample([('location', {
- u'lat': session.fix.latitude,
- u'lon': session.fix.longitude,
- u'alt': session.fix.altitude
- }), ('speed', session.fix.speed)])
-
- # time = session.utc
- # location.loc_latitude = session.fix.latitude
- # location.loc_longitude = session.fix.longitude
- # location.loc_altitude = session.fix.altitude
- # location.loc_speed = session.fix.speed
- # location.loc_climb = session.fix.climb
- # location.loc_track = session.fix.track
-
- # if the time is valid the data record is valid
-
-
- except KeyboardInterrupt:
- print ('\n')
- break
-
-
- def shutdown(self):
- if self.gps_poller:
- self.gps_poller.shutdown()
-
diff --git a/big_data_demo/rvi_json_rpc_server.py b/big_data_demo/rvi_json_rpc_server.py
deleted file mode 100644
index ec2932c..0000000
--- a/big_data_demo/rvi_json_rpc_server.py
+++ /dev/null
@@ -1,36 +0,0 @@
-#
-# Copyright (C) 2014, Jaguar Land Rover
-#
-# This program is licensed under the terms and conditions of the
-# Mozilla Public License, version 2.0. The full text of the
-# Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
-#
-from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer
-import jsonrpclib
-
-class RVIJSONRPCServer(SimpleJSONRPCServer):
- # Check if method is 'message', if so dispatch on
- # name 'service_name' instead.
- def _dispatch(self, method, params):
- if method == 'message':
- # print "Will dispatch message to: " + params['service_name']
- dict_param = {}
- # Extract the 'parameters' element from the top level JSON-RPC
- # 'param'.
- # Convert 'parameters' from [{'vin': 1234}, {hello: 'world'}] to
- # a regular dictionary: {'vin': 1234, hello: 'world'}
-
- print "Parameters:", params['parameters']
- msg_params = params['parameters']
- for i in range(0, len(msg_params)):
- for j in range(0, len(msg_params[i].keys())):
- # print "params", msg_params[i].keys()[j], "=", msg_params[i].values()[j]
- dict_param[msg_params[i].keys()[j]] = msg_params[i].values()[j]
-
-            # print "Parameter dictionary: ", dict_param
- # print
- # Ship the processed dispatch info upward.
- return SimpleJSONRPCServer._dispatch(self, params['service_name'], dict_param)
-
-
- return SimpleJSONRPCServer._dispatch(self,method, params)
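
The 'parameters' flattening that _dispatch() performs above can be shown in isolation:
RVI delivers parameters as a list of single-key objects, which is merged into one
dictionary before the service callback is invoked. The sample values come from the
comment in the removed file.

    msg_params = [{'vin': 1234}, {'hello': 'world'}]
    dict_param = {}
    for entry in msg_params:
        dict_param.update(entry)
    print(dict_param)   # {'vin': 1234, 'hello': 'world'}
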
diff --git a/packaging/big_data_demo.manifest b/packaging/big_data_demo.manifest
deleted file mode 100644
index 97e8c31..0000000
--- a/packaging/big_data_demo.manifest
+++ /dev/null
@@ -1,5 +0,0 @@
-<manifest>
- <request>
- <domain name="_"/>
- </request>
-</manifest>
diff --git a/packaging/big_data_demo.spec b/packaging/big_data_demo.spec
deleted file mode 100755
index feecbcc..0000000
--- a/packaging/big_data_demo.spec
+++ /dev/null
@@ -1,56 +0,0 @@
-Summary: Remote Vehicle Interaction - Big Data Demo packaging
-Name: big_data_demo
-Version: 0.3.0
-Release: 1
-Group: App Framework/Application Communication
-License: Mozilla Public License 2.0
-Source: http://content.linuxfoundation.org/auto/downloads/big_data_demo/big_data_demo-0.3.0.tgz
-
-BuildRequires: make
-BuildRequires: python
-BuildRequires: rpm
-# BuildRequires: git
-
-%description
-Big Data Demo running on top of RVI
-
-
-%prep
-%setup -c big_data_demo-$RPM_PACKAGE_VERSION
-
-%build
-
-%install
-# Install the code.
-
-rm -fr $RPM_BUILD_ROOT/opt/big_data_demo-$RPM_PACKAGE_VERSION
-mkdir -p $RPM_BUILD_ROOT/opt/big_data_demo-$RPM_PACKAGE_VERSION
-
-cp ./big_data_demo/rvi_json_rpc_server.py $RPM_BUILD_ROOT/opt/big_data_demo-$RPM_PACKAGE_VERSION
-cp ./big_data_demo/amb_dbus.py $RPM_BUILD_ROOT/opt/big_data_demo-$RPM_PACKAGE_VERSION
-cp ./big_data_demo/data_logger.py $RPM_BUILD_ROOT/opt/big_data_demo-$RPM_PACKAGE_VERSION
-cp ./big_data_demo/gps_collector.py $RPM_BUILD_ROOT/opt/big_data_demo-$RPM_PACKAGE_VERSION
-cp ./big_data_demo/big_data_device.py $RPM_BUILD_ROOT/opt/big_data_demo-$RPM_PACKAGE_VERSION
-cp -r ./big_data_demo/mod $RPM_BUILD_ROOT/opt/big_data_demo-$RPM_PACKAGE_VERSION
-
-# Setup systemd
-mkdir -p $RPM_BUILD_ROOT/usr/lib/systemd/system/
-mkdir -p $RPM_BUILD_ROOT/etc/systemd/system/multi-user.target.wants/
-install ./big_data_demo/big_data.service $RPM_BUILD_ROOT/usr/lib/systemd/system/big_data.service
-ln -fsr $RPM_BUILD_ROOT/usr/lib/systemd/system/big_data.service $RPM_BUILD_ROOT/etc/systemd/system/multi-user.target.wants/big_data.service
-###################
-
-%post
-/usr/bin/systemctl daemon-reload
-
-%postun
-
-%clean
-rm -rf $RPM_BUILD_ROOT
-
-%files
-%manifest packaging/big_data_demo.manifest
-%defattr(-,root,root)
-/usr/lib/systemd/system/big_data.service
-/etc/systemd/system/multi-user.target.wants/big_data.service
-/opt/big_data_demo-0.3.0