diff options
author | Ted Ross <tross@apache.org> | 2008-11-24 19:49:57 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-11-24 19:49:57 +0000 |
commit | 5fc3778dbd5501329f8b45dd0d72cf116cc0ab10 (patch) | |
tree | 174f7ee2b45cb161421759ce1d110141c430b818 /python/qmf/console.py | |
parent | c3cd4c6c5a5317ae746440a2ef14ad93a6c12684 (diff) | |
download | qpid-python-5fc3778dbd5501329f8b45dd0d72cf116cc0ab10.tar.gz |
QPID-1483 - Connection handling in the Python QMF console API
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@720275 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qmf/console.py')
-rw-r--r-- | python/qmf/console.py | 164 |
1 files changed, 126 insertions, 38 deletions
diff --git a/python/qmf/console.py b/python/qmf/console.py index 17d179b4d6..58cc46ae6f 100644 --- a/python/qmf/console.py +++ b/python/qmf/console.py @@ -29,7 +29,7 @@ from qpid.connection import Connection, ConnectionFailed from qpid.datatypes import UUID, uuid4, Message, RangedSet from qpid.util import connect, ssl, URL from qpid.codec010 import StringCodec as Codec -from threading import Lock, Condition +from threading import Lock, Condition, Thread from time import time, strftime, gmtime from cStringIO import StringIO @@ -165,22 +165,18 @@ class Session: if self.userBindings and not self.rcvObjects: raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided") - if manageConnections: - raise Exception("manageConnections - not yet implemented") - def __repr__(self): - return "QMF Console Session Manager (brokers connected: %d)" % len(self.brokers) + return "QMF Console Session Manager (brokers: %d)" % len(self.brokers) def addBroker(self, target="localhost"): """ Connect to a Qpid broker. Returns an object of type Broker. """ url = BrokerURL(target) broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass, ssl = url.scheme == URL.AMQPS) - if not broker.isConnected and not self.manageConnections: - raise Exception(broker.error) self.brokers.append(broker) - self.getObjects(broker=broker, _class="agent") + if not self.manageConnections: + self.getObjects(broker=broker, _class="agent") return broker def delBroker(self, broker): @@ -220,22 +216,32 @@ class Session: return self.packages[pname][pkey] def bindPackage(self, packageName): - """ """ + """ Request object updates for all table classes within a package. """ if not self.userBindings or not self.rcvObjects: raise Exception("userBindings option not set for Session") + key = "console.obj.*.*.%s.#" % packageName + self.bindingKeyList.append(key) for broker in self.brokers: - broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, - binding_key="console.obj.*.*.%s.#" % packageName) + if broker.isConnected(): + broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, + binding_key=key) - def bindClass(self, classKey): - """ """ + def bindClass(self, pname, cname): + """ Request object updates for a particular table class by package and class name. """ if not self.userBindings or not self.rcvObjects: raise Exception("userBindings option not set for Session") + key = "console.obj.*.*.%s.%s.#" % (pname, cname) + self.bindingKeyList.append(key) + for broker in self.brokers: + if broker.isConnected(): + broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, + binding_key=key) + + def bindClassKey(self, classKey): + """ Request object updates for a particular table class by class key. """ pname = classKey.getPackageName() cname = classKey.getClassName() - for broker in self.brokers: - broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, - binding_key="console.obj.*.*.%s.%s.#" % (pname, cname)) + self.bindClass(pname, cname) def getAgents(self, broker=None): """ Get a list of currently known agents """ @@ -293,11 +299,16 @@ class Session: agent = kwargs["_agent"] if agent.broker not in brokerList: raise Exception("Supplied agent is not accessible through the supplied broker") - agentList.append(agent) + if agent.broker.isConnected(): + agentList.append(agent) else: for broker in brokerList: for agent in broker.getAgents(): - agentList.append(agent) + if agent.broker.isConnected(): + agentList.append(agent) + + if len(agentList) == 0: + return [] pname = None cname = None @@ -387,10 +398,12 @@ class Session: return keyList def _handleBrokerConnect(self, broker): - pass + if self.console: + self.console.brokerConnected(broker) def _handleBrokerDisconnect(self, broker): - pass + if self.console: + self.console.brokerDisconnected(broker) def _handleBrokerResp(self, broker, codec, seq): broker.brokerId = UUID(codec.read_uuid()) @@ -1127,8 +1140,63 @@ class MethodResult(object): def __repr__(self): return "%s (%d) - %s" % (self.text, self.status, self.outArgs) +class ManagedConnection(Thread): + """ Thread class for managing a connection. """ + DELAY_MIN = 1 + DELAY_MAX = 128 + DELAY_FACTOR = 2 + + def __init__(self, broker): + Thread.__init__(self) + self.broker = broker + self.cv = Condition() + self.canceled = False + + def stop(self): + """ Tell this thread to stop running and return. """ + try: + self.cv.acquire() + self.canceled = True + self.cv.notify() + finally: + self.cv.release() + + def disconnected(self): + """ Notify the thread that the connection was lost. """ + try: + self.cv.acquire() + self.cv.notify() + finally: + self.cv.release() + + def run(self): + """ Main body of the running thread. """ + delay = self.DELAY_MIN + while True: + try: + self.broker._tryToConnect() + try: + self.cv.acquire() + while (not self.canceled) and self.broker.connected: + self.cv.wait() + if self.canceled: + return + delay = self.DELAY_MIN + except: + self.cv.release() + except: + if delay < self.DELAY_MAX: + delay *= self.DELAY_FACTOR + try: + self.cv.acquire() + self.cv.wait(delay) + if self.canceled: + return + finally: + self.cv.release() + class Broker: - """ """ + """ This object represents a connection (or potential connection) to a QMF broker. """ SYNC_TIME = 60 def __init__(self, session, host, port, authMech, authUser, authPass, ssl=False): @@ -1138,24 +1206,24 @@ class Broker: self.ssl = ssl self.authUser = authUser self.authPass = authPass - self.agents = {} - self.agents[(1,0)] = Agent(self, 0, "BrokerAgent") - self.topicBound = False self.cv = Condition() - self.syncInFlight = False - self.syncRequest = 0 - self.syncResult = None - self.reqsOutstanding = 1 self.error = None self.brokerId = None - self.isConnected = False + self.connected = False self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid()) - self._tryToConnect() + if self.session.manageConnections: + self.thread = ManagedConnection(self) + self.thread.start() + else: + self.thread = None + self._tryToConnect() def isConnected(self): - return self.isConnected + """ Return True if there is an active connection to the broker. """ + return self.connected def getError(self): + """ Return the last error message seen while trying to connect to the broker. """ return self.error def getBrokerId(self): @@ -1163,9 +1231,13 @@ class Broker: return self.brokerId def getBrokerBank(self): + """ Return the broker-bank value. This is the value that the broker assigns to + objects within its control. This value appears as a field in the ObjectId + of objects created by agents controlled by this broker. """ return 1 def getAgent(self, brokerBank, agentBank): + """ Return the agent object associated with a particular broker and agent bank value.""" bankKey = (brokerBank, agentBank) if bankKey in self.agents: return self.agents[bankKey] @@ -1199,13 +1271,21 @@ class Broker: return "amqp%s://%s%s:%d" % (ssl, auth, self.host, self.port or 5672) def __repr__(self): - if self.isConnected: + if self.connected: return "Broker connected at: %s" % self.getUrl() else: return "Disconnected Broker" def _tryToConnect(self): try: + self.agents = {} + self.agents[(1,0)] = Agent(self, 0, "BrokerAgent") + self.topicBound = False + self.syncInFlight = False + self.syncRequest = 0 + self.syncResult = None + self.reqsOutstanding = 1 + sock = connect(self.host, self.port) if self.ssl: sock = ssl(sock) @@ -1235,7 +1315,7 @@ class Broker: self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFF) self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFF) - self.isConnected = True + self.connected = True self.session._handleBrokerConnect(self) codec = Codec(self.conn.spec) @@ -1245,10 +1325,13 @@ class Broker: except socket.error, e: self.error = "Socket Error %s - %s" % (e[0], e[1]) + raise Exception(self.error) except Closed, e: self.error = "Connect Failed %d - %s" % (e[0], e[1]) + raise Exception(self.error) except ConnectionFailed, e: self.error = "Connect Failed %d - %s" % (e[0], e[1]) + raise Exception(self.error) def _updateAgent(self, obj): bankKey = (obj.brokerBank, obj.agentBank) @@ -1301,19 +1384,22 @@ class Broker: self.amqpSession.message_transfer(destination=dest, message=msg) def _shutdown(self): - if self.isConnected: + if self.thread: + self.thread.stop() + self.thread.join() + if self.connected: self.amqpSession.incoming("rdest").stop() if self.session.console != None: self.amqpSession.incoming("tdest").stop() self.amqpSession.close() self.conn.close() - self.isConnected = False - else: - raise Exception("Broker already disconnected") + self.connected = False def _waitForStable(self): try: self.cv.acquire() + if not self.connected: + return if self.reqsOutstanding == 0: return self.syncInFlight = True @@ -1365,7 +1451,7 @@ class Broker: elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True) def _exceptionCb(self, data): - self.isConnected = False + self.connected = False self.error = data try: self.cv.acquire() @@ -1375,6 +1461,8 @@ class Broker: self.cv.release() self.session._handleError(self.error) self.session._handleBrokerDisconnect(self) + if self.thread: + self.thread.disconnected() class Agent: """ """ |