diff options
Diffstat (limited to 'python/qmf2/console.py')
-rw-r--r-- | python/qmf2/console.py | 170 |
1 files changed, 95 insertions, 75 deletions
diff --git a/python/qmf2/console.py b/python/qmf2/console.py index 84209e255c..e08abc007c 100644 --- a/python/qmf2/console.py +++ b/python/qmf2/console.py @@ -31,9 +31,9 @@ from threading import Condition from qpid.messaging import Connection, Message, Empty, SendError from common import (make_subject, parse_subject, OpCode, QmfQuery, Notifier, - MsgKey, QmfData, QmfAddress, - SchemaClass, SchemaClassId, SchemaEventClass, - SchemaObjectClass, WorkItem, SchemaMethod, QmfEvent) + MsgKey, QmfData, QmfAddress, SchemaClass, SchemaClassId, + SchemaEventClass, SchemaObjectClass, WorkItem, + SchemaMethod, QmfEvent, timedelta_to_secs) # global flag that indicates which thread (if any) is @@ -249,11 +249,9 @@ class QmfConsoleData(QmfData): if _timeout is None: _timeout = self._agent._console._reply_timeout - # create query to agent using this objects ID - oid = self.get_object_id() - query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, - self.get_object_id()) + query = QmfQuery.create_id_object(self.get_object_id(), + self.get_schema_class_id()) obj_list = self._agent._console.do_query(self._agent, query, timeout=_timeout) if obj_list is None or len(obj_list) != 1: @@ -309,6 +307,10 @@ class QmfConsoleData(QmfData): _map = {self.KEY_OBJECT_ID:str(oid), SchemaMethod.KEY_NAME:name} + + sid = self.get_schema_class_id() + if sid: + _map[self.KEY_SCHEMA_ID] = sid.map_encode() if _in_args: _map[SchemaMethod.KEY_ARGUMENTS] = _in_args @@ -969,7 +971,7 @@ class Console(Thread): # to expire now = datetime.datetime.utcnow() if self._next_agent_expire > now: - timeout = ((self._next_agent_expire - now) + datetime.timedelta(microseconds=999999)).seconds + timeout = timedelta_to_secs(self._next_agent_expire - now) try: logging.debug("waiting for next rcvr (timeout=%s)..." % timeout) xxx = self._session.next_receiver(timeout = timeout) @@ -980,34 +982,20 @@ class Console(Thread): logging.debug("Shutting down Console thread") def get_objects(self, + _object_id=None, _schema_id=None, _pname=None, _cname=None, - _object_id=None, _agents=None, _timeout=None): """ - @todo - """ - if _object_id is not None: - # query by object id - query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, _object_id) - elif _schema_id is not None: - pred = [QmfQuery.EQ, QmfData.KEY_SCHEMA_ID, _schema_id.map_encode()] - query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, pred) - elif _pname is not None: - # query by package name (and maybe class name) - if _cname is not None: - pred = [QmfQuery.AND, [QmfQuery.EQ, SchemaClassId.KEY_PACKAGE, - [QmfQuery.QUOTE, _pname]], - [QmfQuery.EQ, SchemaClassId.KEY_CLASS, - [QmfQuery.QUOTE, _cname]]] - else: - pred = [QmfQuery.EQ, SchemaClassId.KEY_PACKAGE, - [QmfQuery.QUOTE, _pname]] - query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, pred) - else: - raise Exception("invalid arguments") + Retrieve objects by id or schema. + By object_id: must specify schema_id or pname & cname if object defined + by a schema. Undescribed objects: only object_id needed. + + By schema: must specify schema_id or pname & cname - all instances of + objects defined by that schema are returned. + """ if _agents is None: # use copy of current agent list self._lock.acquire() @@ -1021,12 +1009,12 @@ class Console(Thread): agent_list = _agents # @todo validate this list! - # @todo: fix when async do_query done - query all agents at once, then - # wait for replies, instead of per-agent querying.... - if _timeout is None: _timeout = self._reply_timeout + # @todo: fix when async do_query done - query all agents at once, then + # wait for replies, instead of per-agent querying.... + obj_list = [] expired = datetime.datetime.utcnow() + datetime.timedelta(seconds=_timeout) for agent in agent_list: @@ -1035,11 +1023,54 @@ class Console(Thread): now = datetime.datetime.utcnow() if now >= expired: break - timeout = ((expired - now) + datetime.timedelta(microseconds=999999)).seconds - reply = self.do_query(agent, query, timeout) - if reply: - obj_list = obj_list + reply + if _pname is None: + if _object_id: + query = QmfQuery.create_id_object(_object_id, + _schema_id) + else: + if _schema_id is not None: + t_params = {QmfData.KEY_SCHEMA_ID: _schema_id} + else: + t_params = None + query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, + t_params) + timeout = timedelta_to_secs(expired - now) + reply = self.do_query(agent, query, timeout) + if reply: + obj_list = obj_list + reply + else: + # looking up by package name (and maybe class name), need to + # find all schema_ids in that package, then lookup object by + # schema_id + if _cname is not None: + pred = [QmfQuery.AND, + [QmfQuery.EQ, + SchemaClassId.KEY_PACKAGE, + [QmfQuery.QUOTE, _pname]], + [QmfQuery.EQ, SchemaClassId.KEY_CLASS, + [QmfQuery.QUOTE, _cname]]] + else: + pred = [QmfQuery.EQ, + SchemaClassId.KEY_PACKAGE, + [QmfQuery.QUOTE, _pname]] + query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID, pred) + timeout = timedelta_to_secs(expired - now) + sid_list = self.do_query(agent, query, timeout) + if sid_list: + for sid in sid_list: + now = datetime.datetime.utcnow() + if now >= expired: + break + if _object_id is not None: + query = QmfQuery.create_id_object(_object_id, sid) + else: + t_params = {QmfData.KEY_SCHEMA_ID: sid} + query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params) + timeout = timedelta_to_secs(expired - now) + reply = self.do_query(agent, query, timeout) + if reply: + obj_list = obj_list + reply if obj_list: return obj_list return None @@ -1103,55 +1134,44 @@ class Console(Thread): " '%s'" % msg) return - ignore = True - matched = False correlated = False - agent_query = self._agent_discovery_filter - if msg.correlation_id: correlated = self._req_correlation.is_valid(msg.correlation_id) - if direct and correlated: - ignore = False - elif agent_query: - matched = agent_query.evaluate(QmfData.create(values=ai_map)) - ignore = not matched + agent = None + self._lock.acquire() + try: + agent = self._agent_map.get(name) + if agent: + # agent already known, just update timestamp + agent._announce_timestamp = datetime.datetime.utcnow() + finally: + self._lock.release() - if not ignore: - agent = None - self._lock.acquire() - try: - agent = self._agent_map.get(name) - finally: - self._lock.release() + if not agent: + # need to create and add a new agent? + matched = False + if self._agent_discovery_filter: + tmp = QmfData.create(values=ai_map) + matched = self._agent_discovery_filter.evaluate(tmp) - if not agent: - # need to create and add a new agent + if (correlated or matched): agent = self._create_agent(name) if not agent: return # failed to add agent - - # lock out expiration scanning code - self._lock.acquire() - try: - old_timestamp = agent._announce_timestamp agent._announce_timestamp = datetime.datetime.utcnow() - finally: - self._lock.release() - - if old_timestamp == None and matched: - logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time())) - wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent}) - self._work_q.put(wi) - self._work_q_put = True - - if correlated: - # wake up all waiters - logging.debug("waking waiters for correlation id %s" % msg.correlation_id) - self._req_correlation.put_data(msg.correlation_id, msg) - - + if matched: + # unsolicited, but newly discovered + logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time())) + wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent}) + self._work_q.put(wi) + self._work_q_put = True + + if correlated: + # wake up all waiters + logging.debug("waking waiters for correlation id %s" % msg.correlation_id) + self._req_correlation.put_data(msg.correlation_id, msg) def _handle_data_ind_msg(self, msg, cmap, version, direct): """ |