diff options
Diffstat (limited to 'python/qmf2/agent.py')
-rw-r--r-- | python/qmf2/agent.py | 90 |
1 files changed, 70 insertions, 20 deletions
diff --git a/python/qmf2/agent.py b/python/qmf2/agent.py index 7c090ad36b..fd63807dc3 100644 --- a/python/qmf2/agent.py +++ b/python/qmf2/agent.py @@ -26,7 +26,8 @@ from qpid.messaging import Connection, Message, Empty, SendError from uuid import uuid4 from common import (make_subject, parse_subject, OpCode, QmfQuery, SchemaObjectClass, MsgKey, QmfData, QmfAddress, - SchemaClass, SchemaClassId, WorkItem, SchemaMethod) + SchemaClass, SchemaClassId, WorkItem, SchemaMethod, + timedelta_to_secs) # global flag that indicates which thread (if any) is # running the agent notifier callback @@ -42,18 +43,22 @@ class _MethodCallHandle(object): application. Given to the app in a WorkItem, provided to the agent when method_response() is invoked. """ - def __init__(self, correlation_id, reply_to, meth_name, _oid=None): + def __init__(self, correlation_id, reply_to, meth_name, _oid=None, + _schema_id=None): self.correlation_id = correlation_id self.reply_to = reply_to self.meth_name = meth_name self.oid = _oid + self.schema_id = _schema_id class MethodCallParams(object): """ """ - def __init__(self, name, _oid=None, _in_args=None, _user_id=None): + def __init__(self, name, _oid=None, _schema_id=None, _in_args=None, + _user_id=None): self._meth_name = name self._oid = _oid + self._schema_id = _schema_id self._in_args = _in_args self._user_id = _user_id @@ -63,6 +68,9 @@ class MethodCallParams(object): def get_object_id(self): return self._oid + def get_schema_id(self): + return self._schema_id + def get_args(self): return self._in_args @@ -100,7 +108,12 @@ class Agent(Thread): self._packages = {} self._schema_timestamp = long(0) self._schema = {} - self._agent_data = {} + # _described_data holds QmfData objects that are associated with schema + # it is index by schema_id, object_id + self._described_data = {} + # _undescribed_data holds unstructured QmfData objects - these objects + # have no schema. it is indexed by object_id only. + self._undescribed_data = {} self._work_q = Queue.Queue() self._work_q_put = False @@ -247,7 +260,7 @@ class Agent(Thread): # (self.name, str(msg))) self._topic_sender.send(msg) - def add_object(self, data ): + def add_object(self, data): """ Register an instance of a QmfAgentData object. """ @@ -256,20 +269,34 @@ class Agent(Thread): if not isinstance(data, QmfAgentData): raise TypeError("QmfAgentData instance expected") - id_ = data.get_object_id() - if not id_: + oid = data.get_object_id() + if not oid: raise TypeError("No identifier assigned to QmfAgentData!") + sid = data.get_schema_class_id() + self._lock.acquire() try: - self._agent_data[id_] = data + if sid: + if sid not in self._described_data: + self._described_data[sid] = {oid: data} + else: + self._described_data[sid][oid] = data + else: + self._undescribed_data[oid] = data finally: self._lock.release() - def get_object(self, id): + def get_object(self, oid, schema_id): + data = None self._lock.acquire() try: - data = self._agent_data.get(id) + if schema_id: + data = self._described_data.get(schema_id) + if data: + data = data.get(oid) + else: + data = self._undescribed_data.get(oid) finally: self._lock.release() return data @@ -284,6 +311,8 @@ class Agent(Thread): _map = {SchemaMethod.KEY_NAME:handle.meth_name} if handle.oid is not None: _map[QmfData.KEY_OBJECT_ID] = handle.oid + if handle.schema_id is not None: + _map[QmfData.KEY_SCHEMA_ID] = handle.schema_id.map_encode() if _out_args is not None: _map[SchemaMethod.KEY_ARGUMENTS] = _out_args.copy() if _error is not None: @@ -340,7 +369,7 @@ class Agent(Thread): logging.debug("Agent Indication Sent") next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval) - timeout = ((next_heartbeat - now) + datetime.timedelta(microseconds=999999)).seconds + timeout = timedelta_to_secs(next_heartbeat - now) # print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout)) try: self._session.next_receiver(timeout=timeout) @@ -519,11 +548,14 @@ class Agent(Thread): in_args = cmap.get(SchemaMethod.KEY_ARGUMENTS) oid = cmap.get(QmfData.KEY_OBJECT_ID) + schema_id = cmap.get(QmfData.KEY_SCHEMA_ID) + if schema_id: + schema_id = SchemaClassId.from_map(schema_id) handle = _MethodCallHandle(msg.correlation_id, msg.reply_to, mname, - oid) - param = MethodCallParams( mname, oid, in_args, msg.user_id) + oid, schema_id) + param = MethodCallParams( mname, oid, schema_id, in_args, msg.user_id) self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param)) self._work_q_put = True @@ -594,12 +626,24 @@ class Agent(Thread): """ """ data_objs = [] + # extract optional schema_id from target params + sid = None + t_params = query.get_target_param() + if t_params: + sid = t_params.get(QmfData.KEY_SCHEMA_ID) + # if querying for a specific object, do a direct lookup if query.get_selector() == QmfQuery.ID: + oid = query.get_id() found = None self._lock.acquire() try: - found = self._agent_data.get(query.get_id()) + if sid: + found = self._described_data.get(sid) + if found: + found = found.get(oid) + else: + found = self._undescribed_data.get(oid) finally: self._lock.release() if found: @@ -610,12 +654,18 @@ class Agent(Thread): else: # otherwise, evaluate all data self._lock.acquire() try: - for oid,val in self._agent_data.iteritems(): - if query.evaluate(val): - if _idOnly: - data_objs.append(oid) - else: - data_objs.append(val.map_encode()) + if sid: + db = self._described_data.get(sid) + else: + db = self._undescribed_data + + if db: + for oid,val in db.iteritems(): + if query.evaluate(val): + if _idOnly: + data_objs.append(oid) + else: + data_objs.append(val.map_encode()) finally: self._lock.release() |