summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java')
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java26
1 files changed, 24 insertions, 2 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
index d446434d24..efd7850a49 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
@@ -44,11 +44,16 @@ import java.util.List;
*/
public class AsyncAutoCommitTransaction implements ServerTransaction
{
+ static final String QPID_STRICT_ORDER_WITH_MIXED_DELIVERY_MODE = "qpid.strict_order_with_mixed_delivery_mode";
+
protected static final Logger _logger = Logger.getLogger(AsyncAutoCommitTransaction.class);
private final MessageStore _messageStore;
private final FutureRecorder _futureRecorder;
+ //Set true to ensure strict ordering when enqueing messages with mixed delivery mode, i.e. disable async persistence
+ private boolean _strictOrderWithMixedDeliveryMode = Boolean.getBoolean(QPID_STRICT_ORDER_WITH_MIXED_DELIVERY_MODE);
+
public static interface FutureRecorder
{
public void recordFuture(StoreFuture future, Action action);
@@ -129,6 +134,23 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
}
+ private void addEnqueueFuture(final StoreFuture future, final Action action, boolean persistent)
+ {
+ if(action != null)
+ {
+ // For persistent messages, do not synchronously invoke postCommit even if the future is completed.
+ // Otherwise, postCommit (which actually does the enqueuing) might be called on successive messages out of order.
+ if(future.isComplete() && !persistent && !_strictOrderWithMixedDeliveryMode)
+ {
+ action.postCommit();
+ }
+ else
+ {
+ _futureRecorder.recordFuture(future, action);
+ }
+ }
+ }
+
public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
{
Transaction txn = null;
@@ -203,7 +225,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
{
future = StoreFuture.IMMEDIATE_FUTURE;
}
- addFuture(future, postTransactionAction);
+ addEnqueueFuture(future, postTransactionAction, message.isPersistent());
postTransactionAction = null;
}
catch (AMQException e)
@@ -257,7 +279,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
{
future = StoreFuture.IMMEDIATE_FUTURE;
}
- addFuture(future, postTransactionAction);
+ addEnqueueFuture(future, postTransactionAction, message.isPersistent());
postTransactionAction = null;