summaryrefslogtreecommitdiff
path: root/python/qpid/messaging
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-08-09 11:53:25 +0000
committerRafael H. Schloming <rhs@apache.org>2010-08-09 11:53:25 +0000
commitc38983c3f75ea9213fa786d32ecc94b3c1d232ec (patch)
treeeab596059451e4ad7555e3671abbcd35816de744 /python/qpid/messaging
parent6c02b1702d59edf8fa3f4250f77af1523a0af650 (diff)
downloadqpid-python-c38983c3f75ea9213fa786d32ecc94b3c1d232ec.tar.gz
fixed heartbeating
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@983597 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging')
-rw-r--r--python/qpid/messaging/driver.py41
-rw-r--r--python/qpid/messaging/exceptions.py3
2 files changed, 40 insertions, 4 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py
index 9b34a46e19..567871b96f 100644
--- a/python/qpid/messaging/driver.py
+++ b/python/qpid/messaging/driver.py
@@ -339,6 +339,7 @@ class Driver:
self._reconnect_log = self.connection.reconnect_log
self._host = 0
self._retrying = False
+ self._next_retry = None
self._transport = None
self._timeout = None
@@ -427,7 +428,7 @@ class Driver:
delay = self._delay
self._delay = min(2*self._delay,
self.connection.reconnect_interval_max)
- self._timeout = time.time() + delay
+ self._next_retry = time.time() + delay
if self._reconnect_log:
log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e))
if delay > 0:
@@ -437,6 +438,8 @@ class Driver:
else:
self.engine.close(e)
+ self.schedule()
+
def update_status(self):
status = self.engine.status()
return getattr(self, "st_%s" % status.lower())()
@@ -471,6 +474,18 @@ class Driver:
def timeout(self):
self.dispatch()
self._notify()
+ self.schedule()
+
+ def schedule(self):
+ times = []
+ if self.connection.heartbeat:
+ times.append(time.time() + self.connection.heartbeat)
+ if self._next_retry:
+ times.append(self._next_retry)
+ if times:
+ self._timeout = min(times)
+ else:
+ self._timeout = None
def dispatch(self):
try:
@@ -479,12 +494,17 @@ class Driver:
self.connect()
else:
self.engine.dispatch()
+ except HeartbeatTimeout, e:
+ self.close_engine(e)
except:
# XXX: Does socket get leaked if this occurs?
msg = compat.format_exc()
self.connection.error = InternalError(text=msg)
def connect(self):
+ if self._retrying and time.time() < self._next_retry:
+ return
+
try:
# XXX: should make this non blocking
host, port = self._next_host()
@@ -500,11 +520,12 @@ class Driver:
raise ConnectError("no such transport: %s" % self.connection.transport)
if self._retrying and self._reconnect_log:
log.warn("reconnect succeeded: %s:%s", host, port)
- self._timeout = None
+ self._next_retry = None
self._attempts = 0
self._host = 0
self._delay = self.connection.reconnect_interval_min
self._retrying = False
+ self.schedule()
except socket.error, e:
self.close_engine(ConnectError(text=str(e)))
@@ -556,6 +577,8 @@ class Engine:
self._status = CLOSED
self._buf = ""
self._hdr = ""
+ self._last_in = None
+ self._last_out = None
self._op_enc = OpEncoder()
self._seg_enc = SegmentEncoder()
self._frame_enc = FrameEncoder()
@@ -595,6 +618,7 @@ class Engine:
return self._status
def write(self, data):
+ self._last_in = time.time()
try:
if self._sasl_decode:
data = self._sasl.decode(data)
@@ -652,6 +676,7 @@ class Engine:
if self._sasl_encode:
bytes = self._sasl.encode(bytes)
self._buf += bytes
+ self._last_out = time.time()
def do_header(self, hdr):
cli_major = 0; cli_minor = 10
@@ -689,8 +714,8 @@ class Engine:
self._sasl_decode = True
self.connection._transport_connected = True
- def connection_heartbeat(self, hrt):
- self.write_op(ConnectionHeartbeat())
+ def do_connection_heartbeat(self, hrt):
+ pass
def do_connection_close(self, close):
self.write_op(ConnectionCloseOk())
@@ -766,6 +791,14 @@ class Engine:
self.attach(ssn)
self.process(ssn)
+ if self.connection.heartbeat and self._status != CLOSED:
+ now = time.time()
+ if self._last_in is not None and \
+ now - self._last_in > 2*self.connection.heartbeat:
+ raise HeartbeatTimeout(text="heartbeat timeout")
+ if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0:
+ self.write_op(ConnectionHeartbeat())
+
def open(self):
self._reset()
self._status = OPEN
diff --git a/python/qpid/messaging/exceptions.py b/python/qpid/messaging/exceptions.py
index 27bc5af035..0296d615d9 100644
--- a/python/qpid/messaging/exceptions.py
+++ b/python/qpid/messaging/exceptions.py
@@ -63,6 +63,9 @@ class AuthenticationFailure(ConnectError):
class ConnectionClosed(ConnectionError):
pass
+class HeartbeatTimeout(ConnectionError):
+ pass
+
## Session Errors
class SessionError(MessagingError):