diff options
author | Ted Ross <tross@apache.org> | 2010-03-27 04:02:41 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2010-03-27 04:02:41 +0000 |
commit | b4ec63cfb66c4c709d9755c5ab5f051cdb2718ed (patch) | |
tree | 0cc2563bedd2f4f97c4d86bd62974c783ae37205 | |
parent | 5cad5c30c4ecbd6e018acfa3aec0d49aab1abc31 (diff) | |
download | qpid-python-b4ec63cfb66c4c709d9755c5ab5f051cdb2718ed.tar.gz |
Python console: cancel pending queries immediately upon loss of agent connectivity.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@928138 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/extras/qmf/src/py/qmf/console.py | 61 |
1 files changed, 60 insertions, 1 deletions
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py index 2420b715a0..526eb49d24 100644 --- a/qpid/extras/qmf/src/py/qmf/console.py +++ b/qpid/extras/qmf/src/py/qmf/console.py @@ -465,6 +465,7 @@ class Session: list: 21 } + def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True, manageConnections=False, userBindings=False): """ @@ -513,6 +514,7 @@ class Session: if self.userBindings and not self.rcvObjects: raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided") + def _getBrokerForAgentAddr(self, agent_addr): try: self.cv.acquire() @@ -524,6 +526,7 @@ class Session: self.cv.release() return None + def _getAgentForAgentAddr(self, agent_addr): try: self.cv.acquire() @@ -535,9 +538,11 @@ class Session: self.cv.release() return None + def __repr__(self): return "QMF Console Session Manager (brokers: %d)" % len(self.brokers) + def addBroker(self, target="localhost", timeout=None, mechanisms=None): """ Connect to a Qpid broker. Returns an object of type Broker. """ url = BrokerURL(target) @@ -549,6 +554,7 @@ class Session: self.getObjects(broker=broker, _class="agent", _agent=broker.getAgent(1,0)) return broker + def delBroker(self, broker): """ Disconnect from a broker. The 'broker' argument is the object returned from the addBroker call """ @@ -559,24 +565,28 @@ class Session: self.brokers.remove(broker) del broker + def getPackages(self): """ Get the list of known QMF packages """ for broker in self.brokers: broker._waitForStable() return self.schemaCache.getPackages() + def getClasses(self, packageName): """ Get the list of known classes within a QMF package """ for broker in self.brokers: broker._waitForStable() return self.schemaCache.getClasses(packageName) + def getSchema(self, classKey): """ Get the schema for a QMF class """ for broker in self.brokers: broker._waitForStable() return self.schemaCache.getSchema(classKey) + def bindPackage(self, packageName): """ Request object updates for all table classes within a package. """ if not self.userBindings or not self.rcvObjects: @@ -588,6 +598,7 @@ class Session: broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=key) + def bindClass(self, pname, cname): """ Request object updates for a particular table class by package and class name. """ if not self.userBindings or not self.rcvObjects: @@ -598,6 +609,7 @@ class Session: if broker.isConnected(): broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=key) + def bindClassKey(self, classKey): """ Request object updates for a particular table class by class key. """ @@ -605,6 +617,7 @@ class Session: cname = classKey.getClassName() self.bindClass(pname, cname) + def getAgents(self, broker=None): """ Get a list of currently known agents """ brokerList = [] @@ -622,6 +635,7 @@ class Session: agentList.append(a) return agentList + def makeObject(self, classKey, **kwargs): """ Create a new, unmanaged object of the schema indicated by classKey """ schema = self.getSchema(classKey) @@ -629,6 +643,7 @@ class Session: raise Exception("Schema not found for classKey") return Object(None, schema, None, True, True, kwargs) + def getObjects(self, **kwargs): """ Get a list of objects from QMF agents. All arguments are passed by name(keyword). @@ -768,10 +783,12 @@ class Session: raise RuntimeError("No agent responded within timeout period") return self.getResult + def setEventFilter(self, **kwargs): """ """ pass + def _bindingKeys(self): keyList = [] keyList.append("schema.#") @@ -788,18 +805,21 @@ class Session: keyList.append("console.heartbeat.#") return keyList + def _handleBrokerConnect(self, broker): if self.console: for agent in broker.getAgents(): self.console.newAgent(agent) self.console.brokerConnected(broker) + def _handleBrokerDisconnect(self, broker): if self.console: for agent in broker.getAgents(): self.console.delAgent(agent) self.console.brokerDisconnected(broker) + def _handleBrokerResp(self, broker, codec, seq): broker.brokerId = codec.read_uuid() if self.console != None: @@ -813,6 +833,7 @@ class Session: smsg = broker._message(sendCodec.encoded) broker._send(smsg) + def _handlePackageInd(self, broker, codec, seq): pname = str(codec.read_str8()) notify = self.schemaCache.declarePackage(pname) @@ -828,6 +849,7 @@ class Session: smsg = broker._message(sendCodec.encoded) broker._send(smsg) + def _handleCommandComplete(self, broker, codec, seq, agent): code = codec.read_uint32() text = codec.read_str8() @@ -869,6 +891,7 @@ class Session: smsg = broker._message(sendCodec.encoded) broker._send(smsg) + def _handleHeartbeatInd(self, broker, codec, seq, msg): brokerBank = 1 agentBank = 0 @@ -892,6 +915,7 @@ class Session: self.console.heartbeat(agent, timestamp) broker._ageAgents() + def _handleSchemaResp(self, broker, codec, seq, agent_addr): kind = codec.read_uint8() classKey = ClassKey(codec) @@ -908,6 +932,7 @@ class Session: if agent: agent._schemaInfoFromV2Agent() + def _v2HandleHeartbeatInd(self, broker, mp, ah, content): try: agentName = ah["qmf.agent"] @@ -927,9 +952,11 @@ class Session: self.console.heartbeat(agent, timestamp) broker._ageAgents() + def _v2HandleAgentLocateRsp(self, broker, mp, ah, content): self._v2HandleHeartbeatInd(broker, mp, ah, content) + def _handleError(self, error): try: self.cv.acquire() @@ -940,6 +967,7 @@ class Session: finally: self.cv.release() + def _selectMatch(self, object): """ Check the object against self.getSelect to check for a match """ for key, value in self.getSelect: @@ -948,6 +976,7 @@ class Session: return False return True + def _decodeValue(self, codec, typecode, broker=None): """ Decode, from the codec, a value based on its typecode. """ if typecode == 1: data = codec.read_uint8() # U8 @@ -1002,6 +1031,7 @@ class Session: raise ValueError("Invalid type code: %d" % typecode) return data + def _encodeValue(self, codec, value, typecode): """ Encode, into the codec, a value based on its typecode. """ if typecode == 1: codec.write_uint8 (int(value)) # U8 @@ -1043,9 +1073,11 @@ class Session: else: raise ValueError ("Invalid type code: %d" % typecode) + def encoding(self, value): return self._encoding(value.__class__) + def _encoding(self, klass): if Session.ENCODINGS.has_key(klass): return self.ENCODINGS[klass] @@ -1054,6 +1086,7 @@ class Session: if result != None: return result + def _displayValue(self, value, typecode): """ """ if typecode == 1: return unicode(value) @@ -1082,6 +1115,7 @@ class Session: else: raise ValueError ("Invalid type code: %d" % typecode) + def _defaultValue(self, stype, broker=None, kwargs={}): """ """ typecode = stype.type @@ -1120,6 +1154,7 @@ class Session: else: raise ValueError ("Invalid type code: %d" % typecode) + def _bestClassKey(self, pname, cname, preferredList): """ """ if pname == None or cname == None: @@ -1135,6 +1170,7 @@ class Session: return c return None + def _sendMethodRequest(self, broker, schemaKey, objectId, name, argList): """ This function can be used to send a method request to an object given only the broker, schemaKey, and objectId. This is an uncommon usage pattern as methods are @@ -2227,6 +2263,17 @@ class Agent: def close(self): self.closed = True + copy = {} + try: + self.lock.acquire() + for seq in self.contextMap: + copy[seq] = self.contextMap[seq] + finally: + self.lock.release() + + for seq in copy: + context = copy[seq] + context.cancel("Agent disconnected") def __repr__(self): @@ -2588,7 +2635,7 @@ class RequestContext(object): def setSequence(self, sequence): - self.sequence = sequence + self.sequence = sequence def addV1QueryResult(self, data): @@ -2610,6 +2657,18 @@ class RequestContext(object): return time() - self.startTime + def cancel(self, exception): + self.setException(exception) + try: + self.cv.acquire() + self.blocked = None + self.waitingForSchema = None + self.cv.notify() + finally: + self.cv.release() + self._complete() + + def waitForSignal(self, timeout): try: self.cv.acquire() |