summaryrefslogtreecommitdiff
path: root/python/qmf/console.py
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-03-27 20:46:24 +0000
committerTed Ross <tross@apache.org>2009-03-27 20:46:24 +0000
commitb264a276f994526ce24062c3f852fdd658857d29 (patch)
tree6aedda9cc832c27d39856a6bb3f9dcc0b64ee146 /python/qmf/console.py
parent2c8be931523ca30352ed01164ff70ac0f60fc02a (diff)
downloadqpid-python-b264a276f994526ce24062c3f852fdd658857d29.tar.gz
QPID-1702 QPID-1706
Updated qmf console in Python and Ruby - Added support for asynchronous method invocation - Added option to override timeout for method request and get request - Added exception handler in delegates.rb to catch Sasl errors - Added tests for the async method features git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@759341 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qmf/console.py')
-rw-r--r--python/qmf/console.py60
1 files changed, 47 insertions, 13 deletions
diff --git a/python/qmf/console.py b/python/qmf/console.py
index 3b99595f1f..ef2ab264eb 100644
--- a/python/qmf/console.py
+++ b/python/qmf/console.py
@@ -77,15 +77,15 @@ class Console:
pass
def heartbeat(self, agent, timestamp):
- """ """
+ """ Invoked when an agent heartbeat is received. """
pass
def brokerInfo(self, broker):
- """ """
+ """ Invoked when the connection sequence reaches the point where broker information is available. """
pass
def methodResponse(self, broker, seq, response):
- """ """
+ """ Invoked when a method response from an asynchronous method call is received. """
pass
class BrokerURL(URL):
@@ -117,7 +117,7 @@ class Session:
_CONTEXT_STARTUP = 2
_CONTEXT_MULTIGET = 3
- GET_WAIT_TIME = 60
+ DEFAULT_GET_WAIT_TIME = 60
def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True,
manageConnections=False, userBindings=False):
@@ -284,6 +284,11 @@ class Session:
_broker = <broker> - supply a broker as returned by addBroker.
+ The default timeout for this synchronous operation is 60 seconds. To change the timeout,
+ use the following argument:
+
+ _timeout = <time in seconds>
+
If additional arguments are supplied, they are used as property selectors. For example,
if the argument name="test" is supplied, only objects whose "name" property is "test"
will be returned in the result.
@@ -365,11 +370,15 @@ class Session:
starttime = time()
timeout = False
+ if "_timeout" in kwargs:
+ waitTime = kwargs["_timeout"]
+ else:
+ waitTime = self.DEFAULT_GET_WAIT_TIME
try:
self.cv.acquire()
while len(self.syncSequenceList) > 0 and self.error == None:
- self.cv.wait(self.GET_WAIT_TIME)
- if time() - starttime > self.GET_WAIT_TIME:
+ self.cv.wait(waitTime)
+ if time() - starttime > waitTime:
for pendingSeq in self.syncSequenceList:
self.seqMgr._release(pendingSeq)
self.syncSequenceList = []
@@ -498,7 +507,10 @@ class Session:
code = codec.read_uint32()
text = codec.read_str16()
outArgs = {}
- method, synchronous = self.seqMgr._release(seq)
+ pair = self.seqMgr._release(seq)
+ if pair == None:
+ return
+ method, synchronous = pair
if code == 0:
for arg in method.arguments:
if arg.dir.find("O") != -1:
@@ -1083,7 +1095,7 @@ class Object(object):
return value
raise Exception("Type Object has no attribute '%s'" % name)
- def _sendMethodRequest(self, name, args, kwargs, synchronous=False):
+ def _sendMethodRequest(self, name, args, kwargs, synchronous=False, timeWait=None):
for method in self._schema.getMethods():
if name == method.name:
aIdx = 0
@@ -1105,8 +1117,13 @@ class Object(object):
if arg.dir.find("I") != -1:
self._session._encodeValue(sendCodec, args[aIdx], arg.type)
aIdx += 1
+ if timeWait:
+ ttl = timeWait * 1000
+ else:
+ ttl = None
smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" %
- (self._objectId.getBrokerBank(), self._objectId.getAgentBank()))
+ (self._objectId.getBrokerBank(), self._objectId.getAgentBank()),
+ ttl=ttl)
if synchronous:
try:
self._broker.cv.acquire()
@@ -1118,13 +1135,28 @@ class Object(object):
return None
def _invoke(self, name, args, kwargs):
- if self._sendMethodRequest(name, args, kwargs, True):
+ if "_timeout" in kwargs:
+ timeout = kwargs["_timeout"]
+ else:
+ timeout = self._broker.SYNC_TIME
+
+ if "_async" in kwargs and kwargs["_async"]:
+ sync = False
+ if "_timeout" not in kwargs:
+ timeout = None
+ else:
+ sync = True
+
+ seq = self._sendMethodRequest(name, args, kwargs, sync, timeout)
+ if seq:
+ if not sync:
+ return seq
try:
self._broker.cv.acquire()
starttime = time()
while self._broker.syncInFlight and self._broker.error == None:
- self._broker.cv.wait(self._broker.SYNC_TIME)
- if time() - starttime > self._broker.SYNC_TIME:
+ self._broker.cv.wait(timeout)
+ if time() - starttime > timeout:
self._session.seqMgr._release(seq)
raise RuntimeError("Timed out waiting for method to respond")
finally:
@@ -1407,9 +1439,11 @@ class Broker:
except:
return None, None
- def _message (self, body, routing_key="broker"):
+ def _message (self, body, routing_key="broker", ttl=None):
dp = self.amqpSession.delivery_properties()
dp.routing_key = routing_key
+ if ttl:
+ dp.ttl = ttl
mp = self.amqpSession.message_properties()
mp.content_type = "x-application/qmf"
mp.reply_to = self.amqpSession.reply_to("amq.direct", self.replyName)