diff options
Diffstat (limited to 'python/qpid/messaging/driver.py')
-rw-r--r-- | python/qpid/messaging/driver.py | 41 |
1 files changed, 37 insertions, 4 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py index 9b34a46e19..567871b96f 100644 --- a/python/qpid/messaging/driver.py +++ b/python/qpid/messaging/driver.py @@ -339,6 +339,7 @@ class Driver: self._reconnect_log = self.connection.reconnect_log self._host = 0 self._retrying = False + self._next_retry = None self._transport = None self._timeout = None @@ -427,7 +428,7 @@ class Driver: delay = self._delay self._delay = min(2*self._delay, self.connection.reconnect_interval_max) - self._timeout = time.time() + delay + self._next_retry = time.time() + delay if self._reconnect_log: log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e)) if delay > 0: @@ -437,6 +438,8 @@ class Driver: else: self.engine.close(e) + self.schedule() + def update_status(self): status = self.engine.status() return getattr(self, "st_%s" % status.lower())() @@ -471,6 +474,18 @@ class Driver: def timeout(self): self.dispatch() self._notify() + self.schedule() + + def schedule(self): + times = [] + if self.connection.heartbeat: + times.append(time.time() + self.connection.heartbeat) + if self._next_retry: + times.append(self._next_retry) + if times: + self._timeout = min(times) + else: + self._timeout = None def dispatch(self): try: @@ -479,12 +494,17 @@ class Driver: self.connect() else: self.engine.dispatch() + except HeartbeatTimeout, e: + self.close_engine(e) except: # XXX: Does socket get leaked if this occurs? msg = compat.format_exc() self.connection.error = InternalError(text=msg) def connect(self): + if self._retrying and time.time() < self._next_retry: + return + try: # XXX: should make this non blocking host, port = self._next_host() @@ -500,11 +520,12 @@ class Driver: raise ConnectError("no such transport: %s" % self.connection.transport) if self._retrying and self._reconnect_log: log.warn("reconnect succeeded: %s:%s", host, port) - self._timeout = None + self._next_retry = None self._attempts = 0 self._host = 0 self._delay = self.connection.reconnect_interval_min self._retrying = False + self.schedule() except socket.error, e: self.close_engine(ConnectError(text=str(e))) @@ -556,6 +577,8 @@ class Engine: self._status = CLOSED self._buf = "" self._hdr = "" + self._last_in = None + self._last_out = None self._op_enc = OpEncoder() self._seg_enc = SegmentEncoder() self._frame_enc = FrameEncoder() @@ -595,6 +618,7 @@ class Engine: return self._status def write(self, data): + self._last_in = time.time() try: if self._sasl_decode: data = self._sasl.decode(data) @@ -652,6 +676,7 @@ class Engine: if self._sasl_encode: bytes = self._sasl.encode(bytes) self._buf += bytes + self._last_out = time.time() def do_header(self, hdr): cli_major = 0; cli_minor = 10 @@ -689,8 +714,8 @@ class Engine: self._sasl_decode = True self.connection._transport_connected = True - def connection_heartbeat(self, hrt): - self.write_op(ConnectionHeartbeat()) + def do_connection_heartbeat(self, hrt): + pass def do_connection_close(self, close): self.write_op(ConnectionCloseOk()) @@ -766,6 +791,14 @@ class Engine: self.attach(ssn) self.process(ssn) + if self.connection.heartbeat and self._status != CLOSED: + now = time.time() + if self._last_in is not None and \ + now - self._last_in > 2*self.connection.heartbeat: + raise HeartbeatTimeout(text="heartbeat timeout") + if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0: + self.write_op(ConnectionHeartbeat()) + def open(self): self._reset() self._status = OPEN |