summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-08-02 09:40:27 +0000
committerRobert Gemmell <robbie@apache.org>2012-08-02 09:40:27 +0000
commit1dae8306e77a0914bee371f582852806d2b3b3b1 (patch)
tree5c911e5c5606a9f2bba4422861d4d8ffeb47312f
parentdadd0aada761e7168417dd0c6179237969f8192e (diff)
downloadqpid-python-1dae8306e77a0914bee371f582852806d2b3b3b1.tar.gz
QPID-4171: Fix enqueue ordering for persistent messsages
Applied patch from Philip Harvey <phil@philharveyonline.com> and Oleksandr Rudyy <orudyy@gmail.com> merged from trunk r1367990 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1368401 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java19
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java26
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java140
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java2
4 files changed, 165 insertions, 22 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 6479911ea9..4648809c09 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -138,8 +138,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>();
- private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
-
private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
// Set of messages being acknowledged in the current transaction
@@ -1620,23 +1618,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
_unfinishedCommandsQueue.add(new AsyncCommand(future, action));
}
- public void completeAsyncCommands()
- {
- AsyncCommand cmd;
- while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion())
- {
- cmd.complete();
- _unfinishedCommandsQueue.poll();
- }
- while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD)
- {
- cmd = _unfinishedCommandsQueue.poll();
- cmd.awaitReadyForCompletion();
- cmd.complete();
- }
- }
-
-
public void sync()
{
AsyncCommand cmd;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
index d446434d24..efd7850a49 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
@@ -44,11 +44,16 @@ import java.util.List;
*/
public class AsyncAutoCommitTransaction implements ServerTransaction
{
+ static final String QPID_STRICT_ORDER_WITH_MIXED_DELIVERY_MODE = "qpid.strict_order_with_mixed_delivery_mode";
+
protected static final Logger _logger = Logger.getLogger(AsyncAutoCommitTransaction.class);
private final MessageStore _messageStore;
private final FutureRecorder _futureRecorder;
+ //Set true to ensure strict ordering when enqueing messages with mixed delivery mode, i.e. disable async persistence
+ private boolean _strictOrderWithMixedDeliveryMode = Boolean.getBoolean(QPID_STRICT_ORDER_WITH_MIXED_DELIVERY_MODE);
+
public static interface FutureRecorder
{
public void recordFuture(StoreFuture future, Action action);
@@ -129,6 +134,23 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
}
+ private void addEnqueueFuture(final StoreFuture future, final Action action, boolean persistent)
+ {
+ if(action != null)
+ {
+ // For persistent messages, do not synchronously invoke postCommit even if the future is completed.
+ // Otherwise, postCommit (which actually does the enqueuing) might be called on successive messages out of order.
+ if(future.isComplete() && !persistent && !_strictOrderWithMixedDeliveryMode)
+ {
+ action.postCommit();
+ }
+ else
+ {
+ _futureRecorder.recordFuture(future, action);
+ }
+ }
+ }
+
public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
{
Transaction txn = null;
@@ -203,7 +225,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
{
future = StoreFuture.IMMEDIATE_FUTURE;
}
- addFuture(future, postTransactionAction);
+ addEnqueueFuture(future, postTransactionAction, message.isPersistent());
postTransactionAction = null;
}
catch (AMQException e)
@@ -257,7 +279,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
{
future = StoreFuture.IMMEDIATE_FUTURE;
}
- addFuture(future, postTransactionAction);
+ addEnqueueFuture(future, postTransactionAction, message.isPersistent());
postTransactionAction = null;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
new file mode 100644
index 0000000000..1aa91fa98a
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.*;
+
+import java.util.Collections;
+
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.txn.AsyncAutoCommitTransaction.FutureRecorder;
+import org.apache.qpid.server.txn.ServerTransaction.Action;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class AsyncAutoCommitTransactionTest extends QpidTestCase
+{
+ private static final String STRICT_ORDER_SYSTEM_PROPERTY = AsyncAutoCommitTransaction.QPID_STRICT_ORDER_WITH_MIXED_DELIVERY_MODE;
+
+ private FutureRecorder _futureRecorder = mock(FutureRecorder.class);
+ private EnqueableMessage _message = mock(EnqueableMessage.class);
+ private BaseQueue _queue = mock(BaseQueue.class);
+ private MessageStore _messageStore = mock(MessageStore.class);
+ private Transaction _storeTransaction = mock(Transaction.class);
+ private Action _postTransactionAction = mock(Action.class);
+ private StoreFuture _future = mock(StoreFuture.class);
+
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ when(_messageStore.newTransaction()).thenReturn(_storeTransaction);
+ when(_storeTransaction.commitTranAsync()).thenReturn(_future);
+ when(_queue.isDurable()).thenReturn(true);
+ }
+
+ public void testEnqueuePersistentMessagePostCommitNotCalledWhenFutureAlreadyComplete() throws Exception
+ {
+ setTestSystemProperty(STRICT_ORDER_SYSTEM_PROPERTY, "false");
+
+ when(_message.isPersistent()).thenReturn(true);
+ when(_future.isComplete()).thenReturn(true);
+
+ AsyncAutoCommitTransaction asyncAutoCommitTransaction =
+ new AsyncAutoCommitTransaction(_messageStore, _futureRecorder);
+
+ asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction);
+
+ verify(_storeTransaction).enqueueMessage(_queue, _message);
+ verify(_futureRecorder).recordFuture(_future, _postTransactionAction);
+ verifyZeroInteractions(_postTransactionAction);
+ }
+
+ public void testEnqueuePersistentMessageOnMultiplQueuesPostCommitNotCalled() throws Exception
+ {
+ setTestSystemProperty(STRICT_ORDER_SYSTEM_PROPERTY, "false");
+
+ when(_message.isPersistent()).thenReturn(true);
+ when(_future.isComplete()).thenReturn(true);
+
+ AsyncAutoCommitTransaction asyncAutoCommitTransaction =
+ new AsyncAutoCommitTransaction(_messageStore, _futureRecorder);
+
+ asyncAutoCommitTransaction.enqueue(Collections.singletonList(_queue), _message, _postTransactionAction, System.currentTimeMillis());
+
+ verify(_storeTransaction).enqueueMessage(_queue, _message);
+ verify(_futureRecorder).recordFuture(_future, _postTransactionAction);
+ verifyZeroInteractions(_postTransactionAction);
+ }
+
+ public void testEnqueuePersistentMessagePostCommitNotCalledWhenFutureNotYetComplete() throws Exception
+ {
+ setTestSystemProperty(STRICT_ORDER_SYSTEM_PROPERTY, "false");
+
+ when(_message.isPersistent()).thenReturn(true);
+ when(_future.isComplete()).thenReturn(false);
+
+ AsyncAutoCommitTransaction asyncAutoCommitTransaction =
+ new AsyncAutoCommitTransaction(_messageStore, _futureRecorder);
+
+ asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction);
+
+ verify(_storeTransaction).enqueueMessage(_queue, _message);
+ verify(_futureRecorder).recordFuture(_future, _postTransactionAction);
+ verifyZeroInteractions(_postTransactionAction);
+ }
+
+ public void testEnqueueTransientMessagePostCommitIsCalledWhenNotBehavingStrictly() throws Exception
+ {
+ setTestSystemProperty(STRICT_ORDER_SYSTEM_PROPERTY, "false");
+
+ when(_message.isPersistent()).thenReturn(false);
+
+ AsyncAutoCommitTransaction asyncAutoCommitTransaction =
+ new AsyncAutoCommitTransaction(_messageStore, _futureRecorder);
+
+ asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction);
+
+ verifyZeroInteractions(_storeTransaction);
+ verify(_postTransactionAction).postCommit();
+ verifyZeroInteractions(_futureRecorder);
+ }
+
+ public void testEnqueueTransientMessagePostCommitIsCalledWhenBehavingStrictly() throws Exception
+ {
+ setTestSystemProperty(STRICT_ORDER_SYSTEM_PROPERTY, "true");
+
+ when(_message.isPersistent()).thenReturn(false);
+
+ AsyncAutoCommitTransaction asyncAutoCommitTransaction =
+ new AsyncAutoCommitTransaction(_messageStore, _futureRecorder);
+
+ asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction);
+
+ verifyZeroInteractions(_storeTransaction);
+ verify(_futureRecorder).recordFuture(StoreFuture.IMMEDIATE_FUTURE, _postTransactionAction);
+ verifyZeroInteractions(_postTransactionAction);
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
index 5a54df144f..d7a9078412 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -254,7 +254,7 @@ public class InternalBrokerBaseCase extends QpidTestCase
channel.publishContentHeader(_headerBody);
}
-
+ channel.sync();
}
public void acknowledge(AMQChannel channel, long deliveryTag)