summaryrefslogtreecommitdiff
path: root/python/qpid/driver.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/driver.py')
-rw-r--r--python/qpid/driver.py49
1 files changed, 35 insertions, 14 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py
index 8a1776d72c..03ba85aa39 100644
--- a/python/qpid/driver.py
+++ b/python/qpid/driver.py
@@ -58,7 +58,8 @@ DURABLE_DEFAULT=True
# XXX
FILTER_DEFAULTS = {
- "topic": Pattern("*")
+ "topic": Pattern("*"),
+ "amq.failover": Pattern("DUMMY")
}
# XXX
@@ -148,6 +149,11 @@ class Driver:
self._lock = self.connection._lock
self._selector = Selector.default()
+ self._attempts = 0
+ self._hosts = [(self.connection.host, self.connection.port)] + \
+ self.connection.backups
+ self._host = 0
+ self._retrying = False
self.reset()
def reset(self):
@@ -286,7 +292,6 @@ class Driver:
@synchronized
def timeout(self):
- log.warn("retrying ...")
self.dispatch()
self.connection._waiter.notifyAll()
@@ -294,10 +299,19 @@ class Driver:
if self._socket is not None:
self._socket.close()
self.reset()
- if recoverable and self.connection.reconnect:
- self._timeout = time.time() + 3
- log.warn("recoverable error: %s" % err)
- log.warn("sleeping 3 seconds")
+ if (recoverable and self.connection.reconnect and
+ (self.connection.reconnect_limit is None or
+ self.connection.reconnect_limit <= 0 or
+ self._attempts <= self.connection.reconnect_limit)):
+ if self._host > 0:
+ delay = 0
+ else:
+ delay = self.connection.reconnect_delay
+ self._timeout = time.time() + delay
+ log.warn("recoverable error[attempt %s]: %s" % (self._attempts, err))
+ if delay > 0:
+ log.warn("sleeping %s seconds" % delay)
+ self._retrying = True
else:
self.connection.error = (err,)
@@ -430,16 +444,23 @@ class Driver:
def connect(self):
try:
# XXX: should make this non blocking
- self._socket = connect(self.connection.host, self.connection.port)
+ if self._host == 0:
+ self._attempts += 1
+ host, port = self._hosts[self._host]
+ if self._retrying:
+ log.warn("trying: %s:%s", host, port)
+ self._socket = connect(host, port)
+ if self._retrying:
+ log.warn("reconnect succeeded: %s:%s", host, port)
self._timeout = None
+ self._attempts = 0
+ self._host = 0
+ self._retrying = False
+ self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
+ self._opening = True
except socket.error, e:
- if self.connection.reconnect:
- self._error(e, True)
- return
- else:
- raise e
- self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
- self._opening = True
+ self._host = (self._host + 1) % len(self._hosts)
+ self._error(e, True)
def disconnect(self):
self.write_op(ConnectionClose(close_code.normal))