summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKen Giusti <kgiusti@apache.org>2016-02-16 19:54:43 +0000
committerKen Giusti <kgiusti@apache.org>2016-02-16 19:54:43 +0000
commita411e1bb0ce518c244d4866c68fbe37ea99a3454 (patch)
tree32fbfa410bb0e46c69ab93a59baca5bf46e9b2cd
parent4da5242913b3bb6c2fc478bc18fe8dc13fa74833 (diff)
downloadqpid-python-a411e1bb0ce518c244d4866c68fbe37ea99a3454.tar.gz
QPID-7053: Add a callback that is invoked when asynchronous errors are
detected by the background thread. QPID-7064: Improve documentation of the asynchronous error callbacks. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1730741 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qpid/messaging/driver.py2
-rw-r--r--qpid/python/qpid/messaging/endpoints.py117
-rw-r--r--qpid/python/qpid/tests/messaging/endpoints.py93
-rwxr-xr-xqpid/python/setup.py2
4 files changed, 190 insertions, 24 deletions
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py
index ad29b5fafc..146b8188ab 100644
--- a/qpid/python/qpid/messaging/driver.py
+++ b/qpid/python/qpid/messaging/driver.py
@@ -1404,7 +1404,7 @@ class Engine:
assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
rcv.received += 1
log.debug("RCVD[%s]: %s", ssn.log_id, msg)
- ssn.message_received(msg)
+ ssn._notify_message_received(msg)
def _decode(self, xfr):
diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py
index 2797677b1d..885b95a814 100644
--- a/qpid/python/qpid/messaging/endpoints.py
+++ b/qpid/python/qpid/messaging/endpoints.py
@@ -18,7 +18,7 @@
#
"""
-A candidate high level messaging API for python.
+A high level messaging API for python.
Areas that still need work:
@@ -44,13 +44,57 @@ log = getLogger("qpid.messaging")
static = staticmethod
-class Endpoint:
+class Endpoint(object):
+ """
+ Base class for all endpoint objects types.
+ @undocumented: __init__, __setattr__
+ """
+ def __init__(self):
+ self._async_exception_notify_handler = None
+ self.error = None
def _ecwait(self, predicate, timeout=None):
result = self._ewait(lambda: self.closed or predicate(), timeout)
self.check_closed()
return result
+ @synchronized
+ def set_async_exception_notify_handler(self, handler):
+ """
+ Register a callable that will be invoked when the driver thread detects an
+ error on the Endpoint. The callable is invoked with the instance of the
+ Endpoint object passed as the first argument. The second argument is an
+ Exception instance describing the failure.
+
+ @param handler: invoked by the driver thread when an error occurs.
+ @type handler: callable object taking an Endpoint and an Exception as
+ arguments.
+ @return: None
+ @note: The exception will also be raised the next time the application
+ invokes one of the blocking messaging APIs.
+ @warning: B{Use with caution} This callback is invoked in the context of
+ the driver thread. It is B{NOT} safe to call B{ANY} of the messaging APIs
+ from within this callback. This includes any of the Endpoint's methods. The
+ intent of the handler is to provide an efficient way to notify the
+ application that an exception has occurred in the driver thread. This can
+ be useful for those applications that periodically poll the messaging layer
+ for events. In this case the callback can be used to schedule a task that
+ retrieves the error using the Endpoint's get_error() or check_error()
+ methods.
+ """
+ self._async_exception_notify_handler = handler
+
+ def __setattr__(self, name, value):
+ """
+ Intercept any attempt to set the endpoint error flag and invoke the
+ callback if registered.
+ """
+ super(Endpoint, self).__setattr__(name, value)
+ if name == 'error' and value is not None:
+ if self._async_exception_notify_handler:
+ self._async_exception_notify_handler(self, value)
+
+
class Connection(Endpoint):
"""
@@ -129,6 +173,7 @@ class Connection(Endpoint):
@rtype: Connection
@return: a disconnected Connection
"""
+ super(Connection, self).__init__()
# List of all attributes
opt_keys = ['host', 'transport', 'port', 'heartbeat', 'username', 'password', 'sasl_mechanisms', 'sasl_service', 'sasl_min_ssf', 'sasl_max_ssf', 'reconnect', 'reconnect_timeout', 'reconnect_interval', 'reconnect_interval_min', 'reconnect_interval_max', 'reconnect_limit', 'reconnect_urls', 'reconnect_log', 'address_ttl', 'tcp_nodelay', 'ssl_keyfile', 'ssl_certfile', 'ssl_trustfile', 'ssl_skip_hostname_check', 'client_properties', 'protocol' ]
# Create all attributes on self and set to None.
@@ -200,7 +245,6 @@ class Connection(Endpoint):
self._condition = Condition(self._lock)
self._waiter = Waiter(self._condition)
self._modcount = Serial(0)
- self.error = None
from driver import Driver
self._driver = Driver(self)
@@ -343,6 +387,7 @@ class Connection(Endpoint):
self.detach(timeout=timeout)
self._open = False
+
class Session(Endpoint):
"""
@@ -538,6 +583,7 @@ class Session(Endpoint):
"""
def __init__(self, connection, name, transactional):
+ super(Session, self).__init__()
self.connection = connection
self.name = name
self.log_id = "%x" % id(self)
@@ -560,12 +606,11 @@ class Session(Endpoint):
# XXX: I hate this name.
self.ack_capacity = UNLIMITED
- self.error = None
self.closing = False
self.closed = False
self._lock = connection._lock
- self._msg_received = None
+ self._msg_received_notify_handler = None
def __repr__(self):
return "<Session %s>" % self.name
@@ -597,10 +642,15 @@ class Session(Endpoint):
if self.closed:
raise SessionClosed()
- def message_received(self, msg):
+ def _notify_message_received(self, msg):
self.incoming.append(msg)
- if self._msg_received:
- self._msg_received()
+ if self._msg_received_notify_handler:
+ try:
+ # new callback parameter: the Session
+ self._msg_received_notify_handler(self)
+ except TypeError:
+ # backward compatibility with old API, no Session
+ self._msg_received_notify_handler()
@synchronized
def sender(self, target, **options):
@@ -687,16 +737,40 @@ class Session(Endpoint):
return None
@synchronized
+ def set_message_received_notify_handler(self, handler):
+ """
+ Register a callable that will be invoked when a Message arrives on the
+ Session.
+
+ @param handler: invoked by the driver thread when an error occurs.
+ @type handler: a callable object taking a Session instance as its only
+ argument
+ @return: None
+
+ @note: When using this method it is recommended to also register
+ asynchronous error callbacks on all endpoint objects. Doing so will cause
+ the application to be notified if an error is raised by the driver
+ thread. This is necessary as after a driver error occurs the message received
+ callback may never be invoked again. See
+ L{Endpoint.set_async_exception_notify_handler}
+
+ @warning: B{Use with caution} This callback is invoked in the context of
+ the driver thread. It is B{NOT} safe to call B{ANY} of the public
+ messaging APIs from within this callback, including any of the passed
+ Session's methods. The intent of the handler is to provide an efficient
+ way to notify the application that a message has arrived. This can be
+ useful for those applications that need to schedule a task to poll for
+ received messages without blocking in the messaging API. The scheduled
+ task may then retrieve the message using L{next_receiver} and
+ L{Receiver.fetch}
+ """
+ self._msg_received_notify_handler = handler
+
+ @synchronized
def set_message_received_handler(self, handler):
- """Register a callback that will be invoked when a message arrives on the
- session. Use with caution: since this callback is invoked in the context
- of the driver thread, it is not safe to call any of the public messaging
- APIs from within this callback. The intent of the handler is to provide
- an efficient way to notify the application that a message has arrived.
- This can be useful for those applications that need to schedule a task
- to poll for received messages without blocking in the messaging API.
+ """@deprecated: Use L{set_message_received_notify_handler} instead.
"""
- self._msg_received = handler
+ self._msg_received_notify_handler = handler
@synchronized
def next_receiver(self, timeout=None):
@@ -803,6 +877,7 @@ class Session(Endpoint):
finally:
self.connection._remove_session(self)
+
class MangledString(str): pass
def _mangle(addr):
@@ -818,6 +893,7 @@ class Sender(Endpoint):
"""
def __init__(self, session, id, target, options):
+ super(Sender, self).__init__()
self.session = session
self.id = id
self.target = target
@@ -828,7 +904,6 @@ class Sender(Endpoint):
self.queued = Serial(0)
self.synced = Serial(0)
self.acked = Serial(0)
- self.error = None
self.linked = False
self.closing = False
self.closed = False
@@ -968,7 +1043,8 @@ class Sender(Endpoint):
except ValueError:
pass
-class Receiver(Endpoint, object):
+
+class Receiver(Endpoint):
"""
Receives incoming messages from a remote source. Messages may be
@@ -976,6 +1052,7 @@ class Receiver(Endpoint, object):
"""
def __init__(self, session, id, source, options):
+ super(Receiver, self).__init__()
self.session = session
self.id = id
self.source = source
@@ -987,7 +1064,6 @@ class Receiver(Endpoint, object):
self.received = Serial(0)
self.returned = Serial(0)
- self.error = None
self.linked = False
self.closing = False
self.closed = False
@@ -1115,4 +1191,5 @@ class Receiver(Endpoint, object):
except ValueError:
pass
-__all__ = ["Connection", "Session", "Sender", "Receiver"]
+
+__all__ = ["Connection", "Endpoint", "Session", "Sender", "Receiver"]
diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py
index d0103ee32c..ce2b4181a2 100644
--- a/qpid/python/qpid/tests/messaging/endpoints.py
+++ b/qpid/python/qpid/tests/messaging/endpoints.py
@@ -667,10 +667,11 @@ class SessionTests(Base):
class CallbackHandler:
def __init__(self):
self.handler_called = False
- def __call__(self):
+ def __call__(self, ssn):
self.handler_called = True
+ self.ssn = ssn
cb = CallbackHandler()
- self.ssn.set_message_received_handler(cb)
+ self.ssn.set_message_received_notify_handler(cb)
rcv = self.ssn.receiver(ADDR)
rcv.capacity = UNLIMITED
snd = self.ssn.sender(ADDR)
@@ -681,6 +682,7 @@ class SessionTests(Base):
if cb.handler_called:
break;
assert cb.handler_called
+ assert cb.ssn == self.ssn
snd.close()
rcv.close()
@@ -1385,3 +1387,90 @@ class SenderTests(Base):
except:
pass
self.snd.send(m2, timeout=self.timeout())
+
+
+class ErrorCallbackTests(Base):
+
+ class Callback:
+ def __init__(self, name):
+ self.name = name
+ self.obj = None
+ self.exc = None
+
+ def __call__(self, obj, exc):
+ self.obj = obj
+ self.exc = exc
+
+ def testConnectErrorCallback(self):
+ cb = ErrorCallbackTests.Callback("connection")
+ self.conn = Connection("localhost:4")
+ self.conn.set_async_exception_notify_handler(cb)
+ try:
+ self.conn.open()
+ assert False, "connect succeeded"
+ except Exception:
+ assert self.conn == cb.obj, cb.obj
+ assert cb.name == "connection"
+ assert cb.exc is not None
+
+ def testSessionErrorCallback(self):
+ ccb = ErrorCallbackTests.Callback("connection")
+ self.conn = Connection.establish(self.broker, **self.connection_options())
+ self.conn.set_async_exception_notify_handler(ccb)
+ scb = ErrorCallbackTests.Callback("session")
+ self.ssn = self.conn.session(transactional=True)
+ self.ssn.set_async_exception_notify_handler(scb)
+ self.conn.detach()
+ try:
+ self.ping(self.ssn)
+ assert False, "session succeeded"
+ except Exception:
+ assert self.ssn == scb.obj, scb.obj
+ assert scb.name == "session"
+ assert scb.exc is not None
+ # connection callback should be empty
+ assert ccb.obj == None, ccb.obj
+
+ def testSenderErrorCallback(self):
+ ccb = ErrorCallbackTests.Callback("connection")
+ conn = Connection(self.broker, **self.connection_options())
+ conn.set_async_exception_notify_handler(ccb)
+ scb = ErrorCallbackTests.Callback("session")
+ ssn = conn.session()
+ ssn.set_async_exception_notify_handler(scb)
+ snd = ssn.sender(NOSUCH_Q)
+ sndcb = ErrorCallbackTests.Callback("sender")
+ snd.set_async_exception_notify_handler(sndcb)
+ conn.open()
+ try:
+ snd.send(self.message("HI"))
+ assert False, "send worked"
+ except Exception:
+ assert snd == sndcb.obj, sndcb.obj
+ assert sndcb.name == "sender"
+ assert sndcb.exc is not None
+ # connection and session callbacks are empty
+ assert ccb.obj == None, ccb.obj
+ assert scb.obj == None, scb.obj
+
+ def testReceiverErrorCallback(self):
+ ccb = ErrorCallbackTests.Callback("connection")
+ self.conn = Connection(self.broker, **self.connection_options())
+ self.conn.set_async_exception_notify_handler(ccb)
+ scb = ErrorCallbackTests.Callback("session")
+ self.ssn = self.conn.session()
+ self.ssn.set_async_exception_notify_handler(scb)
+ self.recv = self.ssn.receiver(NOSUCH_Q)
+ rcb = ErrorCallbackTests.Callback("receiver")
+ self.recv.set_async_exception_notify_handler(rcb)
+ self.conn.open()
+ try:
+ self.recv.fetch()
+ assert False, "fetch worked"
+ except Exception:
+ assert self.recv == rcb.obj, rcb.obj
+ assert rcb.name == "receiver"
+ assert rcb.exc is not None
+ # connection and session callbacks are empty
+ assert ccb.obj == None, ccb.obj
+ assert scb.obj == None, scb.obj
diff --git a/qpid/python/setup.py b/qpid/python/setup.py
index e8e07d9d81..b85ca12b0c 100755
--- a/qpid/python/setup.py
+++ b/qpid/python/setup.py
@@ -135,7 +135,7 @@ class build_doc(Command):
names = ["qpid.messaging"]
doc_index = build_doc_index(names, True, True)
- html_writer = HTMLWriter(doc_index)
+ html_writer = HTMLWriter(doc_index, show_private=False)
self.mkpath(self.build_doc)
log.info('epydoc %s to %s' % (", ".join(names), self.build_doc))
html_writer.write(self.build_doc)