summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-02-25 20:09:27 +0000
committerRafael H. Schloming <rhs@apache.org>2010-02-25 20:09:27 +0000
commit6d396d0d6e968291d844077c0028b0a4650dfe40 (patch)
tree189d01cb227eb4db2219d9731a9a6ac6b157c668
parenta9e5b38bd87e2be022914662f5bb47430ce69d18 (diff)
downloadqpid-python-6d396d0d6e968291d844077c0028b0a4650dfe40.tar.gz
move reconnect logic away from engine portion of the driver
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@916433 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qpid/messaging/driver.py125
1 files changed, 81 insertions, 44 deletions
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py
index 820f1ef7f3..a55ba3c360 100644
--- a/qpid/python/qpid/messaging/driver.py
+++ b/qpid/python/qpid/messaging/driver.py
@@ -283,6 +283,11 @@ EMPTY_MP = MessageProperties()
SUBJECT = "qpid.subject"
TO = "qpid.to"
+CLOSED = "CLOSED"
+READ_ONLY = "READ_ONLY"
+WRITE_ONLY = "WRITE_ONLY"
+OPEN = "OPEN"
+
class Driver:
def __init__(self, connection):
@@ -299,6 +304,7 @@ class Driver:
self.connection.backups
self._host = 0
self._retrying = False
+ self._socket = None
self.reset()
@@ -315,7 +321,7 @@ class Driver:
self.address_cache = Cache(options.get("address_ttl", 60))
- self._socket = None
+ self._engine_status = CLOSED
self._buf = ""
self._hdr = ""
self._op_enc = OpEncoder()
@@ -382,20 +388,60 @@ class Driver:
rawlog.debug("READ[%s]: %r", self.log_id, data)
self.engine_write(data)
else:
- rawlog.debug("CLOSED[%s]: %s", self.log_id, self._socket.getpeername())
- self.engine_close()
+ self.close_engine()
except socket.error, e:
- self.engine_close(e)
+ self.close_engine(e)
+
+ self.update_status()
+
self.connection._waiter.notifyAll()
+ def close_engine(self, e=None):
+ if e is None:
+ e = "connection aborted"
+
+ if (recoverable and self.connection.reconnect and
+ (self.connection.reconnect_limit is None or
+ self.connection.reconnect_limit <= 0 or
+ self._attempts <= self.connection.reconnect_limit)):
+ if self._host > 0:
+ delay = 0
+ else:
+ delay = self.connection.reconnect_delay
+ self._timeout = time.time() + delay
+ log.warn("recoverable error[attempt %s]: %s" % (self._attempts, err))
+ if delay > 0:
+ log.warn("sleeping %s seconds" % delay)
+ self._retrying = True
+ self.engine_close()
+ else:
+ self.engine_close(e)
+
+ def update_status(self):
+ status = self.engine_status()
+ return getattr(self, "st_%s" % status.lower())()
+
+ def st_closed(self):
+ rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername())
+ self._socket.close()
+ self._socket = None
+ return True
+
+ def st_open(self):
+ return False
+
@synchronized
def writeable(self):
+ notify = False
try:
n = self._socket.send(self.engine_peek())
sent = self.engine_read(n)
rawlog.debug("SENT[%s]: %r", self.log_id, sent)
except socket.error, e:
- self.engine_close(e)
+ self.close_engine(e)
+ notify = True
+
+ if self.update_status() or notify:
self.connection._waiter.notifyAll()
@synchronized
@@ -403,6 +449,9 @@ class Driver:
self.dispatch()
self.connection._waiter.notifyAll()
+ def engine_status(self):
+ return self._engine_status
+
def engine_write(self, data):
try:
if self._sasl_decode:
@@ -423,17 +472,16 @@ class Driver:
self.assign_id(op)
opslog.debug("RCVD[%s]: %r", self.log_id, op)
op.dispatch(self)
- self.dispatch()
+ self.engine_dispatch()
except VersionError, e:
- self._error(e, False)
+ self.engine_close(e)
except:
- self._error(compat.format_exc(), False)
+ self.engine_close(compat.format_exc())
def engine_close(self, e=None):
- if e is None:
- self._error("connection aborted", True)
- else:
- self._error(e, True)
+ self.reset()
+ if e:
+ self.connection.error = (e,)
def assign_id(self, op):
if isinstance(op, Command):
@@ -452,26 +500,6 @@ class Driver:
def engine_peek(self):
return self._buf
- def _error(self, err, recoverable):
- if self._socket is not None:
- self._socket.close()
- self.reset()
- if (recoverable and self.connection.reconnect and
- (self.connection.reconnect_limit is None or
- self.connection.reconnect_limit <= 0 or
- self._attempts <= self.connection.reconnect_limit)):
- if self._host > 0:
- delay = 0
- else:
- delay = self.connection.reconnect_delay
- self._timeout = time.time() + delay
- log.warn("recoverable error[attempt %s]: %s" % (self._attempts, err))
- if delay > 0:
- log.warn("sleeping %s seconds" % delay)
- self._retrying = True
- else:
- self.connection.error = (err,)
-
def write_op(self, op):
opslog.debug("SENT[%s]: %r", self.log_id, op)
self._op_enc.write(op)
@@ -530,7 +558,6 @@ class Driver:
# probably the right thing to do
def do_connection_close_ok(self, close_ok):
- self._socket.close()
self.reset()
def do_session_attached(self, atc):
@@ -585,19 +612,25 @@ class Driver:
def dispatch(self):
try:
- if self._socket is None and self.connection._connected:
- self.connect()
- elif self._socket is not None and not self.connection._connected and not self._closing:
- self.disconnect()
-
- if self._connected and not self._closing:
- for ssn in self.connection.sessions.values():
- self.attach(ssn)
- self.process(ssn)
+ if self._socket is None:
+ if self.connection._connected:
+ self.connect()
+ else:
+ self.engine_dispatch()
except:
+ # XXX: Does socket get leaked if this occurs?
msg = compat.format_exc()
self.connection.error = (msg,)
+ def engine_dispatch(self):
+ if not self.connection._connected and not self._closing and self._engine_status != CLOSED:
+ self.disconnect()
+
+ if self._connected and not self._closing:
+ for ssn in self.connection.sessions.values():
+ self.attach(ssn)
+ self.process(ssn)
+
def connect(self):
try:
# XXX: should make this non blocking
@@ -613,10 +646,14 @@ class Driver:
self._attempts = 0
self._host = 0
self._retrying = False
- self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
+ self.engine_open()
except socket.error, e:
self._host = (self._host + 1) % len(self._hosts)
- self._error(e, True)
+ self.close_engine(e)
+
+ def engine_open(self):
+ self._engine_status = OPEN
+ self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
def disconnect(self):
self.write_op(ConnectionClose(close_code.normal))