diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-02-12 23:01:21 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-02-12 23:01:21 +0000 |
commit | 68008d8656a1dc3e96bedc40a93c9c2389c10f2c (patch) | |
tree | 274bdcdb223e0304217bb75b7d56c66a9ca7a99c /python/qmf2/console.py | |
parent | 68fb9a03641e50fbb6c045ac2f39091bf0bf9d08 (diff) | |
download | qpid-python-68008d8656a1dc3e96bedc40a93c9c2389c10f2c.tar.gz |
QPID-2261: add async method call workitems
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@909648 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qmf2/console.py')
-rw-r--r-- | python/qmf2/console.py | 167 |
1 files changed, 99 insertions, 68 deletions
diff --git a/python/qmf2/console.py b/python/qmf2/console.py index 7b4bfe9fb8..c13cf70755 100644 --- a/python/qmf2/console.py +++ b/python/qmf2/console.py @@ -127,14 +127,11 @@ class _AsyncMailbox(_Mailbox): A Mailbox for asynchronous delivery, with a timeout value. """ def __init__(self, console, - agent_name, _timeout=None): """ Invoked by application thread. """ super(_AsyncMailbox, self).__init__(console) - - self.agent_name = agent_name self.console = console if _timeout is None: @@ -186,8 +183,8 @@ class _QueryMailbox(_AsyncMailbox): Invoked by application thread. """ super(_QueryMailbox, self).__init__(console, - agent_name, _timeout) + self.agent_name = agent_name self.target = target self.msgkey = msgkey self.context = context @@ -267,19 +264,15 @@ class _SchemaPrefetchMailbox(_AsyncMailbox): Handles responses to schema fetches made by the console. """ def __init__(self, console, - agent_name, schema_id, _timeout=None): """ Invoked by application thread. """ super(_SchemaPrefetchMailbox, self).__init__(console, - agent_name, _timeout) - self.schema_id = schema_id - def deliver(self, reply): """ Process schema response messages. @@ -306,6 +299,62 @@ class _SchemaPrefetchMailbox(_AsyncMailbox): +class _MethodMailbox(_AsyncMailbox): + """ + A mailbox used for asynchronous method requests. + """ + def __init__(self, console, + context, + _timeout=None): + """ + Invoked by application thread. + """ + super(_MethodMailbox, self).__init__(console, + _timeout) + self.context = context + + def deliver(self, reply): + """ + Process method response messages delivered to this mailbox. + Invoked by Console Management thread only. + """ + + _map = reply.content.get(MsgKey.method) + if not _map: + logging.error("Invalid method call reply message") + result = None + else: + error=_map.get(SchemaMethod.KEY_ERROR) + if error: + error = QmfData.from_map(error) + result = MethodResult(_error=error) + else: + result = MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS)) + + # create workitem + wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, result) + self.console._work_q.put(wi) + self.console._work_q_put = True + + self.destroy() + + + def expire(self): + """ + The mailbox expired without receiving a reply. + Invoked by the Console Management thread only. + """ + logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" % + datetime.datetime.utcnow()) + # send along an empty response + wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, None) + self.console._work_q.put(wi) + self.console._work_q_put = True + + self.destroy() + + + ##============================================================================== ## DATA MODEL ##============================================================================== @@ -374,7 +423,7 @@ class QmfConsoleData(QmfData): query = QmfQuery.create_id_object(self.get_object_id(), self.get_schema_class_id()) obj_list = self._agent._console.do_query(self._agent, query, - timeout=_timeout) + _timeout=_timeout) if obj_list is None or len(obj_list) != 1: return None @@ -385,7 +434,7 @@ class QmfConsoleData(QmfData): def invoke_method(self, name, _in_args={}, _reply_handle=None, _timeout=None): """ - invoke the named method. + Invoke the named method on this object. """ assert self._agent assert self._agent._console @@ -397,7 +446,11 @@ class QmfConsoleData(QmfData): if _timeout is None: _timeout = self._agent._console._reply_timeout - mbox = _SyncMailbox(self._agent._console) + if _reply_handle is not None: + mbox = _MethodMailbox(self._agent._console, + _reply_handle) + else: + mbox = _SyncMailbox(self._agent._console) cid = mbox.get_address() _map = {self.KEY_OBJECT_ID:str(oid), @@ -417,9 +470,8 @@ class QmfConsoleData(QmfData): mbox.destroy() return None - # @todo async method calls!!! if _reply_handle is not None: - print("ASYNC TBD") + return True logging.debug("Waiting for response to method req (%s)" % _timeout) replyMsg = mbox.fetch(_timeout) @@ -561,21 +613,23 @@ class Agent(object): def invoke_method(self, name, _in_args={}, _reply_handle=None, _timeout=None): """ + Invoke the named method on this agent. """ assert self._console if _timeout is None: _timeout = self._console._reply_timeout - if _in_args: - _in_args = _in_args.copy() - - mbox = _SyncMailbox(self._console) + if _reply_handle is not None: + mbox = _MethodMailbox(self._console, + _reply_handle) + else: + mbox = _SyncMailbox(self._console) cid = mbox.get_address() _map = {SchemaMethod.KEY_NAME:name} if _in_args: - _map[SchemaMethod.KEY_ARGUMENTS] = _in_args + _map[SchemaMethod.KEY_ARGUMENTS] = _in_args.copy() logging.debug("Sending method req to Agent (%s)" % time.time()) try: @@ -585,9 +639,8 @@ class Agent(object): mbox.destroy() return None - # @todo async method calls!!! if _reply_handle is not None: - print("ASYNC TBD") + return True logging.debug("Waiting for response to method req (%s)" % _timeout) replyMsg = mbox.fetch(_timeout) @@ -939,7 +992,7 @@ class Console(Thread): return agent - def do_query(self, agent, query, timeout=None ): + def do_query(self, agent, query, _reply_handle=None, _timeout=None ): """ """ query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info, @@ -954,7 +1007,15 @@ class Console(Thread): if not msgkey: raise Exception("Invalid target for query: %s" % str(query)) - mbox = _SyncMailbox(self) + if _reply_handle is not None: + mbox = _QueryMailbox(self, + agent.get_name(), + _reply_handle, + target, msgkey, + _timeout) + else: + mbox = _SyncMailbox(self) + cid = mbox.get_address() try: @@ -965,17 +1026,21 @@ class Console(Thread): mbox.destroy() return None - if not timeout: - timeout = self._reply_timeout + # return now if async reply expected + if _reply_handle is not None: + return True + + if not _timeout: + _timeout = self._reply_timeout - logging.debug("Waiting for response to Query (%s)" % timeout) + logging.debug("Waiting for response to Query (%s)" % _timeout) now = datetime.datetime.utcnow() - expire = now + datetime.timedelta(seconds=timeout) + expire = now + datetime.timedelta(seconds=_timeout) response = [] while (expire > now): - timeout = timedelta_to_secs(expire - now) - reply = mbox.fetch(timeout) + _timeout = timedelta_to_secs(expire - now) + reply = mbox.fetch(_timeout) if not reply: logging.debug("Query wait timed-out.") break @@ -1021,39 +1086,6 @@ class Console(Thread): mbox.destroy() return response - - def do_async_query(self, agent, query, app_handle, _timeout=None ): - """ - """ - query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info, - QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id, - QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id, - QmfQuery.TARGET_SCHEMA: MsgKey.schema, - QmfQuery.TARGET_OBJECT: MsgKey.data_obj, - QmfQuery.TARGET_AGENT: MsgKey.agent_info} - - target = query.get_target() - msgkey = query_keymap.get(target) - if not msgkey: - raise Exception("Invalid target for query: %s" % str(query)) - - mbox = _QueryMailbox(self, - agent.get_name(), - app_handle, - target, msgkey, - _timeout) - cid = mbox.get_address() - - try: - logging.debug("Sending Query to Agent (%s)" % time.time()) - agent._send_query(query, cid) - except SendError, e: - logging.error(str(e)) - mbox.destroy() - return False - return True - - def _wake_thread(self): """ Make the console management thread loop wakeup from its next_receiver @@ -1189,7 +1221,7 @@ class Console(Thread): query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params) timeout = timedelta_to_secs(expired - now) - reply = self.do_query(agent, query, timeout) + reply = self.do_query(agent, query, _timeout=timeout) if reply: obj_list = obj_list + reply else: @@ -1209,7 +1241,7 @@ class Console(Thread): [QmfQuery.QUOTE, _pname]] query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID, pred) timeout = timedelta_to_secs(expired - now) - sid_list = self.do_query(agent, query, timeout) + sid_list = self.do_query(agent, query, _timeout=timeout) if sid_list: for sid in sid_list: now = datetime.datetime.utcnow() @@ -1221,7 +1253,7 @@ class Console(Thread): t_params = {QmfData.KEY_SCHEMA_ID: sid} query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params) timeout = timedelta_to_secs(expired - now) - reply = self.do_query(agent, query, timeout) + reply = self.do_query(agent, query, _timeout=timeout) if reply: obj_list = obj_list + reply if obj_list: @@ -1591,8 +1623,7 @@ class Console(Thread): self._lock.release() if need_fetch: - mbox = _SchemaPrefetchMailbox(self, agent.get_name(), - schema_id) + mbox = _SchemaPrefetchMailbox(self, schema_id) query = QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id) logging.debug("Sending Schema Query to Agent (%s)" % time.time()) try: @@ -1628,8 +1659,8 @@ class Console(Thread): # note: do_query will add the new schema to the cache automatically. slist = self.do_query(_agent, - QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id), - _timeout) + QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id), + _timeout=_timeout) if slist: return slist[0] else: |