diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java')
-rwxr-xr-x | java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java | 26 |
1 files changed, 24 insertions, 2 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java index d446434d24..efd7850a49 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java @@ -44,11 +44,16 @@ import java.util.List; */ public class AsyncAutoCommitTransaction implements ServerTransaction { + static final String QPID_STRICT_ORDER_WITH_MIXED_DELIVERY_MODE = "qpid.strict_order_with_mixed_delivery_mode"; + protected static final Logger _logger = Logger.getLogger(AsyncAutoCommitTransaction.class); private final MessageStore _messageStore; private final FutureRecorder _futureRecorder; + //Set true to ensure strict ordering when enqueing messages with mixed delivery mode, i.e. disable async persistence + private boolean _strictOrderWithMixedDeliveryMode = Boolean.getBoolean(QPID_STRICT_ORDER_WITH_MIXED_DELIVERY_MODE); + public static interface FutureRecorder { public void recordFuture(StoreFuture future, Action action); @@ -129,6 +134,23 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } } + private void addEnqueueFuture(final StoreFuture future, final Action action, boolean persistent) + { + if(action != null) + { + // For persistent messages, do not synchronously invoke postCommit even if the future is completed. + // Otherwise, postCommit (which actually does the enqueuing) might be called on successive messages out of order. + if(future.isComplete() && !persistent && !_strictOrderWithMixedDeliveryMode) + { + action.postCommit(); + } + else + { + _futureRecorder.recordFuture(future, action); + } + } + } + public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction) { Transaction txn = null; @@ -203,7 +225,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction { future = StoreFuture.IMMEDIATE_FUTURE; } - addFuture(future, postTransactionAction); + addEnqueueFuture(future, postTransactionAction, message.isPersistent()); postTransactionAction = null; } catch (AMQException e) @@ -257,7 +279,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction { future = StoreFuture.IMMEDIATE_FUTURE; } - addFuture(future, postTransactionAction); + addEnqueueFuture(future, postTransactionAction, message.isPersistent()); postTransactionAction = null; |