diff options
Diffstat (limited to 'python/qpid/driver.py')
-rw-r--r-- | python/qpid/driver.py | 49 |
1 files changed, 35 insertions, 14 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py index 8a1776d72c..03ba85aa39 100644 --- a/python/qpid/driver.py +++ b/python/qpid/driver.py @@ -58,7 +58,8 @@ DURABLE_DEFAULT=True # XXX FILTER_DEFAULTS = { - "topic": Pattern("*") + "topic": Pattern("*"), + "amq.failover": Pattern("DUMMY") } # XXX @@ -148,6 +149,11 @@ class Driver: self._lock = self.connection._lock self._selector = Selector.default() + self._attempts = 0 + self._hosts = [(self.connection.host, self.connection.port)] + \ + self.connection.backups + self._host = 0 + self._retrying = False self.reset() def reset(self): @@ -286,7 +292,6 @@ class Driver: @synchronized def timeout(self): - log.warn("retrying ...") self.dispatch() self.connection._waiter.notifyAll() @@ -294,10 +299,19 @@ class Driver: if self._socket is not None: self._socket.close() self.reset() - if recoverable and self.connection.reconnect: - self._timeout = time.time() + 3 - log.warn("recoverable error: %s" % err) - log.warn("sleeping 3 seconds") + if (recoverable and self.connection.reconnect and + (self.connection.reconnect_limit is None or + self.connection.reconnect_limit <= 0 or + self._attempts <= self.connection.reconnect_limit)): + if self._host > 0: + delay = 0 + else: + delay = self.connection.reconnect_delay + self._timeout = time.time() + delay + log.warn("recoverable error[attempt %s]: %s" % (self._attempts, err)) + if delay > 0: + log.warn("sleeping %s seconds" % delay) + self._retrying = True else: self.connection.error = (err,) @@ -430,16 +444,23 @@ class Driver: def connect(self): try: # XXX: should make this non blocking - self._socket = connect(self.connection.host, self.connection.port) + if self._host == 0: + self._attempts += 1 + host, port = self._hosts[self._host] + if self._retrying: + log.warn("trying: %s:%s", host, port) + self._socket = connect(host, port) + if self._retrying: + log.warn("reconnect succeeded: %s:%s", host, port) self._timeout = None + self._attempts = 0 + self._host = 0 + self._retrying = False + self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10) + self._opening = True except socket.error, e: - if self.connection.reconnect: - self._error(e, True) - return - else: - raise e - self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10) - self._opening = True + self._host = (self._host + 1) % len(self._hosts) + self._error(e, True) def disconnect(self): self.write_op(ConnectionClose(close_code.normal)) |