diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-08-09 11:53:25 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-08-09 11:53:25 +0000 |
commit | c38983c3f75ea9213fa786d32ecc94b3c1d232ec (patch) | |
tree | eab596059451e4ad7555e3671abbcd35816de744 /python/qpid/messaging | |
parent | 6c02b1702d59edf8fa3f4250f77af1523a0af650 (diff) | |
download | qpid-python-c38983c3f75ea9213fa786d32ecc94b3c1d232ec.tar.gz |
fixed heartbeating
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@983597 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging')
-rw-r--r-- | python/qpid/messaging/driver.py | 41 | ||||
-rw-r--r-- | python/qpid/messaging/exceptions.py | 3 |
2 files changed, 40 insertions, 4 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py index 9b34a46e19..567871b96f 100644 --- a/python/qpid/messaging/driver.py +++ b/python/qpid/messaging/driver.py @@ -339,6 +339,7 @@ class Driver: self._reconnect_log = self.connection.reconnect_log self._host = 0 self._retrying = False + self._next_retry = None self._transport = None self._timeout = None @@ -427,7 +428,7 @@ class Driver: delay = self._delay self._delay = min(2*self._delay, self.connection.reconnect_interval_max) - self._timeout = time.time() + delay + self._next_retry = time.time() + delay if self._reconnect_log: log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e)) if delay > 0: @@ -437,6 +438,8 @@ class Driver: else: self.engine.close(e) + self.schedule() + def update_status(self): status = self.engine.status() return getattr(self, "st_%s" % status.lower())() @@ -471,6 +474,18 @@ class Driver: def timeout(self): self.dispatch() self._notify() + self.schedule() + + def schedule(self): + times = [] + if self.connection.heartbeat: + times.append(time.time() + self.connection.heartbeat) + if self._next_retry: + times.append(self._next_retry) + if times: + self._timeout = min(times) + else: + self._timeout = None def dispatch(self): try: @@ -479,12 +494,17 @@ class Driver: self.connect() else: self.engine.dispatch() + except HeartbeatTimeout, e: + self.close_engine(e) except: # XXX: Does socket get leaked if this occurs? msg = compat.format_exc() self.connection.error = InternalError(text=msg) def connect(self): + if self._retrying and time.time() < self._next_retry: + return + try: # XXX: should make this non blocking host, port = self._next_host() @@ -500,11 +520,12 @@ class Driver: raise ConnectError("no such transport: %s" % self.connection.transport) if self._retrying and self._reconnect_log: log.warn("reconnect succeeded: %s:%s", host, port) - self._timeout = None + self._next_retry = None self._attempts = 0 self._host = 0 self._delay = self.connection.reconnect_interval_min self._retrying = False + self.schedule() except socket.error, e: self.close_engine(ConnectError(text=str(e))) @@ -556,6 +577,8 @@ class Engine: self._status = CLOSED self._buf = "" self._hdr = "" + self._last_in = None + self._last_out = None self._op_enc = OpEncoder() self._seg_enc = SegmentEncoder() self._frame_enc = FrameEncoder() @@ -595,6 +618,7 @@ class Engine: return self._status def write(self, data): + self._last_in = time.time() try: if self._sasl_decode: data = self._sasl.decode(data) @@ -652,6 +676,7 @@ class Engine: if self._sasl_encode: bytes = self._sasl.encode(bytes) self._buf += bytes + self._last_out = time.time() def do_header(self, hdr): cli_major = 0; cli_minor = 10 @@ -689,8 +714,8 @@ class Engine: self._sasl_decode = True self.connection._transport_connected = True - def connection_heartbeat(self, hrt): - self.write_op(ConnectionHeartbeat()) + def do_connection_heartbeat(self, hrt): + pass def do_connection_close(self, close): self.write_op(ConnectionCloseOk()) @@ -766,6 +791,14 @@ class Engine: self.attach(ssn) self.process(ssn) + if self.connection.heartbeat and self._status != CLOSED: + now = time.time() + if self._last_in is not None and \ + now - self._last_in > 2*self.connection.heartbeat: + raise HeartbeatTimeout(text="heartbeat timeout") + if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0: + self.write_op(ConnectionHeartbeat()) + def open(self): self._reset() self._status = OPEN diff --git a/python/qpid/messaging/exceptions.py b/python/qpid/messaging/exceptions.py index 27bc5af035..0296d615d9 100644 --- a/python/qpid/messaging/exceptions.py +++ b/python/qpid/messaging/exceptions.py @@ -63,6 +63,9 @@ class AuthenticationFailure(ConnectError): class ConnectionClosed(ConnectionError): pass +class HeartbeatTimeout(ConnectionError): + pass + ## Session Errors class SessionError(MessagingError): |