summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-01-09 22:30:26 +0000
committerAlan Conway <aconway@apache.org>2014-01-09 22:30:26 +0000
commit1b3fa07abd75633c88000908091b8af94f5e3139 (patch)
tree4c2c4c818eeee269ed01fe8e1a3152d63d38e0a3 /python
parentc98dc793f7c93a54fdd3e482ce7e23d774c93110 (diff)
downloadqpid-python-1b3fa07abd75633c88000908091b8af94f5e3139.tar.gz
QPID-5428: Heartbeats not in use when attempting to connect with python client.
Heartbeats ignored when opening a connection, could hang indefinitely Need to cover 3 cases (test included): - Connect sucessful but then broker stalls. - Connect to a stalled broker that never responds. - Fail-over to a stalled broker that never responds All cases are handled by the following fixes to driver.py: - Check for heartbeats even before engine._connected since we may time out before receiving open-ok if the peer is stalled and never sends data. - Set _last_in and _last_out so that we time heartbeats from the start of the connection if no data is ever sent or received. - Call self.update_status in Driver.timeout to detect connection closed due to heartbeat timeout (rather than a readable or writeable event.) Make update_status a no-op if engine or transport are not yet set up. - Don't consider reconnect complete in connect(), wait till we get the open-ok. See the comment on Driver._check_retry_ok() git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1556971 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/qpid/messaging/driver.py55
1 files changed, 37 insertions, 18 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py
index 06bbe610b8..43c42273ea 100644
--- a/python/qpid/messaging/driver.py
+++ b/python/qpid/messaging/driver.py
@@ -362,11 +362,11 @@ class Driver:
[(u.host, default(u.port, 5672)) for u in urls]
if self._host >= len(hosts):
self._host = 0
- result = hosts[self._host]
+ self._last_host = hosts[self._host]
if self._host == 0:
self._attempts += 1
self._host = self._host + 1
- return result
+ return self._last_host
def _num_hosts(self):
return len(self.connection.reconnect_urls) + 1
@@ -401,6 +401,24 @@ class Driver:
def timing(self):
return self._timeout
+ def _check_retry_ok(self):
+ """We consider a reconnect to have suceeded only when we have received
+ open-ok from the peer.
+
+ If we declared success as soon as the transport connected, then we could get
+ into an infinite heartbeat loop if the remote process is hung and never
+ sends us any data. We would fail the connection after 2 missed heartbeats,
+ reconnect the transport, declare the reconnect ok, then fail again after 2
+ missed heartbeats and so on.
+ """
+ if self._retrying and self.engine._connected: # Means we have received open-ok.
+ if self._reconnect_log:
+ log.warn("reconnect succeeded: %s:%s", *self._last_host)
+ self._next_retry = None
+ self._attempts = 0
+ self._delay = self.connection.reconnect_interval_min
+ self._retrying = False
+
@synchronized
def readable(self):
try:
@@ -410,6 +428,7 @@ class Driver:
elif data:
rawlog.debug("READ[%s]: %r", self.log_id, data)
self.engine.write(data)
+ self._check_retry_ok()
else:
self.close_engine()
except socket.error, e:
@@ -451,13 +470,14 @@ class Driver:
self.schedule()
def update_status(self):
+ if not self.engine: return False
status = self.engine.status()
return getattr(self, "st_%s" % status.lower())()
def st_closed(self):
# XXX: this log statement seems to sometimes hit when the socket is not connected
# XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername())
- self._transport.close()
+ if self._transport: self._transport.close()
self._transport = None
self.engine = None
return True
@@ -483,6 +503,7 @@ class Driver:
@synchronized
def timeout(self):
self.dispatch()
+ self.update_status()
self._notify()
self.schedule()
@@ -531,12 +552,6 @@ class Driver:
self._transport = trans(self.connection, host, port)
else:
raise ConnectError("no such transport: %s" % self.connection.transport)
- if self._retrying and self._reconnect_log:
- log.warn("reconnect succeeded: %s:%s", host, port)
- self._next_retry = None
- self._attempts = 0
- self._delay = self.connection.reconnect_interval_min
- self._retrying = False
self.schedule()
except socket.error, e:
self.close_engine(ConnectError(text=str(e)))
@@ -589,8 +604,10 @@ class Engine:
self._status = CLOSED
self._buf = ""
self._hdr = ""
- self._last_in = None
- self._last_out = None
+ # Set _last_in and _last_out here so heartbeats will be timed from the
+ # beginning of connection if no data is sent/received.
+ self._last_in = time.time()
+ self._last_out = time.time()
self._op_enc = OpEncoder()
self._seg_enc = SegmentEncoder()
self._frame_enc = FrameEncoder()
@@ -813,13 +830,15 @@ class Engine:
self.attach(ssn)
self.process(ssn)
- if self.connection.heartbeat and self._status != CLOSED:
- now = time.time()
- if self._last_in is not None and \
- now - self._last_in > 2*self.connection.heartbeat:
- raise HeartbeatTimeout(text="heartbeat timeout")
- if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0:
- self.write_op(ConnectionHeartbeat())
+ # We need to check heartbeat even if not self._connected since we may have
+ # heartbeat timeout before receiving an open-ok
+ if self.connection.heartbeat and self._status != CLOSED and not self._closing:
+ now = time.time()
+ if now - self._last_in > 2*self.connection.heartbeat:
+ raise HeartbeatTimeout(text="heartbeat timeout")
+ # Only send heartbeats if we are connected.
+ if self._connected and now - self._last_out >= self.connection.heartbeat/2.0:
+ self.write_op(ConnectionHeartbeat())
def open(self):
self._reset()