diff options
author | Alan Conway <aconway@apache.org> | 2014-01-09 22:30:26 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2014-01-09 22:30:26 +0000 |
commit | 1b3fa07abd75633c88000908091b8af94f5e3139 (patch) | |
tree | 4c2c4c818eeee269ed01fe8e1a3152d63d38e0a3 /python | |
parent | c98dc793f7c93a54fdd3e482ce7e23d774c93110 (diff) | |
download | qpid-python-1b3fa07abd75633c88000908091b8af94f5e3139.tar.gz |
QPID-5428: Heartbeats not in use when attempting to connect with python client.
Heartbeats were ignored when opening a connection, so the client could hang indefinitely.
Need to cover 3 cases (test included):
- Connect successful but then broker stalls.
- Connect to a stalled broker that never responds.
- Fail-over to a stalled broker that never responds.
All cases are handled by the following fixes to driver.py:
- Check for heartbeats even before engine._connected since we may time out
before receiving open-ok if the peer is stalled and never sends data.
- Set _last_in and _last_out so that we time heartbeats from the start of the
connection if no data is ever sent or received.
- Call self.update_status in Driver.timeout to detect connection closed due to
heartbeat timeout (rather than a readable or writeable event.)
Make update_status a no-op if engine or transport are not yet set up.
- Don't consider reconnect complete in connect(), wait till we get the open-ok.
See the comment on Driver._check_retry_ok()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1556971 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r-- | python/qpid/messaging/driver.py | 55 |
1 files changed, 37 insertions, 18 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py index 06bbe610b8..43c42273ea 100644 --- a/python/qpid/messaging/driver.py +++ b/python/qpid/messaging/driver.py @@ -362,11 +362,11 @@ class Driver: [(u.host, default(u.port, 5672)) for u in urls] if self._host >= len(hosts): self._host = 0 - result = hosts[self._host] + self._last_host = hosts[self._host] if self._host == 0: self._attempts += 1 self._host = self._host + 1 - return result + return self._last_host def _num_hosts(self): return len(self.connection.reconnect_urls) + 1 @@ -401,6 +401,24 @@ class Driver: def timing(self): return self._timeout + def _check_retry_ok(self): + """We consider a reconnect to have suceeded only when we have received + open-ok from the peer. + + If we declared success as soon as the transport connected, then we could get + into an infinite heartbeat loop if the remote process is hung and never + sends us any data. We would fail the connection after 2 missed heartbeats, + reconnect the transport, declare the reconnect ok, then fail again after 2 + missed heartbeats and so on. + """ + if self._retrying and self.engine._connected: # Means we have received open-ok. 
+ if self._reconnect_log: + log.warn("reconnect succeeded: %s:%s", *self._last_host) + self._next_retry = None + self._attempts = 0 + self._delay = self.connection.reconnect_interval_min + self._retrying = False + @synchronized def readable(self): try: @@ -410,6 +428,7 @@ class Driver: elif data: rawlog.debug("READ[%s]: %r", self.log_id, data) self.engine.write(data) + self._check_retry_ok() else: self.close_engine() except socket.error, e: @@ -451,13 +470,14 @@ class Driver: self.schedule() def update_status(self): + if not self.engine: return False status = self.engine.status() return getattr(self, "st_%s" % status.lower())() def st_closed(self): # XXX: this log statement seems to sometimes hit when the socket is not connected # XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername()) - self._transport.close() + if self._transport: self._transport.close() self._transport = None self.engine = None return True @@ -483,6 +503,7 @@ class Driver: @synchronized def timeout(self): self.dispatch() + self.update_status() self._notify() self.schedule() @@ -531,12 +552,6 @@ class Driver: self._transport = trans(self.connection, host, port) else: raise ConnectError("no such transport: %s" % self.connection.transport) - if self._retrying and self._reconnect_log: - log.warn("reconnect succeeded: %s:%s", host, port) - self._next_retry = None - self._attempts = 0 - self._delay = self.connection.reconnect_interval_min - self._retrying = False self.schedule() except socket.error, e: self.close_engine(ConnectError(text=str(e))) @@ -589,8 +604,10 @@ class Engine: self._status = CLOSED self._buf = "" self._hdr = "" - self._last_in = None - self._last_out = None + # Set _last_in and _last_out here so heartbeats will be timed from the + # beginning of connection if no data is sent/received. 
+ self._last_in = time.time() + self._last_out = time.time() self._op_enc = OpEncoder() self._seg_enc = SegmentEncoder() self._frame_enc = FrameEncoder() @@ -813,13 +830,15 @@ class Engine: self.attach(ssn) self.process(ssn) - if self.connection.heartbeat and self._status != CLOSED: - now = time.time() - if self._last_in is not None and \ - now - self._last_in > 2*self.connection.heartbeat: - raise HeartbeatTimeout(text="heartbeat timeout") - if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0: - self.write_op(ConnectionHeartbeat()) + # We need to check heartbeat even if not self._connected since we may have + # heartbeat timeout before receiving an open-ok + if self.connection.heartbeat and self._status != CLOSED and not self._closing: + now = time.time() + if now - self._last_in > 2*self.connection.heartbeat: + raise HeartbeatTimeout(text="heartbeat timeout") + # Only send heartbeats if we are connected. + if self._connected and now - self._last_out >= self.connection.heartbeat/2.0: + self.write_op(ConnectionHeartbeat()) def open(self): self._reset() |