summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-10-24 00:38:30 +0000
committerTed Ross <tross@apache.org>2008-10-24 00:38:30 +0000
commitbd0c16218e3ccc75ce997ba9f0806c27e6468085 (patch)
tree4af981eec709f467b01ab1a1b14655bf687c4c74
parent93b74ecdb121a7ce0566c51f60ba003fbb00e74b (diff)
downloadqpid-python-bd0c16218e3ccc75ce997ba9f0806c27e6468085.tar.gz
Added alternative functions for invoking methods
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@707514 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--python/qpid/qmfconsole.py100
1 files changed, 74 insertions, 26 deletions
diff --git a/python/qpid/qmfconsole.py b/python/qpid/qmfconsole.py
index bfa48c7540..febcae5ea2 100644
--- a/python/qpid/qmfconsole.py
+++ b/python/qpid/qmfconsole.py
@@ -82,6 +82,10 @@ class Console:
""" """
pass
+ def methodResponse(self, broker, seq, response):
+ """ """
+ pass
+
class BrokerURL:
def __init__(self, text):
rex = re.compile(r"""
@@ -458,16 +462,21 @@ class Session:
code = codec.read_uint32()
text = str(codec.read_str8())
outArgs = {}
- obj, method = self.seqMgr._release(seq)
+ method, synchronous = self.seqMgr._release(seq)
if code == 0:
for arg in method.arguments:
if arg.dir.find("O") != -1:
outArgs[arg.name] = self._decodeValue(codec, arg.type)
- broker.cv.acquire()
- broker.syncResult = MethodResult(code, text, outArgs)
- broker.syncInFlight = False
- broker.cv.notify()
- broker.cv.release()
+ result = MethodResult(code, text, outArgs)
+ if synchronous:
+ broker.cv.acquire()
+ broker.syncResult = result
+ broker.syncInFlight = False
+ broker.cv.notify()
+ broker.cv.release()
+ else:
+ if self.console:
+ self.console.methodResponse(broker, seq, result)
def _handleHeartbeatInd(self, broker, codec, seq):
timestamp = codec.read_uint64()
@@ -612,6 +621,41 @@ class Session:
else:
raise ValueError ("Invalid type code: %d" % typecode)
+ def _sendMethodRequest(self, broker, schemaKey, objectId, name, argList):
+ """ This function can be used to send a method request to an object given only the
+ broker, schemaKey, and objectId. This is an uncommon usage pattern as methods are
+ normally invoked on the object itself.
+ """
+ schema = self.getSchema(schemaKey)
+ for method in schema.getMethods():
+ if name == method.name:
+ aIdx = 0
+ sendCodec = Codec(broker.conn.spec)
+ seq = self.seqMgr._reserve((method, False))
+ broker._setHeader(sendCodec, 'M', seq)
+ objectId.encode(sendCodec)
+ pname, cname, hash = schemaKey
+ sendCodec.write_str8(pname)
+ sendCodec.write_str8(cname)
+ sendCodec.write_bin128(hash)
+ sendCodec.write_str8(name)
+
+ count = 0
+ for arg in method.arguments:
+ if arg.dir.find("I") != -1:
+ count += 1
+ if count != len(argList):
+ raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(argList)))
+
+ for arg in method.arguments:
+ if arg.dir.find("I") != -1:
+ self._encodeValue(sendCodec, argList[aIdx], arg.type)
+ aIdx += 1
+ smsg = broker._message(sendCodec.encoded, "agent.%d.%d" %
+ (objectId.getBroker(), objectId.getBank()))
+ broker._send(smsg)
+ return seq
+ return None
class Package:
""" """
@@ -921,12 +965,12 @@ class Object(object):
return value
raise Exception("Type Object has no attribute '%s'" % name)
- def _invoke(self, name, args, kwargs):
+ def _sendMethodRequest(self, name, args, kwargs, synchronous=False):
for method in self._schema.getMethods():
if name == method.name:
aIdx = 0
sendCodec = Codec(self._broker.conn.spec)
- seq = self._session.seqMgr._reserve((self, method))
+ seq = self._session.seqMgr._reserve((method, synchronous))
self._broker._setHeader(sendCodec, 'M', seq)
self._objectId.encode(sendCodec)
pname, cname, hash = self._schema.getKey()
@@ -948,26 +992,30 @@ class Object(object):
aIdx += 1
smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" %
(self._objectId.getBroker(), self._objectId.getBank()))
- self._broker.cv.acquire()
- self._broker.syncInFlight = True
- self._broker.cv.release()
-
+ if synchronous:
+ self._broker.cv.acquire()
+ self._broker.syncInFlight = True
+ self._broker.cv.release()
self._broker._send(smsg)
+ return seq
+ return None
- self._broker.cv.acquire()
- starttime = time()
- while self._broker.syncInFlight and self._broker.error == None:
- self._broker.cv.wait(self._broker.SYNC_TIME)
- if time() - starttime > self._broker.SYNC_TIME:
- self._broker.cv.release()
- self._session.seqMgr._release(seq)
- raise RuntimeError("Timed out waiting for method to respond")
- self._broker.cv.release()
- if self._broker.error != None:
- errorText = self._broker.error
- self._broker.error = None
- raise Exception(errorText)
- return self._broker.syncResult
+ def _invoke(self, name, args, kwargs):
+ if self._sendMethodRequest(name, args, kwargs, True):
+ self._broker.cv.acquire()
+ starttime = time()
+ while self._broker.syncInFlight and self._broker.error == None:
+ self._broker.cv.wait(self._broker.SYNC_TIME)
+ if time() - starttime > self._broker.SYNC_TIME:
+ self._broker.cv.release()
+ self._session.seqMgr._release(seq)
+ raise RuntimeError("Timed out waiting for method to respond")
+ self._broker.cv.release()
+ if self._broker.error != None:
+ errorText = self._broker.error
+ self._broker.error = None
+ raise Exception(errorText)
+ return self._broker.syncResult
raise Exception("Invalid Method (software defect) [%s]" % name)
def _parsePresenceMasks(self, codec, schema):