From 036d9119982e67f5c1d784862b7001819d5f5a7f Mon Sep 17 00:00:00 2001 From: Justin Ross Date: Tue, 23 Apr 2013 21:40:49 +0000 Subject: QPID-2453: Allow the session to keep operating after content codec errors; a patch from Ernie Allen git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1471158 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/python/qpid/messaging/driver.py | 14 ++++++++++++-- qpid/python/qpid/messaging/endpoints.py | 18 +++++++++++++++--- qpid/python/qpid/messaging/exceptions.py | 14 ++++++++++++++ qpid/python/qpid/tests/messaging/message.py | 13 +++++++++++++ 4 files changed, 54 insertions(+), 5 deletions(-) diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py index 2bd638f327..7a36745c33 100644 --- a/qpid/python/qpid/messaging/driver.py +++ b/qpid/python/qpid/messaging/driver.py @@ -497,6 +497,9 @@ class Driver: self.engine.dispatch() except HeartbeatTimeout, e: self.close_engine(e) + except ContentError, e: + msg = compat.format_exc() + self.connection.error = ContentError(text=msg) except: # XXX: Does socket get leaked if this occurs? msg = compat.format_exc() @@ -1245,7 +1248,11 @@ class Engine: if msg.ttl is not None: dp.ttl = long(msg.ttl*1000) enc, dec = get_codec(msg.content_type) - body = enc(msg.content) + try: + body = enc(msg.content) + except AttributeError, e: + # convert to non-blocking EncodeError + raise EncodeError(e) # XXX: this is not safe for out of order, can this be triggered by pre_ack? def msg_acked(): @@ -1294,7 +1301,10 @@ class Engine: ap = mp.application_headers enc, dec = get_codec(mp.content_type) - content = dec(xfr.payload) + try: + content = dec(xfr.payload) + except Exception, e: + raise DecodeError(e) msg = Message(content) msg.id = mp.message_id if ap is not None: diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py index 89db45af1b..8576099c0f 100644 --- a/qpid/python/qpid/messaging/endpoints.py +++ b/qpid/python/qpid/messaging/endpoints.py @@ -204,7 +204,13 @@ class Connection(Endpoint): def check_error(self): if self.error: self._condition.gc() - raise self.error + e = self.error + if isinstance(e, ContentError): + """ forget the content error. It will be + raised this time but won't block future calls + """ + self.error = None + raise e def get_error(self): return self.error @@ -887,8 +893,14 @@ class Sender(Endpoint): if self.synced < mno: self.synced = mno self._wakeup() - if not self._ewait(lambda: self.acked >= mno, timeout=timeout): - raise Timeout("sender sync timed out") + try: + if not self._ewait(lambda: self.acked >= mno, timeout=timeout): + raise Timeout("sender sync timed out") + except ContentError: + # clean bad message so we can continue + self.acked = mno + self.session.outgoing.pop(0) + raise @synchronized def close(self, timeout=None): diff --git a/qpid/python/qpid/messaging/exceptions.py b/qpid/python/qpid/messaging/exceptions.py index 0296d615d9..b941565c51 100644 --- a/qpid/python/qpid/messaging/exceptions.py +++ b/qpid/python/qpid/messaging/exceptions.py @@ -154,3 +154,17 @@ class Empty(FetchError): available within the alloted time. """ pass + +## Message Content errors +class ContentError(MessagingError): + """ + This type of exception will be returned to the application + once, and will not block further requests + """ + pass + +class EncodeError(ContentError): + pass + +class DecodeError(ContentError): + pass diff --git a/qpid/python/qpid/tests/messaging/message.py b/qpid/python/qpid/tests/messaging/message.py index 297374b82b..7328165db7 100644 --- a/qpid/python/qpid/tests/messaging/message.py +++ b/qpid/python/qpid/tests/messaging/message.py @@ -153,3 +153,16 @@ class MessageEchoTests(Base): f = echo.content["false"] assert isinstance(t, bool), t assert isinstance(f, bool), f + + def testExceptionRaisedMismatchedContentType(self): + msg = Message(content_type="amqp/map", content="asdf") + try: + self.snd.send(msg) + self.rcv.fetch(0) + assert False, "Exception not raised on mismatched content/content_type" + except Exception, e: + pass + + def testRecoverAfterException(self): + self.testExceptionRaisedMismatchedContentType() + self.testTextPlain() -- cgit v1.2.1