diff options
author | Ken Giusti <kgiusti@apache.org> | 2015-11-11 22:45:42 +0000 |
---|---|---|
committer | Ken Giusti <kgiusti@apache.org> | 2015-11-11 22:45:42 +0000 |
commit | 11368ef1a01233f253eb9eadbadaa9cb9b8465f3 (patch) | |
tree | 74853982585adb7363d90ebb751dc5d6c656f384 | |
parent | 92796c746c190d240363723bbc8fc851a920c31c (diff) | |
download | qpid-python-11368ef1a01233f253eb9eadbadaa9cb9b8465f3.tar.gz |
QPID-6839: python-qpid: Log the failure of the Selector thread
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1713943 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/python/qpid/messaging/driver.py | 19 | ||||
-rw-r--r-- | qpid/python/qpid/selector.py | 98 |
2 files changed, 75 insertions, 42 deletions
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py index 74ed038b0e..ad29b5fafc 100644 --- a/qpid/python/qpid/messaging/driver.py +++ b/qpid/python/qpid/messaging/driver.py @@ -390,18 +390,37 @@ class Driver: @synchronized def reading(self): + """Called by the Selector I/O thread to determine if the driver needs to + wait on the arrival of network data (call self.readable() callback) + """ return self._transport is not None and \ self._transport.reading(True) @synchronized def writing(self): + """Called by the Selector I/O thread to determine if it should block + waiting for output bandwidth (call the self.writeable() callback) + """ return self._transport is not None and \ self._transport.writing(self.engine.pending()) @synchronized def timing(self): + """Called by the Selector I/O thread to determine if it should wake up the + driver (call the timeout() callback + """ return self._timeout + @synchronized + def abort(self, exc, info): + """Called if the Selector I/O thread hits an unrecoverable error and fails. + """ + try: + self.connection.error = exc + log.error("I/O Thread Fatal error: %s\n%s" % (str(exc), info)) + except: + pass + def _check_retry_ok(self): """We consider a reconnect to have suceeded only when we have received open-ok from the peer. diff --git a/qpid/python/qpid/selector.py b/qpid/python/qpid/selector.py index 719dd84893..4a15d56662 100644 --- a/qpid/python/qpid/selector.py +++ b/qpid/python/qpid/selector.py @@ -17,8 +17,11 @@ # under the License. # import atexit, time, errno, os -from compat import select, SelectError, set, selectable_waiter +from compat import select, SelectError, set, selectable_waiter, format_exc from threading import Thread, Lock +from logging import getLogger + +log = getLogger("qpid.messaging") class Acceptor: @@ -67,6 +70,7 @@ class Selector: self.reading.add(self.waiter) self.stopped = False self.thread = None + self.exception = None def wakeup(self): self.waiter.wakeup() @@ -103,48 +107,58 @@ class Selector: self.thread.start(); def run(self): - while not self.stopped: - wakeup = None - for sel in self.selectables.copy(): - t = self._update(sel) - if t is not None: - if wakeup is None: - wakeup = t - else: - wakeup = min(wakeup, t) - - rd = [] - wr = [] - ex = [] - - while True: - try: - if wakeup is None: - timeout = None - else: - timeout = max(0, wakeup - time.time()) - rd, wr, ex = select(self.reading, self.writing, (), timeout) - break - except SelectError, e: - # Repeat the select call if we were interrupted. - if e[0] == errno.EINTR: - continue - else: - raise - - for sel in wr: - if sel.writing(): - sel.writeable() - - for sel in rd: - if sel.reading(): - sel.readable() - - now = time.time() + try: + while not self.stopped: + wakeup = None + for sel in self.selectables.copy(): + t = self._update(sel) + if t is not None: + if wakeup is None: + wakeup = t + else: + wakeup = min(wakeup, t) + + rd = [] + wr = [] + ex = [] + + while True: + try: + if wakeup is None: + timeout = None + else: + timeout = max(0, wakeup - time.time()) + rd, wr, ex = select(self.reading, self.writing, (), timeout) + break + except SelectError, e: + # Repeat the select call if we were interrupted. + if e[0] == errno.EINTR: + continue + else: + # unrecoverable: promote to outer try block + raise + + for sel in wr: + if sel.writing(): + sel.writeable() + + for sel in rd: + if sel.reading(): + sel.readable() + + now = time.time() + for sel in self.selectables.copy(): + w = sel.timing() + if w is not None and now > w: + sel.timeout() + except Exception, e: + self.exception = e + info = format_exc() + log.error("qpid.messaging I/O thread has died: %s" % str(e)) for sel in self.selectables.copy(): - w = sel.timing() - if w is not None and now > w: - sel.timeout() + if hasattr(sel, "abort"): + sel.abort(e, info) + raise def stop(self, timeout=None): self.stopped = True |