diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-01-29 21:41:46 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-01-29 21:41:46 +0000 |
commit | 7f2532ceb27fae57ebe8f2d80ba40665f0e6e1cf (patch) | |
tree | 4f50d2aa59d1d134c6014a1904a086a8fe1ed4ec /python/qpid/driver.py | |
parent | 576ca5afb5d8016bbaad44db260124be029ce145 (diff) | |
download | qpid-python-7f2532ceb27fae57ebe8f2d80ba40665f0e6e1cf.tar.gz |
added reconnect_delay, reconnect_limit, and backups option to Connection
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@904634 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/driver.py')
-rw-r--r-- | python/qpid/driver.py | 49 |
1 files changed, 35 insertions, 14 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py index 8a1776d72c..03ba85aa39 100644 --- a/python/qpid/driver.py +++ b/python/qpid/driver.py @@ -58,7 +58,8 @@ DURABLE_DEFAULT=True # XXX FILTER_DEFAULTS = { - "topic": Pattern("*") + "topic": Pattern("*"), + "amq.failover": Pattern("DUMMY") } # XXX @@ -148,6 +149,11 @@ class Driver: self._lock = self.connection._lock self._selector = Selector.default() + self._attempts = 0 + self._hosts = [(self.connection.host, self.connection.port)] + \ + self.connection.backups + self._host = 0 + self._retrying = False self.reset() def reset(self): @@ -286,7 +292,6 @@ class Driver: @synchronized def timeout(self): - log.warn("retrying ...") self.dispatch() self.connection._waiter.notifyAll() @@ -294,10 +299,19 @@ class Driver: if self._socket is not None: self._socket.close() self.reset() - if recoverable and self.connection.reconnect: - self._timeout = time.time() + 3 - log.warn("recoverable error: %s" % err) - log.warn("sleeping 3 seconds") + if (recoverable and self.connection.reconnect and + (self.connection.reconnect_limit is None or + self.connection.reconnect_limit <= 0 or + self._attempts <= self.connection.reconnect_limit)): + if self._host > 0: + delay = 0 + else: + delay = self.connection.reconnect_delay + self._timeout = time.time() + delay + log.warn("recoverable error[attempt %s]: %s" % (self._attempts, err)) + if delay > 0: + log.warn("sleeping %s seconds" % delay) + self._retrying = True else: self.connection.error = (err,) @@ -430,16 +444,23 @@ class Driver: def connect(self): try: # XXX: should make this non blocking - self._socket = connect(self.connection.host, self.connection.port) + if self._host == 0: + self._attempts += 1 + host, port = self._hosts[self._host] + if self._retrying: + log.warn("trying: %s:%s", host, port) + self._socket = connect(host, port) + if self._retrying: + log.warn("reconnect succeeded: %s:%s", host, port) self._timeout = None + self._attempts = 0 + self._host = 0 + self._retrying = False + self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10) + self._opening = True except socket.error, e: - if self.connection.reconnect: - self._error(e, True) - return - else: - raise e - self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10) - self._opening = True + self._host = (self._host + 1) % len(self._hosts) + self._error(e, True) def disconnect(self): self.write_op(ConnectionClose(close_code.normal)) |