diff options
author | Ted Ross <tross@apache.org> | 2009-03-27 20:46:24 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2009-03-27 20:46:24 +0000 |
commit | b264a276f994526ce24062c3f852fdd658857d29 (patch) | |
tree | 6aedda9cc832c27d39856a6bb3f9dcc0b64ee146 /python/qmf/console.py | |
parent | 2c8be931523ca30352ed01164ff70ac0f60fc02a (diff) | |
download | qpid-python-b264a276f994526ce24062c3f852fdd658857d29.tar.gz |
QPID-1702 QPID-1706
Updated qmf console in Python and Ruby
- Added support for asynchronous method invocation
- Added option to override timeout for method request and get request
- Added exception handler in delegates.rb to catch Sasl errors
- Added tests for the async method features
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@759341 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qmf/console.py')
-rw-r--r-- | python/qmf/console.py | 60 |
1 files changed, 47 insertions, 13 deletions
diff --git a/python/qmf/console.py b/python/qmf/console.py index 3b99595f1f..ef2ab264eb 100644 --- a/python/qmf/console.py +++ b/python/qmf/console.py @@ -77,15 +77,15 @@ class Console: pass def heartbeat(self, agent, timestamp): - """ """ + """ Invoked when an agent heartbeat is received. """ pass def brokerInfo(self, broker): - """ """ + """ Invoked when the connection sequence reaches the point where broker information is available. """ pass def methodResponse(self, broker, seq, response): - """ """ + """ Invoked when a method response from an asynchronous method call is received. """ pass class BrokerURL(URL): @@ -117,7 +117,7 @@ class Session: _CONTEXT_STARTUP = 2 _CONTEXT_MULTIGET = 3 - GET_WAIT_TIME = 60 + DEFAULT_GET_WAIT_TIME = 60 def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True, manageConnections=False, userBindings=False): @@ -284,6 +284,11 @@ class Session: _broker = <broker> - supply a broker as returned by addBroker. + The default timeout for this synchronous operation is 60 seconds. To change the timeout, + use the following argument: + + _timeout = <time in seconds> + If additional arguments are supplied, they are used as property selectors. For example, if the argument name="test" is supplied, only objects whose "name" property is "test" will be returned in the result. @@ -365,11 +370,15 @@ class Session: starttime = time() timeout = False + if "_timeout" in kwargs: + waitTime = kwargs["_timeout"] + else: + waitTime = self.DEFAULT_GET_WAIT_TIME try: self.cv.acquire() while len(self.syncSequenceList) > 0 and self.error == None: - self.cv.wait(self.GET_WAIT_TIME) - if time() - starttime > self.GET_WAIT_TIME: + self.cv.wait(waitTime) + if time() - starttime > waitTime: for pendingSeq in self.syncSequenceList: self.seqMgr._release(pendingSeq) self.syncSequenceList = [] @@ -498,7 +507,10 @@ class Session: code = codec.read_uint32() text = codec.read_str16() outArgs = {} - method, synchronous = self.seqMgr._release(seq) + pair = self.seqMgr._release(seq) + if pair == None: + return + method, synchronous = pair if code == 0: for arg in method.arguments: if arg.dir.find("O") != -1: @@ -1083,7 +1095,7 @@ class Object(object): return value raise Exception("Type Object has no attribute '%s'" % name) - def _sendMethodRequest(self, name, args, kwargs, synchronous=False): + def _sendMethodRequest(self, name, args, kwargs, synchronous=False, timeWait=None): for method in self._schema.getMethods(): if name == method.name: aIdx = 0 @@ -1105,8 +1117,13 @@ class Object(object): if arg.dir.find("I") != -1: self._session._encodeValue(sendCodec, args[aIdx], arg.type) aIdx += 1 + if timeWait: + ttl = timeWait * 1000 + else: + ttl = None smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" % - (self._objectId.getBrokerBank(), self._objectId.getAgentBank())) + (self._objectId.getBrokerBank(), self._objectId.getAgentBank()), + ttl=ttl) if synchronous: try: self._broker.cv.acquire() @@ -1118,13 +1135,28 @@ class Object(object): return None def _invoke(self, name, args, kwargs): - if self._sendMethodRequest(name, args, kwargs, True): + if "_timeout" in kwargs: + timeout = kwargs["_timeout"] + else: + timeout = self._broker.SYNC_TIME + + if "_async" in kwargs and kwargs["_async"]: + sync = False + if "_timeout" not in kwargs: + timeout = None + else: + sync = True + + seq = self._sendMethodRequest(name, args, kwargs, sync, timeout) + if seq: + if not sync: + return seq try: self._broker.cv.acquire() starttime = time() while self._broker.syncInFlight and self._broker.error == None: - self._broker.cv.wait(self._broker.SYNC_TIME) - if time() - starttime > self._broker.SYNC_TIME: + self._broker.cv.wait(timeout) + if time() - starttime > timeout: self._session.seqMgr._release(seq) raise RuntimeError("Timed out waiting for method to respond") finally: @@ -1407,9 +1439,11 @@ class Broker: except: return None, None - def _message (self, body, routing_key="broker"): + def _message (self, body, routing_key="broker", ttl=None): dp = self.amqpSession.delivery_properties() dp.routing_key = routing_key + if ttl: + dp.ttl = ttl mp = self.amqpSession.message_properties() mp.content_type = "x-application/qmf" mp.reply_to = self.amqpSession.reply_to("amq.direct", self.replyName) |