diff options
author | Ken Giusti <kgiusti@apache.org> | 2016-02-16 19:54:43 +0000 |
---|---|---|
committer | Ken Giusti <kgiusti@apache.org> | 2016-02-16 19:54:43 +0000 |
commit | a411e1bb0ce518c244d4866c68fbe37ea99a3454 (patch) | |
tree | 32fbfa410bb0e46c69ab93a59baca5bf46e9b2cd /qpid | |
parent | 4da5242913b3bb6c2fc478bc18fe8dc13fa74833 (diff) | |
download | qpid-python-a411e1bb0ce518c244d4866c68fbe37ea99a3454.tar.gz |
QPID-7053: Add a callback that is invoked when asynchronous errors are
detected by the background thread.
QPID-7064: Improve documentation of the asynchronous error callbacks.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1730741 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r-- | qpid/python/qpid/messaging/driver.py | 2 | ||||
-rw-r--r-- | qpid/python/qpid/messaging/endpoints.py | 117 | ||||
-rw-r--r-- | qpid/python/qpid/tests/messaging/endpoints.py | 93 | ||||
-rwxr-xr-x | qpid/python/setup.py | 2 |
4 files changed, 190 insertions, 24 deletions
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py index ad29b5fafc..146b8188ab 100644 --- a/qpid/python/qpid/messaging/driver.py +++ b/qpid/python/qpid/messaging/driver.py @@ -1404,7 +1404,7 @@ class Engine: assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending) rcv.received += 1 log.debug("RCVD[%s]: %s", ssn.log_id, msg) - ssn.message_received(msg) + ssn._notify_message_received(msg) def _decode(self, xfr): diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py index 2797677b1d..885b95a814 100644 --- a/qpid/python/qpid/messaging/endpoints.py +++ b/qpid/python/qpid/messaging/endpoints.py @@ -18,7 +18,7 @@ # """ -A candidate high level messaging API for python. +A high level messaging API for python. Areas that still need work: @@ -44,13 +44,57 @@ log = getLogger("qpid.messaging") static = staticmethod -class Endpoint: +class Endpoint(object): + """ + Base class for all endpoint objects types. + @undocumented: __init__, __setattr__ + """ + def __init__(self): + self._async_exception_notify_handler = None + self.error = None def _ecwait(self, predicate, timeout=None): result = self._ewait(lambda: self.closed or predicate(), timeout) self.check_closed() return result + @synchronized + def set_async_exception_notify_handler(self, handler): + """ + Register a callable that will be invoked when the driver thread detects an + error on the Endpoint. The callable is invoked with the instance of the + Endpoint object passed as the first argument. The second argument is an + Exception instance describing the failure. + + @param handler: invoked by the driver thread when an error occurs. + @type handler: callable object taking an Endpoint and an Exception as + arguments. + @return: None + @note: The exception will also be raised the next time the application + invokes one of the blocking messaging APIs. + @warning: B{Use with caution} This callback is invoked in the context of + the driver thread. It is B{NOT} safe to call B{ANY} of the messaging APIs + from within this callback. This includes any of the Endpoint's methods. The + intent of the handler is to provide an efficient way to notify the + application that an exception has occurred in the driver thread. This can + be useful for those applications that periodically poll the messaging layer + for events. In this case the callback can be used to schedule a task that + retrieves the error using the Endpoint's get_error() or check_error() + methods. + """ + self._async_exception_notify_handler = handler + + def __setattr__(self, name, value): + """ + Intercept any attempt to set the endpoint error flag and invoke the + callback if registered. + """ + super(Endpoint, self).__setattr__(name, value) + if name == 'error' and value is not None: + if self._async_exception_notify_handler: + self._async_exception_notify_handler(self, value) + + class Connection(Endpoint): """ @@ -129,6 +173,7 @@ class Connection(Endpoint): @rtype: Connection @return: a disconnected Connection """ + super(Connection, self).__init__() # List of all attributes opt_keys = ['host', 'transport', 'port', 'heartbeat', 'username', 'password', 'sasl_mechanisms', 'sasl_service', 'sasl_min_ssf', 'sasl_max_ssf', 'reconnect', 'reconnect_timeout', 'reconnect_interval', 'reconnect_interval_min', 'reconnect_interval_max', 'reconnect_limit', 'reconnect_urls', 'reconnect_log', 'address_ttl', 'tcp_nodelay', 'ssl_keyfile', 'ssl_certfile', 'ssl_trustfile', 'ssl_skip_hostname_check', 'client_properties', 'protocol' ] # Create all attributes on self and set to None. @@ -200,7 +245,6 @@ class Connection(Endpoint): self._condition = Condition(self._lock) self._waiter = Waiter(self._condition) self._modcount = Serial(0) - self.error = None from driver import Driver self._driver = Driver(self) @@ -343,6 +387,7 @@ class Connection(Endpoint): self.detach(timeout=timeout) self._open = False + class Session(Endpoint): """ @@ -538,6 +583,7 @@ class Session(Endpoint): """ def __init__(self, connection, name, transactional): + super(Session, self).__init__() self.connection = connection self.name = name self.log_id = "%x" % id(self) @@ -560,12 +606,11 @@ class Session(Endpoint): # XXX: I hate this name. self.ack_capacity = UNLIMITED - self.error = None self.closing = False self.closed = False self._lock = connection._lock - self._msg_received = None + self._msg_received_notify_handler = None def __repr__(self): return "<Session %s>" % self.name @@ -597,10 +642,15 @@ class Session(Endpoint): if self.closed: raise SessionClosed() - def message_received(self, msg): + def _notify_message_received(self, msg): self.incoming.append(msg) - if self._msg_received: - self._msg_received() + if self._msg_received_notify_handler: + try: + # new callback parameter: the Session + self._msg_received_notify_handler(self) + except TypeError: + # backward compatibility with old API, no Session + self._msg_received_notify_handler() @synchronized def sender(self, target, **options): @@ -687,16 +737,40 @@ class Session(Endpoint): return None @synchronized + def set_message_received_notify_handler(self, handler): + """ + Register a callable that will be invoked when a Message arrives on the + Session. + + @param handler: invoked by the driver thread when an error occurs. + @type handler: a callable object taking a Session instance as its only + argument + @return: None + + @note: When using this method it is recommended to also register + asynchronous error callbacks on all endpoint objects. Doing so will cause + the application to be notified if an error is raised by the driver + thread. This is necessary as after a driver error occurs the message received + callback may never be invoked again. See + L{Endpoint.set_async_exception_notify_handler} + + @warning: B{Use with caution} This callback is invoked in the context of + the driver thread. It is B{NOT} safe to call B{ANY} of the public + messaging APIs from within this callback, including any of the passed + Session's methods. The intent of the handler is to provide an efficient + way to notify the application that a message has arrived. This can be + useful for those applications that need to schedule a task to poll for + received messages without blocking in the messaging API. The scheduled + task may then retrieve the message using L{next_receiver} and + L{Receiver.fetch} + """ + self._msg_received_notify_handler = handler + + @synchronized def set_message_received_handler(self, handler): - """Register a callback that will be invoked when a message arrives on the - session. Use with caution: since this callback is invoked in the context - of the driver thread, it is not safe to call any of the public messaging - APIs from within this callback. The intent of the handler is to provide - an efficient way to notify the application that a message has arrived. - This can be useful for those applications that need to schedule a task - to poll for received messages without blocking in the messaging API. + """@deprecated: Use L{set_message_received_notify_handler} instead. """ - self._msg_received = handler + self._msg_received_notify_handler = handler @synchronized def next_receiver(self, timeout=None): @@ -803,6 +877,7 @@ class Session(Endpoint): finally: self.connection._remove_session(self) + class MangledString(str): pass def _mangle(addr): @@ -818,6 +893,7 @@ class Sender(Endpoint): """ def __init__(self, session, id, target, options): + super(Sender, self).__init__() self.session = session self.id = id self.target = target @@ -828,7 +904,6 @@ class Sender(Endpoint): self.queued = Serial(0) self.synced = Serial(0) self.acked = Serial(0) - self.error = None self.linked = False self.closing = False self.closed = False @@ -968,7 +1043,8 @@ class Sender(Endpoint): except ValueError: pass -class Receiver(Endpoint, object): + +class Receiver(Endpoint): """ Receives incoming messages from a remote source. Messages may be @@ -976,6 +1052,7 @@ class Receiver(Endpoint, object): """ def __init__(self, session, id, source, options): + super(Receiver, self).__init__() self.session = session self.id = id self.source = source @@ -987,7 +1064,6 @@ class Receiver(Endpoint, object): self.received = Serial(0) self.returned = Serial(0) - self.error = None self.linked = False self.closing = False self.closed = False @@ -1115,4 +1191,5 @@ class Receiver(Endpoint, object): except ValueError: pass -__all__ = ["Connection", "Session", "Sender", "Receiver"] + +__all__ = ["Connection", "Endpoint", "Session", "Sender", "Receiver"] diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py index d0103ee32c..ce2b4181a2 100644 --- a/qpid/python/qpid/tests/messaging/endpoints.py +++ b/qpid/python/qpid/tests/messaging/endpoints.py @@ -667,10 +667,11 @@ class SessionTests(Base): class CallbackHandler: def __init__(self): self.handler_called = False - def __call__(self): + def __call__(self, ssn): self.handler_called = True + self.ssn = ssn cb = CallbackHandler() - self.ssn.set_message_received_handler(cb) + self.ssn.set_message_received_notify_handler(cb) rcv = self.ssn.receiver(ADDR) rcv.capacity = UNLIMITED snd = self.ssn.sender(ADDR) @@ -681,6 +682,7 @@ class SessionTests(Base): if cb.handler_called: break; assert cb.handler_called + assert cb.ssn == self.ssn snd.close() rcv.close() @@ -1385,3 +1387,90 @@ class SenderTests(Base): except: pass self.snd.send(m2, timeout=self.timeout()) + + +class ErrorCallbackTests(Base): + + class Callback: + def __init__(self, name): + self.name = name + self.obj = None + self.exc = None + + def __call__(self, obj, exc): + self.obj = obj + self.exc = exc + + def testConnectErrorCallback(self): + cb = ErrorCallbackTests.Callback("connection") + self.conn = Connection("localhost:4") + self.conn.set_async_exception_notify_handler(cb) + try: + self.conn.open() + assert False, "connect succeeded" + except Exception: + assert self.conn == cb.obj, cb.obj + assert cb.name == "connection" + assert cb.exc is not None + + def testSessionErrorCallback(self): + ccb = ErrorCallbackTests.Callback("connection") + self.conn = Connection.establish(self.broker, **self.connection_options()) + self.conn.set_async_exception_notify_handler(ccb) + scb = ErrorCallbackTests.Callback("session") + self.ssn = self.conn.session(transactional=True) + self.ssn.set_async_exception_notify_handler(scb) + self.conn.detach() + try: + self.ping(self.ssn) + assert False, "session succeeded" + except Exception: + assert self.ssn == scb.obj, scb.obj + assert scb.name == "session" + assert scb.exc is not None + # connection callback should be empty + assert ccb.obj == None, ccb.obj + + def testSenderErrorCallback(self): + ccb = ErrorCallbackTests.Callback("connection") + conn = Connection(self.broker, **self.connection_options()) + conn.set_async_exception_notify_handler(ccb) + scb = ErrorCallbackTests.Callback("session") + ssn = conn.session() + ssn.set_async_exception_notify_handler(scb) + snd = ssn.sender(NOSUCH_Q) + sndcb = ErrorCallbackTests.Callback("sender") + snd.set_async_exception_notify_handler(sndcb) + conn.open() + try: + snd.send(self.message("HI")) + assert False, "send worked" + except Exception: + assert snd == sndcb.obj, sndcb.obj + assert sndcb.name == "sender" + assert sndcb.exc is not None + # connection and session callbacks are empty + assert ccb.obj == None, ccb.obj + assert scb.obj == None, scb.obj + + def testReceiverErrorCallback(self): + ccb = ErrorCallbackTests.Callback("connection") + self.conn = Connection(self.broker, **self.connection_options()) + self.conn.set_async_exception_notify_handler(ccb) + scb = ErrorCallbackTests.Callback("session") + self.ssn = self.conn.session() + self.ssn.set_async_exception_notify_handler(scb) + self.recv = self.ssn.receiver(NOSUCH_Q) + rcb = ErrorCallbackTests.Callback("receiver") + self.recv.set_async_exception_notify_handler(rcb) + self.conn.open() + try: + self.recv.fetch() + assert False, "fetch worked" + except Exception: + assert self.recv == rcb.obj, rcb.obj + assert rcb.name == "receiver" + assert rcb.exc is not None + # connection and session callbacks are empty + assert ccb.obj == None, ccb.obj + assert scb.obj == None, scb.obj diff --git a/qpid/python/setup.py b/qpid/python/setup.py index e8e07d9d81..b85ca12b0c 100755 --- a/qpid/python/setup.py +++ b/qpid/python/setup.py @@ -135,7 +135,7 @@ class build_doc(Command): names = ["qpid.messaging"] doc_index = build_doc_index(names, True, True) - html_writer = HTMLWriter(doc_index) + html_writer = HTMLWriter(doc_index, show_private=False) self.mkpath(self.build_doc) log.info('epydoc %s to %s' % (", ".join(names), self.build_doc)) html_writer.write(self.build_doc) |