author Alan Conway <aconway@apache.org> 2014-08-28 21:47:44 +0000
committer Alan Conway <aconway@apache.org> 2014-08-28 21:47:44 +0000
commit b93b20dd123757b208f9e78ef778e3648c3438a0
tree 57c643010a5fc63792b0031eb75682ad728dae31 /python
parent 2c9370641727e5d365d0cf52e9a0ba2d5faa087b
QPID-5975: HA extra/missing messages when running qpid-txtest2 in a loop with failover.
This is partly not-a-bug; there is also a client error-handling issue that has been corrected.

qpid-txtest2 initializes a queue with messages at the start and drains the queues at the end. These operations are *not transactional*, so duplicates are expected if there is a failover during initialization or draining. When duplicates were observed, there was indeed a failover at one of these times.

Making these operations transactional is not enough to pass: the test then fails with "no messages to fetch". This is explained as follows.

If there is a failover during a transaction, TransactionAborted is raised. The client assumes the transaction was rolled back and replays it. However, if the failover occurs at a critical point *after* the client has sent commit but *before* it has received a response, then the client *does not know* whether the transaction was committed or rolled back on the new primary. Replaying in this case can duplicate the transaction.

Each transaction moves messages from one queue to another, so as long as transactions are atomic the total number of messages will not change. However, if transactions are duplicated, a transactional session may try to move more messages than exist on the queue, hence "no messages to fetch". For example, if thread 1 moves N messages from q1 to q2, and thread 2 tries to move N+M messages back, then thread 2 will fail.

This problem has been corrected as follows. C++ and python clients now raise the following exceptions:

- TransactionAborted: The transaction has definitely been rolled back due to a connection failure before commit or a broker error (e.g. a store error) during commit. It can safely be replayed.
- TransactionUnknown: The transaction outcome is unknown because the connection failed at the critical time. There's no simple automatic way to know what happened without examining the state of the broker queues.
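
The distinction between the two exceptions can be sketched from the client's side. This is only an illustration, not code from this commit: the exception classes below are local stand-ins that mirror the names in qpid/messaging/exceptions.py, and `run_transaction`, `work` and `max_attempts` are hypothetical names. It assumes `work` performs one complete transaction (open a transactional session, do the sends/fetches/acks, commit).

```python
# Local stand-ins mirroring the exception names added in this commit.
# Real client code would import them from qpid.messaging.exceptions instead.
class TransactionError(Exception):
    """Base class for transactional errors."""


class TransactionAborted(TransactionError):
    """The transaction was definitely rolled back; replay is safe."""


class TransactionUnknown(TransactionError):
    """The outcome is unknown; replay could duplicate the transaction."""


def run_transaction(work, max_attempts=3):
    """Call work() until it commits, replaying only on TransactionAborted.

    work is a hypothetical callable that runs one whole transaction.
    TransactionUnknown is deliberately not caught here, so it reaches the
    caller, who has to inspect the broker queues to learn what happened.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return work()
        except TransactionAborted:
            # Rolled back for certain, so trying again cannot duplicate it.
            if attempt == max_attempts:
                raise
```

With this shape, a failover before commit costs a retry, while a failover between sending commit and receiving the response surfaces as TransactionUnknown after a single attempt.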

Unfortunately, with this fix qpid-txtest2 is no longer a useful test for TX failover because it regularly raises TransactionUnknown and there's not much we can do with that.

A better test of TX atomicity with failover is to run a pair of qpid-send/qpid-receive with failover and verify that the number of enqueues/dequeues and message depth are a multiple of the transaction size. See the JIRA for such a test. (Note these tests also sometimes raise TransactionUnknown, but it doesn't matter since all we are checking is that messages go on and off the queues in multiples of the TX size.)

Note: the original bug also reported seeing missing messages from qpid-txtest2. I don't have a good explanation for that, but since the qpid-send/receive test shows that transactions are atomic I am going to let that go for now.

git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1621211 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r-- python/qpid/messaging/driver.py 5
-rw-r--r-- python/qpid/messaging/endpoints.py 10
-rw-r--r-- python/qpid/messaging/exceptions.py 11
3 files changed, 22 insertions, 4 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py
index 2ade144687..4a3f0bc198 100644
--- a/python/qpid/messaging/driver.py
+++ b/python/qpid/messaging/driver.py
@@ -680,7 +680,10 @@ class Engine:
     # We could re-do transactional enqueues, but not dequeues.
     for ssn in self.connection.sessions.values():
       if ssn.transactional:
-        ssn.error = TransactionAborted("Transaction aborted due to transport failure")
+        if ssn.committing:
+          ssn.error = TransactionUnknown(text="Transaction outcome unknown due to transport failure")
+        else:
+          ssn.error = TransactionAborted(text="Transaction aborted due to transport failure")
         ssn.closed = True
     if e:
       self.connection.error = e
diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py
index 8d0356d093..7d353e1cb4 100644
--- a/python/qpid/messaging/endpoints.py
+++ b/python/qpid/messaging/endpoints.py
@@ -732,8 +732,14 @@ class Session(Endpoint):
       raise NontransactionalSession()
     self.committing = True
     self._wakeup()
-    if not self._ecwait(lambda: not self.committing, timeout=timeout):
-      raise Timeout("commit timed out")
+    try:
+      if not self._ecwait(lambda: not self.committing, timeout=timeout):
+        raise Timeout("commit timed out")
+    except TransactionError:
+      raise
+    except Exception, e:
+      self.error = TransactionAborted(text="Transaction aborted: %s"%e)
+      raise self.error
     if self.aborted:
       raise TransactionAborted()
     assert self.committed
diff --git a/python/qpid/messaging/exceptions.py b/python/qpid/messaging/exceptions.py
index b941565c51..2284d7cde9 100644
--- a/python/qpid/messaging/exceptions.py
+++ b/python/qpid/messaging/exceptions.py
@@ -35,7 +35,6 @@ class MessagingError(Exception):
     if info:
       msg += " " + ", ".join(["%s=%r" % (k, v) for k, v in self.info.items()])
     Exception.__init__(self, msg)
-
 class InternalError(MessagingError):
   pass
@@ -86,9 +85,19 @@ class NontransactionalSession(SessionError):
   pass
 class TransactionError(SessionError):
+  """Base class for transactional errors"""
   pass
 class TransactionAborted(TransactionError):
+  """The transaction was automatically rolled back. This could be due to an error
+  on the broker, such as a store failure, or a connection failure during the
+  transaction"""
+  pass
+
+class TransactionUnknown(TransactionError):
+  """ The outcome of the transaction on the broker (commit or roll-back) is not
+  known. This occurs when the connection fails after we sent the commit but
+  before we received a response."""
   pass
 class UnauthorizedAccess(SessionError):