diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-02-25 20:09:27 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-02-25 20:09:27 +0000 |
commit | 6d396d0d6e968291d844077c0028b0a4650dfe40 (patch) | |
tree | 189d01cb227eb4db2219d9731a9a6ac6b157c668 | |
parent | a9e5b38bd87e2be022914662f5bb47430ce69d18 (diff) | |
download | qpid-python-6d396d0d6e968291d844077c0028b0a4650dfe40.tar.gz |
move reconnect logic away from engine portion of the driver
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@916433 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/python/qpid/messaging/driver.py | 125 |
1 files changed, 81 insertions, 44 deletions
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py index 820f1ef7f3..a55ba3c360 100644 --- a/qpid/python/qpid/messaging/driver.py +++ b/qpid/python/qpid/messaging/driver.py @@ -283,6 +283,11 @@ EMPTY_MP = MessageProperties() SUBJECT = "qpid.subject" TO = "qpid.to" +CLOSED = "CLOSED" +READ_ONLY = "READ_ONLY" +WRITE_ONLY = "WRITE_ONLY" +OPEN = "OPEN" + class Driver: def __init__(self, connection): @@ -299,6 +304,7 @@ class Driver: self.connection.backups self._host = 0 self._retrying = False + self._socket = None self.reset() @@ -315,7 +321,7 @@ class Driver: self.address_cache = Cache(options.get("address_ttl", 60)) - self._socket = None + self._engine_status = CLOSED self._buf = "" self._hdr = "" self._op_enc = OpEncoder() @@ -382,20 +388,60 @@ class Driver: rawlog.debug("READ[%s]: %r", self.log_id, data) self.engine_write(data) else: - rawlog.debug("CLOSED[%s]: %s", self.log_id, self._socket.getpeername()) - self.engine_close() + self.close_engine() except socket.error, e: - self.engine_close(e) + self.close_engine(e) + + self.update_status() + self.connection._waiter.notifyAll() + def close_engine(self, e=None): + if e is None: + e = "connection aborted" + + if (recoverable and self.connection.reconnect and + (self.connection.reconnect_limit is None or + self.connection.reconnect_limit <= 0 or + self._attempts <= self.connection.reconnect_limit)): + if self._host > 0: + delay = 0 + else: + delay = self.connection.reconnect_delay + self._timeout = time.time() + delay + log.warn("recoverable error[attempt %s]: %s" % (self._attempts, err)) + if delay > 0: + log.warn("sleeping %s seconds" % delay) + self._retrying = True + self.engine_close() + else: + self.engine_close(e) + + def update_status(self): + status = self.engine_status() + return getattr(self, "st_%s" % status.lower())() + + def st_closed(self): + rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername()) + self._socket.close() + self._socket = None + return True + + def st_open(self): + return False + @synchronized def writeable(self): + notify = False try: n = self._socket.send(self.engine_peek()) sent = self.engine_read(n) rawlog.debug("SENT[%s]: %r", self.log_id, sent) except socket.error, e: - self.engine_close(e) + self.close_engine(e) + notify = True + + if self.update_status() or notify: self.connection._waiter.notifyAll() @synchronized @@ -403,6 +449,9 @@ class Driver: self.dispatch() self.connection._waiter.notifyAll() + def engine_status(self): + return self._engine_status + def engine_write(self, data): try: if self._sasl_decode: @@ -423,17 +472,16 @@ class Driver: self.assign_id(op) opslog.debug("RCVD[%s]: %r", self.log_id, op) op.dispatch(self) - self.dispatch() + self.engine_dispatch() except VersionError, e: - self._error(e, False) + self.engine_close(e) except: - self._error(compat.format_exc(), False) + self.engine_close(compat.format_exc()) def engine_close(self, e=None): - if e is None: - self._error("connection aborted", True) - else: - self._error(e, True) + self.reset() + if e: + self.connection.error = (e,) def assign_id(self, op): if isinstance(op, Command): @@ -452,26 +500,6 @@ class Driver: def engine_peek(self): return self._buf - def _error(self, err, recoverable): - if self._socket is not None: - self._socket.close() - self.reset() - if (recoverable and self.connection.reconnect and - (self.connection.reconnect_limit is None or - self.connection.reconnect_limit <= 0 or - self._attempts <= self.connection.reconnect_limit)): - if self._host > 0: - delay = 0 - else: - delay = self.connection.reconnect_delay - self._timeout = time.time() + delay - log.warn("recoverable error[attempt %s]: %s" % (self._attempts, err)) - if delay > 0: - log.warn("sleeping %s seconds" % delay) - self._retrying = True - else: - self.connection.error = (err,) - def write_op(self, op): opslog.debug("SENT[%s]: %r", self.log_id, op) self._op_enc.write(op) @@ -530,7 +558,6 @@ class Driver: # probably the right thing to do def do_connection_close_ok(self, close_ok): - self._socket.close() self.reset() def do_session_attached(self, atc): @@ -585,19 +612,25 @@ class Driver: def dispatch(self): try: - if self._socket is None and self.connection._connected: - self.connect() - elif self._socket is not None and not self.connection._connected and not self._closing: - self.disconnect() - - if self._connected and not self._closing: - for ssn in self.connection.sessions.values(): - self.attach(ssn) - self.process(ssn) + if self._socket is None: + if self.connection._connected: + self.connect() + else: + self.engine_dispatch() except: + # XXX: Does socket get leaked if this occurs? msg = compat.format_exc() self.connection.error = (msg,) + def engine_dispatch(self): + if not self.connection._connected and not self._closing and self._engine_status != CLOSED: + self.disconnect() + + if self._connected and not self._closing: + for ssn in self.connection.sessions.values(): + self.attach(ssn) + self.process(ssn) + def connect(self): try: # XXX: should make this non blocking @@ -613,10 +646,14 @@ class Driver: self._attempts = 0 self._host = 0 self._retrying = False - self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10) + self.engine_open() except socket.error, e: self._host = (self._host + 1) % len(self._hosts) - self._error(e, True) + self.close_engine(e) + + def engine_open(self): + self._engine_status = OPEN + self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10) def disconnect(self): self.write_op(ConnectionClose(close_code.normal)) |