summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJustin Ross <jross@apache.org>2013-04-23 21:40:49 +0000
committerJustin Ross <jross@apache.org>2013-04-23 21:40:49 +0000
commit036d9119982e67f5c1d784862b7001819d5f5a7f (patch)
tree0eb353d5a3834cd484b29b9966436c37138b6b91
parent0d3b8ef91982b687ced6cfa24ff95e28143774d7 (diff)
downloadqpid-python-036d9119982e67f5c1d784862b7001819d5f5a7f.tar.gz
QPID-2453: Allow the session to keep operating after content codec errors; a patch from Ernie Allen
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1471158 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qpid/messaging/driver.py14
-rw-r--r--qpid/python/qpid/messaging/endpoints.py18
-rw-r--r--qpid/python/qpid/messaging/exceptions.py14
-rw-r--r--qpid/python/qpid/tests/messaging/message.py13
4 files changed, 54 insertions, 5 deletions
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py
index 2bd638f327..7a36745c33 100644
--- a/qpid/python/qpid/messaging/driver.py
+++ b/qpid/python/qpid/messaging/driver.py
@@ -497,6 +497,9 @@ class Driver:
self.engine.dispatch()
except HeartbeatTimeout, e:
self.close_engine(e)
+ except ContentError, e:
+ msg = compat.format_exc()
+ self.connection.error = ContentError(text=msg)
except:
# XXX: Does socket get leaked if this occurs?
msg = compat.format_exc()
@@ -1245,7 +1248,11 @@ class Engine:
if msg.ttl is not None:
dp.ttl = long(msg.ttl*1000)
enc, dec = get_codec(msg.content_type)
- body = enc(msg.content)
+ try:
+ body = enc(msg.content)
+ except AttributeError, e:
+ # convert to non-blocking EncodeError
+ raise EncodeError(e)
# XXX: this is not safe for out of order, can this be triggered by pre_ack?
def msg_acked():
@@ -1294,7 +1301,10 @@ class Engine:
ap = mp.application_headers
enc, dec = get_codec(mp.content_type)
- content = dec(xfr.payload)
+ try:
+ content = dec(xfr.payload)
+ except Exception, e:
+ raise DecodeError(e)
msg = Message(content)
msg.id = mp.message_id
if ap is not None:
diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py
index 89db45af1b..8576099c0f 100644
--- a/qpid/python/qpid/messaging/endpoints.py
+++ b/qpid/python/qpid/messaging/endpoints.py
@@ -204,7 +204,13 @@ class Connection(Endpoint):
def check_error(self):
if self.error:
self._condition.gc()
- raise self.error
+ e = self.error
+ if isinstance(e, ContentError):
+ """ forget the content error. It will be
+ raised this time but won't block future calls
+ """
+ self.error = None
+ raise e
def get_error(self):
return self.error
@@ -887,8 +893,14 @@ class Sender(Endpoint):
if self.synced < mno:
self.synced = mno
self._wakeup()
- if not self._ewait(lambda: self.acked >= mno, timeout=timeout):
- raise Timeout("sender sync timed out")
+ try:
+ if not self._ewait(lambda: self.acked >= mno, timeout=timeout):
+ raise Timeout("sender sync timed out")
+ except ContentError:
+ # clean bad message so we can continue
+ self.acked = mno
+ self.session.outgoing.pop(0)
+ raise
@synchronized
def close(self, timeout=None):
diff --git a/qpid/python/qpid/messaging/exceptions.py b/qpid/python/qpid/messaging/exceptions.py
index 0296d615d9..b941565c51 100644
--- a/qpid/python/qpid/messaging/exceptions.py
+++ b/qpid/python/qpid/messaging/exceptions.py
@@ -154,3 +154,17 @@ class Empty(FetchError):
available within the alloted time.
"""
pass
+
+## Message Content errors
+class ContentError(MessagingError):
+ """
+ This type of exception will be returned to the application
+ once, and will not block further requests
+ """
+ pass
+
+class EncodeError(ContentError):
+ pass
+
+class DecodeError(ContentError):
+ pass
diff --git a/qpid/python/qpid/tests/messaging/message.py b/qpid/python/qpid/tests/messaging/message.py
index 297374b82b..7328165db7 100644
--- a/qpid/python/qpid/tests/messaging/message.py
+++ b/qpid/python/qpid/tests/messaging/message.py
@@ -153,3 +153,16 @@ class MessageEchoTests(Base):
f = echo.content["false"]
assert isinstance(t, bool), t
assert isinstance(f, bool), f
+
+ def testExceptionRaisedMismatchedContentType(self):
+ msg = Message(content_type="amqp/map", content="asdf")
+ try:
+ self.snd.send(msg)
+ self.rcv.fetch(0)
+ assert False, "Exception not raised on mismatched content/content_type"
+ except Exception, e:
+ pass
+
+ def testRecoverAfterException(self):
+ self.testExceptionRaisedMismatchedContentType()
+ self.testTextPlain()