summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKen Giusti <kgiusti@apache.org>2015-11-11 22:45:42 +0000
committerKen Giusti <kgiusti@apache.org>2015-11-11 22:45:42 +0000
commit11368ef1a01233f253eb9eadbadaa9cb9b8465f3 (patch)
tree74853982585adb7363d90ebb751dc5d6c656f384
parent92796c746c190d240363723bbc8fc851a920c31c (diff)
downloadqpid-python-11368ef1a01233f253eb9eadbadaa9cb9b8465f3.tar.gz
QPID-6839: python-qpid: Log the failure of the Selector thread
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1713943 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qpid/messaging/driver.py19
-rw-r--r--qpid/python/qpid/selector.py98
2 files changed, 75 insertions, 42 deletions
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py
index 74ed038b0e..ad29b5fafc 100644
--- a/qpid/python/qpid/messaging/driver.py
+++ b/qpid/python/qpid/messaging/driver.py
@@ -390,18 +390,37 @@ class Driver:
@synchronized
def reading(self):
+ """Called by the Selector I/O thread to determine if the driver needs to
+ wait on the arrival of network data (call self.readable() callback)
+ """
return self._transport is not None and \
self._transport.reading(True)
@synchronized
def writing(self):
+ """Called by the Selector I/O thread to determine if it should block
+ waiting for output bandwidth (call the self.writeable() callback)
+ """
return self._transport is not None and \
self._transport.writing(self.engine.pending())
@synchronized
def timing(self):
+ """Called by the Selector I/O thread to determine if it should wake up the
+ driver (call the timeout() callback
+ """
return self._timeout
+ @synchronized
+ def abort(self, exc, info):
+ """Called if the Selector I/O thread hits an unrecoverable error and fails.
+ """
+ try:
+ self.connection.error = exc
+ log.error("I/O Thread Fatal error: %s\n%s" % (str(exc), info))
+ except:
+ pass
+
def _check_retry_ok(self):
"""We consider a reconnect to have suceeded only when we have received
open-ok from the peer.
diff --git a/qpid/python/qpid/selector.py b/qpid/python/qpid/selector.py
index 719dd84893..4a15d56662 100644
--- a/qpid/python/qpid/selector.py
+++ b/qpid/python/qpid/selector.py
@@ -17,8 +17,11 @@
# under the License.
#
import atexit, time, errno, os
-from compat import select, SelectError, set, selectable_waiter
+from compat import select, SelectError, set, selectable_waiter, format_exc
from threading import Thread, Lock
+from logging import getLogger
+
+log = getLogger("qpid.messaging")
class Acceptor:
@@ -67,6 +70,7 @@ class Selector:
self.reading.add(self.waiter)
self.stopped = False
self.thread = None
+ self.exception = None
def wakeup(self):
self.waiter.wakeup()
@@ -103,48 +107,58 @@ class Selector:
self.thread.start();
def run(self):
- while not self.stopped:
- wakeup = None
- for sel in self.selectables.copy():
- t = self._update(sel)
- if t is not None:
- if wakeup is None:
- wakeup = t
- else:
- wakeup = min(wakeup, t)
-
- rd = []
- wr = []
- ex = []
-
- while True:
- try:
- if wakeup is None:
- timeout = None
- else:
- timeout = max(0, wakeup - time.time())
- rd, wr, ex = select(self.reading, self.writing, (), timeout)
- break
- except SelectError, e:
- # Repeat the select call if we were interrupted.
- if e[0] == errno.EINTR:
- continue
- else:
- raise
-
- for sel in wr:
- if sel.writing():
- sel.writeable()
-
- for sel in rd:
- if sel.reading():
- sel.readable()
-
- now = time.time()
+ try:
+ while not self.stopped:
+ wakeup = None
+ for sel in self.selectables.copy():
+ t = self._update(sel)
+ if t is not None:
+ if wakeup is None:
+ wakeup = t
+ else:
+ wakeup = min(wakeup, t)
+
+ rd = []
+ wr = []
+ ex = []
+
+ while True:
+ try:
+ if wakeup is None:
+ timeout = None
+ else:
+ timeout = max(0, wakeup - time.time())
+ rd, wr, ex = select(self.reading, self.writing, (), timeout)
+ break
+ except SelectError, e:
+ # Repeat the select call if we were interrupted.
+ if e[0] == errno.EINTR:
+ continue
+ else:
+ # unrecoverable: promote to outer try block
+ raise
+
+ for sel in wr:
+ if sel.writing():
+ sel.writeable()
+
+ for sel in rd:
+ if sel.reading():
+ sel.readable()
+
+ now = time.time()
+ for sel in self.selectables.copy():
+ w = sel.timing()
+ if w is not None and now > w:
+ sel.timeout()
+ except Exception, e:
+ self.exception = e
+ info = format_exc()
+ log.error("qpid.messaging I/O thread has died: %s" % str(e))
for sel in self.selectables.copy():
- w = sel.timing()
- if w is not None and now > w:
- sel.timeout()
+ if hasattr(sel, "abort"):
+ sel.abort(e, info)
+ raise
def stop(self, timeout=None):
self.stopped = True