summaryrefslogtreecommitdiff
path: root/python/qmf2/console.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qmf2/console.py')
-rw-r--r--python/qmf2/console.py170
1 files changed, 95 insertions, 75 deletions
diff --git a/python/qmf2/console.py b/python/qmf2/console.py
index 84209e255c..e08abc007c 100644
--- a/python/qmf2/console.py
+++ b/python/qmf2/console.py
@@ -31,9 +31,9 @@ from threading import Condition
from qpid.messaging import Connection, Message, Empty, SendError
from common import (make_subject, parse_subject, OpCode, QmfQuery, Notifier,
- MsgKey, QmfData, QmfAddress,
- SchemaClass, SchemaClassId, SchemaEventClass,
- SchemaObjectClass, WorkItem, SchemaMethod, QmfEvent)
+ MsgKey, QmfData, QmfAddress, SchemaClass, SchemaClassId,
+ SchemaEventClass, SchemaObjectClass, WorkItem,
+ SchemaMethod, QmfEvent, timedelta_to_secs)
# global flag that indicates which thread (if any) is
@@ -249,11 +249,9 @@ class QmfConsoleData(QmfData):
if _timeout is None:
_timeout = self._agent._console._reply_timeout
-
# create query to agent using this objects ID
- oid = self.get_object_id()
- query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT,
- self.get_object_id())
+ query = QmfQuery.create_id_object(self.get_object_id(),
+ self.get_schema_class_id())
obj_list = self._agent._console.do_query(self._agent, query,
timeout=_timeout)
if obj_list is None or len(obj_list) != 1:
@@ -309,6 +307,10 @@ class QmfConsoleData(QmfData):
_map = {self.KEY_OBJECT_ID:str(oid),
SchemaMethod.KEY_NAME:name}
+
+ sid = self.get_schema_class_id()
+ if sid:
+ _map[self.KEY_SCHEMA_ID] = sid.map_encode()
if _in_args:
_map[SchemaMethod.KEY_ARGUMENTS] = _in_args
@@ -969,7 +971,7 @@ class Console(Thread):
# to expire
now = datetime.datetime.utcnow()
if self._next_agent_expire > now:
- timeout = ((self._next_agent_expire - now) + datetime.timedelta(microseconds=999999)).seconds
+ timeout = timedelta_to_secs(self._next_agent_expire - now)
try:
logging.debug("waiting for next rcvr (timeout=%s)..." % timeout)
xxx = self._session.next_receiver(timeout = timeout)
@@ -980,34 +982,20 @@ class Console(Thread):
logging.debug("Shutting down Console thread")
def get_objects(self,
+ _object_id=None,
_schema_id=None,
_pname=None, _cname=None,
- _object_id=None,
_agents=None,
_timeout=None):
"""
- @todo
- """
- if _object_id is not None:
- # query by object id
- query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, _object_id)
- elif _schema_id is not None:
- pred = [QmfQuery.EQ, QmfData.KEY_SCHEMA_ID, _schema_id.map_encode()]
- query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, pred)
- elif _pname is not None:
- # query by package name (and maybe class name)
- if _cname is not None:
- pred = [QmfQuery.AND, [QmfQuery.EQ, SchemaClassId.KEY_PACKAGE,
- [QmfQuery.QUOTE, _pname]],
- [QmfQuery.EQ, SchemaClassId.KEY_CLASS,
- [QmfQuery.QUOTE, _cname]]]
- else:
- pred = [QmfQuery.EQ, SchemaClassId.KEY_PACKAGE,
- [QmfQuery.QUOTE, _pname]]
- query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, pred)
- else:
- raise Exception("invalid arguments")
+ Retrieve objects by id or schema.
+ By object_id: must specify schema_id or pname & cname if object defined
+ by a schema. Undescribed objects: only object_id needed.
+
+ By schema: must specify schema_id or pname & cname - all instances of
+ objects defined by that schema are returned.
+ """
if _agents is None:
# use copy of current agent list
self._lock.acquire()
@@ -1021,12 +1009,12 @@ class Console(Thread):
agent_list = _agents
# @todo validate this list!
- # @todo: fix when async do_query done - query all agents at once, then
- # wait for replies, instead of per-agent querying....
-
if _timeout is None:
_timeout = self._reply_timeout
+ # @todo: fix when async do_query done - query all agents at once, then
+ # wait for replies, instead of per-agent querying....
+
obj_list = []
expired = datetime.datetime.utcnow() + datetime.timedelta(seconds=_timeout)
for agent in agent_list:
@@ -1035,11 +1023,54 @@ class Console(Thread):
now = datetime.datetime.utcnow()
if now >= expired:
break
- timeout = ((expired - now) + datetime.timedelta(microseconds=999999)).seconds
- reply = self.do_query(agent, query, timeout)
- if reply:
- obj_list = obj_list + reply
+ if _pname is None:
+ if _object_id:
+ query = QmfQuery.create_id_object(_object_id,
+ _schema_id)
+ else:
+ if _schema_id is not None:
+ t_params = {QmfData.KEY_SCHEMA_ID: _schema_id}
+ else:
+ t_params = None
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT,
+ t_params)
+ timeout = timedelta_to_secs(expired - now)
+ reply = self.do_query(agent, query, timeout)
+ if reply:
+ obj_list = obj_list + reply
+ else:
+ # looking up by package name (and maybe class name), need to
+ # find all schema_ids in that package, then lookup object by
+ # schema_id
+ if _cname is not None:
+ pred = [QmfQuery.AND,
+ [QmfQuery.EQ,
+ SchemaClassId.KEY_PACKAGE,
+ [QmfQuery.QUOTE, _pname]],
+ [QmfQuery.EQ, SchemaClassId.KEY_CLASS,
+ [QmfQuery.QUOTE, _cname]]]
+ else:
+ pred = [QmfQuery.EQ,
+ SchemaClassId.KEY_PACKAGE,
+ [QmfQuery.QUOTE, _pname]]
+ query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID, pred)
+ timeout = timedelta_to_secs(expired - now)
+ sid_list = self.do_query(agent, query, timeout)
+ if sid_list:
+ for sid in sid_list:
+ now = datetime.datetime.utcnow()
+ if now >= expired:
+ break
+ if _object_id is not None:
+ query = QmfQuery.create_id_object(_object_id, sid)
+ else:
+ t_params = {QmfData.KEY_SCHEMA_ID: sid}
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params)
+ timeout = timedelta_to_secs(expired - now)
+ reply = self.do_query(agent, query, timeout)
+ if reply:
+ obj_list = obj_list + reply
if obj_list:
return obj_list
return None
@@ -1103,55 +1134,44 @@ class Console(Thread):
" '%s'" % msg)
return
- ignore = True
- matched = False
correlated = False
- agent_query = self._agent_discovery_filter
-
if msg.correlation_id:
correlated = self._req_correlation.is_valid(msg.correlation_id)
- if direct and correlated:
- ignore = False
- elif agent_query:
- matched = agent_query.evaluate(QmfData.create(values=ai_map))
- ignore = not matched
+ agent = None
+ self._lock.acquire()
+ try:
+ agent = self._agent_map.get(name)
+ if agent:
+ # agent already known, just update timestamp
+ agent._announce_timestamp = datetime.datetime.utcnow()
+ finally:
+ self._lock.release()
- if not ignore:
- agent = None
- self._lock.acquire()
- try:
- agent = self._agent_map.get(name)
- finally:
- self._lock.release()
+ if not agent:
+ # need to create and add a new agent?
+ matched = False
+ if self._agent_discovery_filter:
+ tmp = QmfData.create(values=ai_map)
+ matched = self._agent_discovery_filter.evaluate(tmp)
- if not agent:
- # need to create and add a new agent
+ if (correlated or matched):
agent = self._create_agent(name)
if not agent:
return # failed to add agent
-
- # lock out expiration scanning code
- self._lock.acquire()
- try:
- old_timestamp = agent._announce_timestamp
agent._announce_timestamp = datetime.datetime.utcnow()
- finally:
- self._lock.release()
-
- if old_timestamp == None and matched:
- logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time()))
- wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent})
- self._work_q.put(wi)
- self._work_q_put = True
-
- if correlated:
- # wake up all waiters
- logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
- self._req_correlation.put_data(msg.correlation_id, msg)
-
-
+ if matched:
+ # unsolicited, but newly discovered
+ logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time()))
+ wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent})
+ self._work_q.put(wi)
+ self._work_q_put = True
+
+ if correlated:
+ # wake up all waiters
+ logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
+ self._req_correlation.put_data(msg.correlation_id, msg)
def _handle_data_ind_msg(self, msg, cmap, version, direct):
"""