diff options
author | Ted Ross <tross@apache.org> | 2008-10-24 00:38:30 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-10-24 00:38:30 +0000 |
commit | bd0c16218e3ccc75ce997ba9f0806c27e6468085 (patch) | |
tree | 4af981eec709f467b01ab1a1b14655bf687c4c74 | |
parent | 93b74ecdb121a7ce0566c51f60ba003fbb00e74b (diff) | |
download | qpid-python-bd0c16218e3ccc75ce997ba9f0806c27e6468085.tar.gz |
Added alternative functions for invoking methods
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@707514 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | python/qpid/qmfconsole.py | 100 |
1 files changed, 74 insertions, 26 deletions
diff --git a/python/qpid/qmfconsole.py b/python/qpid/qmfconsole.py index bfa48c7540..febcae5ea2 100644 --- a/python/qpid/qmfconsole.py +++ b/python/qpid/qmfconsole.py @@ -82,6 +82,10 @@ class Console: """ """ pass + def methodResponse(self, broker, seq, response): + """ """ + pass + class BrokerURL: def __init__(self, text): rex = re.compile(r""" @@ -458,16 +462,21 @@ class Session: code = codec.read_uint32() text = str(codec.read_str8()) outArgs = {} - obj, method = self.seqMgr._release(seq) + method, synchronous = self.seqMgr._release(seq) if code == 0: for arg in method.arguments: if arg.dir.find("O") != -1: outArgs[arg.name] = self._decodeValue(codec, arg.type) - broker.cv.acquire() - broker.syncResult = MethodResult(code, text, outArgs) - broker.syncInFlight = False - broker.cv.notify() - broker.cv.release() + result = MethodResult(code, text, outArgs) + if synchronous: + broker.cv.acquire() + broker.syncResult = result + broker.syncInFlight = False + broker.cv.notify() + broker.cv.release() + else: + if self.console: + self.console.methodResponse(broker, seq, result) def _handleHeartbeatInd(self, broker, codec, seq): timestamp = codec.read_uint64() @@ -612,6 +621,41 @@ class Session: else: raise ValueError ("Invalid type code: %d" % typecode) + def _sendMethodRequest(self, broker, schemaKey, objectId, name, argList): + """ This function can be used to send a method request to an object given only the + broker, schemaKey, and objectId. This is an uncommon usage pattern as methods are + normally invoked on the object itself. + """ + schema = self.getSchema(schemaKey) + for method in schema.getMethods(): + if name == method.name: + aIdx = 0 + sendCodec = Codec(broker.conn.spec) + seq = self.seqMgr._reserve((method, False)) + broker._setHeader(sendCodec, 'M', seq) + objectId.encode(sendCodec) + pname, cname, hash = schemaKey + sendCodec.write_str8(pname) + sendCodec.write_str8(cname) + sendCodec.write_bin128(hash) + sendCodec.write_str8(name) + + count = 0 + for arg in method.arguments: + if arg.dir.find("I") != -1: + count += 1 + if count != len(argList): + raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(argList))) + + for arg in method.arguments: + if arg.dir.find("I") != -1: + self._encodeValue(sendCodec, argList[aIdx], arg.type) + aIdx += 1 + smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % + (objectId.getBroker(), objectId.getBank())) + broker._send(smsg) + return seq + return None class Package: """ """ @@ -921,12 +965,12 @@ class Object(object): return value raise Exception("Type Object has no attribute '%s'" % name) - def _invoke(self, name, args, kwargs): + def _sendMethodRequest(self, name, args, kwargs, synchronous=False): for method in self._schema.getMethods(): if name == method.name: aIdx = 0 sendCodec = Codec(self._broker.conn.spec) - seq = self._session.seqMgr._reserve((self, method)) + seq = self._session.seqMgr._reserve((method, synchronous)) self._broker._setHeader(sendCodec, 'M', seq) self._objectId.encode(sendCodec) pname, cname, hash = self._schema.getKey() @@ -948,26 +992,30 @@ class Object(object): aIdx += 1 smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" % (self._objectId.getBroker(), self._objectId.getBank())) - self._broker.cv.acquire() - self._broker.syncInFlight = True - self._broker.cv.release() - + if synchronous: + self._broker.cv.acquire() + self._broker.syncInFlight = True + self._broker.cv.release() self._broker._send(smsg) + return seq + return None - self._broker.cv.acquire() - starttime = time() - while self._broker.syncInFlight and self._broker.error == None: - self._broker.cv.wait(self._broker.SYNC_TIME) - if time() - starttime > self._broker.SYNC_TIME: - self._broker.cv.release() - self._session.seqMgr._release(seq) - raise RuntimeError("Timed out waiting for method to respond") - self._broker.cv.release() - if self._broker.error != None: - errorText = self._broker.error - self._broker.error = None - raise Exception(errorText) - return self._broker.syncResult + def _invoke(self, name, args, kwargs): + if self._sendMethodRequest(name, args, kwargs, True): + self._broker.cv.acquire() + starttime = time() + while self._broker.syncInFlight and self._broker.error == None: + self._broker.cv.wait(self._broker.SYNC_TIME) + if time() - starttime > self._broker.SYNC_TIME: + self._broker.cv.release() + self._session.seqMgr._release(seq) + raise RuntimeError("Timed out waiting for method to respond") + self._broker.cv.release() + if self._broker.error != None: + errorText = self._broker.error + self._broker.error = None + raise Exception(errorText) + return self._broker.syncResult raise Exception("Invalid Method (software defect) [%s]" % name) def _parsePresenceMasks(self, codec, schema): |