summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-03-27 04:02:41 +0000
committerTed Ross <tross@apache.org>2010-03-27 04:02:41 +0000
commitb4ec63cfb66c4c709d9755c5ab5f051cdb2718ed (patch)
tree0cc2563bedd2f4f97c4d86bd62974c783ae37205
parent5cad5c30c4ecbd6e018acfa3aec0d49aab1abc31 (diff)
downloadqpid-python-b4ec63cfb66c4c709d9755c5ab5f051cdb2718ed.tar.gz
Python console: cancel pending queries immediately upon loss of agent connectivity.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@928138 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/extras/qmf/src/py/qmf/console.py61
1 files changed, 60 insertions, 1 deletions
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py
index 2420b715a0..526eb49d24 100644
--- a/qpid/extras/qmf/src/py/qmf/console.py
+++ b/qpid/extras/qmf/src/py/qmf/console.py
@@ -465,6 +465,7 @@ class Session:
list: 21
}
+
def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True,
manageConnections=False, userBindings=False):
"""
@@ -513,6 +514,7 @@ class Session:
if self.userBindings and not self.rcvObjects:
raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
+
def _getBrokerForAgentAddr(self, agent_addr):
try:
self.cv.acquire()
@@ -524,6 +526,7 @@ class Session:
self.cv.release()
return None
+
def _getAgentForAgentAddr(self, agent_addr):
try:
self.cv.acquire()
@@ -535,9 +538,11 @@ class Session:
self.cv.release()
return None
+
def __repr__(self):
return "QMF Console Session Manager (brokers: %d)" % len(self.brokers)
+
def addBroker(self, target="localhost", timeout=None, mechanisms=None):
""" Connect to a Qpid broker. Returns an object of type Broker. """
url = BrokerURL(target)
@@ -549,6 +554,7 @@ class Session:
self.getObjects(broker=broker, _class="agent", _agent=broker.getAgent(1,0))
return broker
+
def delBroker(self, broker):
""" Disconnect from a broker. The 'broker' argument is the object
returned from the addBroker call """
@@ -559,24 +565,28 @@ class Session:
self.brokers.remove(broker)
del broker
+
def getPackages(self):
""" Get the list of known QMF packages """
for broker in self.brokers:
broker._waitForStable()
return self.schemaCache.getPackages()
+
def getClasses(self, packageName):
""" Get the list of known classes within a QMF package """
for broker in self.brokers:
broker._waitForStable()
return self.schemaCache.getClasses(packageName)
+
def getSchema(self, classKey):
""" Get the schema for a QMF class """
for broker in self.brokers:
broker._waitForStable()
return self.schemaCache.getSchema(classKey)
+
def bindPackage(self, packageName):
""" Request object updates for all table classes within a package. """
if not self.userBindings or not self.rcvObjects:
@@ -588,6 +598,7 @@ class Session:
broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
binding_key=key)
+
def bindClass(self, pname, cname):
""" Request object updates for a particular table class by package and class name. """
if not self.userBindings or not self.rcvObjects:
@@ -598,6 +609,7 @@ class Session:
if broker.isConnected():
broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
binding_key=key)
+
def bindClassKey(self, classKey):
""" Request object updates for a particular table class by class key. """
@@ -605,6 +617,7 @@ class Session:
cname = classKey.getClassName()
self.bindClass(pname, cname)
+
def getAgents(self, broker=None):
""" Get a list of currently known agents """
brokerList = []
@@ -622,6 +635,7 @@ class Session:
agentList.append(a)
return agentList
+
def makeObject(self, classKey, **kwargs):
""" Create a new, unmanaged object of the schema indicated by classKey """
schema = self.getSchema(classKey)
@@ -629,6 +643,7 @@ class Session:
raise Exception("Schema not found for classKey")
return Object(None, schema, None, True, True, kwargs)
+
def getObjects(self, **kwargs):
""" Get a list of objects from QMF agents.
All arguments are passed by name(keyword).
@@ -768,10 +783,12 @@ class Session:
raise RuntimeError("No agent responded within timeout period")
return self.getResult
+
def setEventFilter(self, **kwargs):
""" """
pass
+
def _bindingKeys(self):
keyList = []
keyList.append("schema.#")
@@ -788,18 +805,21 @@ class Session:
keyList.append("console.heartbeat.#")
return keyList
+
def _handleBrokerConnect(self, broker):
if self.console:
for agent in broker.getAgents():
self.console.newAgent(agent)
self.console.brokerConnected(broker)
+
def _handleBrokerDisconnect(self, broker):
if self.console:
for agent in broker.getAgents():
self.console.delAgent(agent)
self.console.brokerDisconnected(broker)
+
def _handleBrokerResp(self, broker, codec, seq):
broker.brokerId = codec.read_uuid()
if self.console != None:
@@ -813,6 +833,7 @@ class Session:
smsg = broker._message(sendCodec.encoded)
broker._send(smsg)
+
def _handlePackageInd(self, broker, codec, seq):
pname = str(codec.read_str8())
notify = self.schemaCache.declarePackage(pname)
@@ -828,6 +849,7 @@ class Session:
smsg = broker._message(sendCodec.encoded)
broker._send(smsg)
+
def _handleCommandComplete(self, broker, codec, seq, agent):
code = codec.read_uint32()
text = codec.read_str8()
@@ -869,6 +891,7 @@ class Session:
smsg = broker._message(sendCodec.encoded)
broker._send(smsg)
+
def _handleHeartbeatInd(self, broker, codec, seq, msg):
brokerBank = 1
agentBank = 0
@@ -892,6 +915,7 @@ class Session:
self.console.heartbeat(agent, timestamp)
broker._ageAgents()
+
def _handleSchemaResp(self, broker, codec, seq, agent_addr):
kind = codec.read_uint8()
classKey = ClassKey(codec)
@@ -908,6 +932,7 @@ class Session:
if agent:
agent._schemaInfoFromV2Agent()
+
def _v2HandleHeartbeatInd(self, broker, mp, ah, content):
try:
agentName = ah["qmf.agent"]
@@ -927,9 +952,11 @@ class Session:
self.console.heartbeat(agent, timestamp)
broker._ageAgents()
+
def _v2HandleAgentLocateRsp(self, broker, mp, ah, content):
self._v2HandleHeartbeatInd(broker, mp, ah, content)
+
def _handleError(self, error):
try:
self.cv.acquire()
@@ -940,6 +967,7 @@ class Session:
finally:
self.cv.release()
+
def _selectMatch(self, object):
""" Check the object against self.getSelect to check for a match """
for key, value in self.getSelect:
@@ -948,6 +976,7 @@ class Session:
return False
return True
+
def _decodeValue(self, codec, typecode, broker=None):
""" Decode, from the codec, a value based on its typecode. """
if typecode == 1: data = codec.read_uint8() # U8
@@ -1002,6 +1031,7 @@ class Session:
raise ValueError("Invalid type code: %d" % typecode)
return data
+
def _encodeValue(self, codec, value, typecode):
""" Encode, into the codec, a value based on its typecode. """
if typecode == 1: codec.write_uint8 (int(value)) # U8
@@ -1043,9 +1073,11 @@ class Session:
else:
raise ValueError ("Invalid type code: %d" % typecode)
+
def encoding(self, value):
return self._encoding(value.__class__)
+
def _encoding(self, klass):
if Session.ENCODINGS.has_key(klass):
return self.ENCODINGS[klass]
@@ -1054,6 +1086,7 @@ class Session:
if result != None:
return result
+
def _displayValue(self, value, typecode):
""" """
if typecode == 1: return unicode(value)
@@ -1082,6 +1115,7 @@ class Session:
else:
raise ValueError ("Invalid type code: %d" % typecode)
+
def _defaultValue(self, stype, broker=None, kwargs={}):
""" """
typecode = stype.type
@@ -1120,6 +1154,7 @@ class Session:
else:
raise ValueError ("Invalid type code: %d" % typecode)
+
def _bestClassKey(self, pname, cname, preferredList):
""" """
if pname == None or cname == None:
@@ -1135,6 +1170,7 @@ class Session:
return c
return None
+
def _sendMethodRequest(self, broker, schemaKey, objectId, name, argList):
""" This function can be used to send a method request to an object given only the
broker, schemaKey, and objectId. This is an uncommon usage pattern as methods are
@@ -2227,6 +2263,17 @@ class Agent:
def close(self):
self.closed = True
+ copy = {}
+ try:
+ self.lock.acquire()
+ for seq in self.contextMap:
+ copy[seq] = self.contextMap[seq]
+ finally:
+ self.lock.release()
+
+ for seq in copy:
+ context = copy[seq]
+ context.cancel("Agent disconnected")
def __repr__(self):
@@ -2588,7 +2635,7 @@ class RequestContext(object):
def setSequence(self, sequence):
- self.sequence = sequence
+ self.sequence = sequence
def addV1QueryResult(self, data):
@@ -2610,6 +2657,18 @@ class RequestContext(object):
return time() - self.startTime
+ def cancel(self, exception):
+ self.setException(exception)
+ try:
+ self.cv.acquire()
+ self.blocked = None
+ self.waitingForSchema = None
+ self.cv.notify()
+ finally:
+ self.cv.release()
+ self._complete()
+
+
def waitForSignal(self, timeout):
try:
self.cv.acquire()