summaryrefslogtreecommitdiff
path: root/python/qmf2/console.py
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-02-12 23:01:21 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-02-12 23:01:21 +0000
commit68008d8656a1dc3e96bedc40a93c9c2389c10f2c (patch)
tree274bdcdb223e0304217bb75b7d56c66a9ca7a99c /python/qmf2/console.py
parent68fb9a03641e50fbb6c045ac2f39091bf0bf9d08 (diff)
downloadqpid-python-68008d8656a1dc3e96bedc40a93c9c2389c10f2c.tar.gz
QPID-2261: add async method call workitems
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@909648 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qmf2/console.py')
-rw-r--r--python/qmf2/console.py167
1 files changed, 99 insertions, 68 deletions
diff --git a/python/qmf2/console.py b/python/qmf2/console.py
index 7b4bfe9fb8..c13cf70755 100644
--- a/python/qmf2/console.py
+++ b/python/qmf2/console.py
@@ -127,14 +127,11 @@ class _AsyncMailbox(_Mailbox):
A Mailbox for asynchronous delivery, with a timeout value.
"""
def __init__(self, console,
- agent_name,
_timeout=None):
"""
Invoked by application thread.
"""
super(_AsyncMailbox, self).__init__(console)
-
- self.agent_name = agent_name
self.console = console
if _timeout is None:
@@ -186,8 +183,8 @@ class _QueryMailbox(_AsyncMailbox):
Invoked by application thread.
"""
super(_QueryMailbox, self).__init__(console,
- agent_name,
_timeout)
+ self.agent_name = agent_name
self.target = target
self.msgkey = msgkey
self.context = context
@@ -267,19 +264,15 @@ class _SchemaPrefetchMailbox(_AsyncMailbox):
Handles responses to schema fetches made by the console.
"""
def __init__(self, console,
- agent_name,
schema_id,
_timeout=None):
"""
Invoked by application thread.
"""
super(_SchemaPrefetchMailbox, self).__init__(console,
- agent_name,
_timeout)
-
self.schema_id = schema_id
-
def deliver(self, reply):
"""
Process schema response messages.
@@ -306,6 +299,62 @@ class _SchemaPrefetchMailbox(_AsyncMailbox):
+class _MethodMailbox(_AsyncMailbox):
+ """
+ A mailbox used for asynchronous method requests.
+ """
+ def __init__(self, console,
+ context,
+ _timeout=None):
+ """
+ Invoked by application thread.
+ """
+ super(_MethodMailbox, self).__init__(console,
+ _timeout)
+ self.context = context
+
+ def deliver(self, reply):
+ """
+ Process method response messages delivered to this mailbox.
+ Invoked by Console Management thread only.
+ """
+
+ _map = reply.content.get(MsgKey.method)
+ if not _map:
+ logging.error("Invalid method call reply message")
+ result = None
+ else:
+ error=_map.get(SchemaMethod.KEY_ERROR)
+ if error:
+ error = QmfData.from_map(error)
+ result = MethodResult(_error=error)
+ else:
+ result = MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS))
+
+ # create workitem
+ wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, result)
+ self.console._work_q.put(wi)
+ self.console._work_q_put = True
+
+ self.destroy()
+
+
+ def expire(self):
+ """
+ The mailbox expired without receiving a reply.
+ Invoked by the Console Management thread only.
+ """
+ logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" %
+ datetime.datetime.utcnow())
+ # send along an empty response
+ wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, None)
+ self.console._work_q.put(wi)
+ self.console._work_q_put = True
+
+ self.destroy()
+
+
+
##==============================================================================
## DATA MODEL
##==============================================================================
@@ -374,7 +423,7 @@ class QmfConsoleData(QmfData):
query = QmfQuery.create_id_object(self.get_object_id(),
self.get_schema_class_id())
obj_list = self._agent._console.do_query(self._agent, query,
- timeout=_timeout)
+ _timeout=_timeout)
if obj_list is None or len(obj_list) != 1:
return None
@@ -385,7 +434,7 @@ class QmfConsoleData(QmfData):
def invoke_method(self, name, _in_args={}, _reply_handle=None,
_timeout=None):
"""
- invoke the named method.
+ Invoke the named method on this object.
"""
assert self._agent
assert self._agent._console
@@ -397,7 +446,11 @@ class QmfConsoleData(QmfData):
if _timeout is None:
_timeout = self._agent._console._reply_timeout
- mbox = _SyncMailbox(self._agent._console)
+ if _reply_handle is not None:
+ mbox = _MethodMailbox(self._agent._console,
+ _reply_handle)
+ else:
+ mbox = _SyncMailbox(self._agent._console)
cid = mbox.get_address()
_map = {self.KEY_OBJECT_ID:str(oid),
@@ -417,9 +470,8 @@ class QmfConsoleData(QmfData):
mbox.destroy()
return None
- # @todo async method calls!!!
if _reply_handle is not None:
- print("ASYNC TBD")
+ return True
logging.debug("Waiting for response to method req (%s)" % _timeout)
replyMsg = mbox.fetch(_timeout)
@@ -561,21 +613,23 @@ class Agent(object):
def invoke_method(self, name, _in_args={}, _reply_handle=None,
_timeout=None):
"""
+ Invoke the named method on this agent.
"""
assert self._console
if _timeout is None:
_timeout = self._console._reply_timeout
- if _in_args:
- _in_args = _in_args.copy()
-
- mbox = _SyncMailbox(self._console)
+ if _reply_handle is not None:
+ mbox = _MethodMailbox(self._console,
+ _reply_handle)
+ else:
+ mbox = _SyncMailbox(self._console)
cid = mbox.get_address()
_map = {SchemaMethod.KEY_NAME:name}
if _in_args:
- _map[SchemaMethod.KEY_ARGUMENTS] = _in_args
+ _map[SchemaMethod.KEY_ARGUMENTS] = _in_args.copy()
logging.debug("Sending method req to Agent (%s)" % time.time())
try:
@@ -585,9 +639,8 @@ class Agent(object):
mbox.destroy()
return None
- # @todo async method calls!!!
if _reply_handle is not None:
- print("ASYNC TBD")
+ return True
logging.debug("Waiting for response to method req (%s)" % _timeout)
replyMsg = mbox.fetch(_timeout)
@@ -939,7 +992,7 @@ class Console(Thread):
return agent
- def do_query(self, agent, query, timeout=None ):
+ def do_query(self, agent, query, _reply_handle=None, _timeout=None ):
"""
"""
query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info,
@@ -954,7 +1007,15 @@ class Console(Thread):
if not msgkey:
raise Exception("Invalid target for query: %s" % str(query))
- mbox = _SyncMailbox(self)
+ if _reply_handle is not None:
+ mbox = _QueryMailbox(self,
+ agent.get_name(),
+ _reply_handle,
+ target, msgkey,
+ _timeout)
+ else:
+ mbox = _SyncMailbox(self)
+
cid = mbox.get_address()
try:
@@ -965,17 +1026,21 @@ class Console(Thread):
mbox.destroy()
return None
- if not timeout:
- timeout = self._reply_timeout
+ # return now if async reply expected
+ if _reply_handle is not None:
+ return True
+
+ if not _timeout:
+ _timeout = self._reply_timeout
- logging.debug("Waiting for response to Query (%s)" % timeout)
+ logging.debug("Waiting for response to Query (%s)" % _timeout)
now = datetime.datetime.utcnow()
- expire = now + datetime.timedelta(seconds=timeout)
+ expire = now + datetime.timedelta(seconds=_timeout)
response = []
while (expire > now):
- timeout = timedelta_to_secs(expire - now)
- reply = mbox.fetch(timeout)
+ _timeout = timedelta_to_secs(expire - now)
+ reply = mbox.fetch(_timeout)
if not reply:
logging.debug("Query wait timed-out.")
break
@@ -1021,39 +1086,6 @@ class Console(Thread):
mbox.destroy()
return response
-
- def do_async_query(self, agent, query, app_handle, _timeout=None ):
- """
- """
- query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info,
- QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id,
- QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id,
- QmfQuery.TARGET_SCHEMA: MsgKey.schema,
- QmfQuery.TARGET_OBJECT: MsgKey.data_obj,
- QmfQuery.TARGET_AGENT: MsgKey.agent_info}
-
- target = query.get_target()
- msgkey = query_keymap.get(target)
- if not msgkey:
- raise Exception("Invalid target for query: %s" % str(query))
-
- mbox = _QueryMailbox(self,
- agent.get_name(),
- app_handle,
- target, msgkey,
- _timeout)
- cid = mbox.get_address()
-
- try:
- logging.debug("Sending Query to Agent (%s)" % time.time())
- agent._send_query(query, cid)
- except SendError, e:
- logging.error(str(e))
- mbox.destroy()
- return False
- return True
-
-
def _wake_thread(self):
"""
Make the console management thread loop wakeup from its next_receiver
@@ -1189,7 +1221,7 @@ class Console(Thread):
query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT,
t_params)
timeout = timedelta_to_secs(expired - now)
- reply = self.do_query(agent, query, timeout)
+ reply = self.do_query(agent, query, _timeout=timeout)
if reply:
obj_list = obj_list + reply
else:
@@ -1209,7 +1241,7 @@ class Console(Thread):
[QmfQuery.QUOTE, _pname]]
query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID, pred)
timeout = timedelta_to_secs(expired - now)
- sid_list = self.do_query(agent, query, timeout)
+ sid_list = self.do_query(agent, query, _timeout=timeout)
if sid_list:
for sid in sid_list:
now = datetime.datetime.utcnow()
@@ -1221,7 +1253,7 @@ class Console(Thread):
t_params = {QmfData.KEY_SCHEMA_ID: sid}
query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params)
timeout = timedelta_to_secs(expired - now)
- reply = self.do_query(agent, query, timeout)
+ reply = self.do_query(agent, query, _timeout=timeout)
if reply:
obj_list = obj_list + reply
if obj_list:
@@ -1591,8 +1623,7 @@ class Console(Thread):
self._lock.release()
if need_fetch:
- mbox = _SchemaPrefetchMailbox(self, agent.get_name(),
- schema_id)
+ mbox = _SchemaPrefetchMailbox(self, schema_id)
query = QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id)
logging.debug("Sending Schema Query to Agent (%s)" % time.time())
try:
@@ -1628,8 +1659,8 @@ class Console(Thread):
# note: do_query will add the new schema to the cache automatically.
slist = self.do_query(_agent,
- QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id),
- _timeout)
+ QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id),
+ _timeout=_timeout)
if slist:
return slist[0]
else: