summaryrefslogtreecommitdiff
path: root/python/qmf2/agent.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qmf2/agent.py')
-rw-r--r--python/qmf2/agent.py90
1 files changed, 70 insertions, 20 deletions
diff --git a/python/qmf2/agent.py b/python/qmf2/agent.py
index 7c090ad36b..fd63807dc3 100644
--- a/python/qmf2/agent.py
+++ b/python/qmf2/agent.py
@@ -26,7 +26,8 @@ from qpid.messaging import Connection, Message, Empty, SendError
from uuid import uuid4
from common import (make_subject, parse_subject, OpCode, QmfQuery,
SchemaObjectClass, MsgKey, QmfData, QmfAddress,
- SchemaClass, SchemaClassId, WorkItem, SchemaMethod)
+ SchemaClass, SchemaClassId, WorkItem, SchemaMethod,
+ timedelta_to_secs)
# global flag that indicates which thread (if any) is
# running the agent notifier callback
@@ -42,18 +43,22 @@ class _MethodCallHandle(object):
application. Given to the app in a WorkItem, provided to the agent when
method_response() is invoked.
"""
- def __init__(self, correlation_id, reply_to, meth_name, _oid=None):
+ def __init__(self, correlation_id, reply_to, meth_name, _oid=None,
+ _schema_id=None):
self.correlation_id = correlation_id
self.reply_to = reply_to
self.meth_name = meth_name
self.oid = _oid
+ self.schema_id = _schema_id
class MethodCallParams(object):
"""
"""
- def __init__(self, name, _oid=None, _in_args=None, _user_id=None):
+ def __init__(self, name, _oid=None, _schema_id=None, _in_args=None,
+ _user_id=None):
self._meth_name = name
self._oid = _oid
+ self._schema_id = _schema_id
self._in_args = _in_args
self._user_id = _user_id
@@ -63,6 +68,9 @@ class MethodCallParams(object):
def get_object_id(self):
return self._oid
+ def get_schema_id(self):
+ return self._schema_id
+
def get_args(self):
return self._in_args
@@ -100,7 +108,12 @@ class Agent(Thread):
self._packages = {}
self._schema_timestamp = long(0)
self._schema = {}
- self._agent_data = {}
+ # _described_data holds QmfData objects that are associated with schema
+ # it is index by schema_id, object_id
+ self._described_data = {}
+ # _undescribed_data holds unstructured QmfData objects - these objects
+ # have no schema. it is indexed by object_id only.
+ self._undescribed_data = {}
self._work_q = Queue.Queue()
self._work_q_put = False
@@ -247,7 +260,7 @@ class Agent(Thread):
# (self.name, str(msg)))
self._topic_sender.send(msg)
- def add_object(self, data ):
+ def add_object(self, data):
"""
Register an instance of a QmfAgentData object.
"""
@@ -256,20 +269,34 @@ class Agent(Thread):
if not isinstance(data, QmfAgentData):
raise TypeError("QmfAgentData instance expected")
- id_ = data.get_object_id()
- if not id_:
+ oid = data.get_object_id()
+ if not oid:
raise TypeError("No identifier assigned to QmfAgentData!")
+ sid = data.get_schema_class_id()
+
self._lock.acquire()
try:
- self._agent_data[id_] = data
+ if sid:
+ if sid not in self._described_data:
+ self._described_data[sid] = {oid: data}
+ else:
+ self._described_data[sid][oid] = data
+ else:
+ self._undescribed_data[oid] = data
finally:
self._lock.release()
- def get_object(self, id):
+ def get_object(self, oid, schema_id):
+ data = None
self._lock.acquire()
try:
- data = self._agent_data.get(id)
+ if schema_id:
+ data = self._described_data.get(schema_id)
+ if data:
+ data = data.get(oid)
+ else:
+ data = self._undescribed_data.get(oid)
finally:
self._lock.release()
return data
@@ -284,6 +311,8 @@ class Agent(Thread):
_map = {SchemaMethod.KEY_NAME:handle.meth_name}
if handle.oid is not None:
_map[QmfData.KEY_OBJECT_ID] = handle.oid
+ if handle.schema_id is not None:
+ _map[QmfData.KEY_SCHEMA_ID] = handle.schema_id.map_encode()
if _out_args is not None:
_map[SchemaMethod.KEY_ARGUMENTS] = _out_args.copy()
if _error is not None:
@@ -340,7 +369,7 @@ class Agent(Thread):
logging.debug("Agent Indication Sent")
next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
- timeout = ((next_heartbeat - now) + datetime.timedelta(microseconds=999999)).seconds
+ timeout = timedelta_to_secs(next_heartbeat - now)
# print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout))
try:
self._session.next_receiver(timeout=timeout)
@@ -519,11 +548,14 @@ class Agent(Thread):
in_args = cmap.get(SchemaMethod.KEY_ARGUMENTS)
oid = cmap.get(QmfData.KEY_OBJECT_ID)
+ schema_id = cmap.get(QmfData.KEY_SCHEMA_ID)
+ if schema_id:
+ schema_id = SchemaClassId.from_map(schema_id)
handle = _MethodCallHandle(msg.correlation_id,
msg.reply_to,
mname,
- oid)
- param = MethodCallParams( mname, oid, in_args, msg.user_id)
+ oid, schema_id)
+ param = MethodCallParams( mname, oid, schema_id, in_args, msg.user_id)
self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param))
self._work_q_put = True
@@ -594,12 +626,24 @@ class Agent(Thread):
"""
"""
data_objs = []
+ # extract optional schema_id from target params
+ sid = None
+ t_params = query.get_target_param()
+ if t_params:
+ sid = t_params.get(QmfData.KEY_SCHEMA_ID)
+
# if querying for a specific object, do a direct lookup
if query.get_selector() == QmfQuery.ID:
+ oid = query.get_id()
found = None
self._lock.acquire()
try:
- found = self._agent_data.get(query.get_id())
+ if sid:
+ found = self._described_data.get(sid)
+ if found:
+ found = found.get(oid)
+ else:
+ found = self._undescribed_data.get(oid)
finally:
self._lock.release()
if found:
@@ -610,12 +654,18 @@ class Agent(Thread):
else: # otherwise, evaluate all data
self._lock.acquire()
try:
- for oid,val in self._agent_data.iteritems():
- if query.evaluate(val):
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(val.map_encode())
+ if sid:
+ db = self._described_data.get(sid)
+ else:
+ db = self._undescribed_data
+
+ if db:
+ for oid,val in db.iteritems():
+ if query.evaluate(val):
+ if _idOnly:
+ data_objs.append(oid)
+ else:
+ data_objs.append(val.map_encode())
finally:
self._lock.release()