From 1dae8306e77a0914bee371f582852806d2b3b3b1 Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Thu, 2 Aug 2012 09:40:27 +0000 Subject: QPID-4171: Fix enqueue ordering for persistent messsages Applied patch from Philip Harvey and Oleksandr Rudyy merged from trunk r1367990 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1368401 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 19 --- .../server/txn/AsyncAutoCommitTransaction.java | 26 +++- .../server/txn/AsyncAutoCommitTransactionTest.java | 140 +++++++++++++++++++++ .../qpid/server/util/InternalBrokerBaseCase.java | 2 +- 4 files changed, 165 insertions(+), 22 deletions(-) create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 6479911ea9..4648809c09 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -138,8 +138,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm private final LinkedList _unfinishedCommandsQueue = new LinkedList(); - private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500; - private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH); // Set of messages being acknowledged in the current transaction @@ -1620,23 +1618,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); } - public void completeAsyncCommands() - { - AsyncCommand cmd; - while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion()) - { - cmd.complete(); - _unfinishedCommandsQueue.poll(); - } - while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD) - { - cmd = _unfinishedCommandsQueue.poll(); - cmd.awaitReadyForCompletion(); - cmd.complete(); - } - } - - public void sync() { AsyncCommand cmd; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java index d446434d24..efd7850a49 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java @@ -44,11 +44,16 @@ import java.util.List; */ public class AsyncAutoCommitTransaction implements ServerTransaction { + static final String QPID_STRICT_ORDER_WITH_MIXED_DELIVERY_MODE = "qpid.strict_order_with_mixed_delivery_mode"; + protected static final Logger _logger = Logger.getLogger(AsyncAutoCommitTransaction.class); private final MessageStore _messageStore; private final FutureRecorder _futureRecorder; + //Set true to ensure strict ordering when enqueing messages with mixed delivery mode, i.e. disable async persistence + private boolean _strictOrderWithMixedDeliveryMode = Boolean.getBoolean(QPID_STRICT_ORDER_WITH_MIXED_DELIVERY_MODE); + public static interface FutureRecorder { public void recordFuture(StoreFuture future, Action action); @@ -129,6 +134,23 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } } + private void addEnqueueFuture(final StoreFuture future, final Action action, boolean persistent) + { + if(action != null) + { + // For persistent messages, do not synchronously invoke postCommit even if the future is completed. + // Otherwise, postCommit (which actually does the enqueuing) might be called on successive messages out of order. + if(future.isComplete() && !persistent && !_strictOrderWithMixedDeliveryMode) + { + action.postCommit(); + } + else + { + _futureRecorder.recordFuture(future, action); + } + } + } + public void dequeue(Collection queueEntries, Action postTransactionAction) { Transaction txn = null; @@ -203,7 +225,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction { future = StoreFuture.IMMEDIATE_FUTURE; } - addFuture(future, postTransactionAction); + addEnqueueFuture(future, postTransactionAction, message.isPersistent()); postTransactionAction = null; } catch (AMQException e) @@ -257,7 +279,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction { future = StoreFuture.IMMEDIATE_FUTURE; } - addFuture(future, postTransactionAction); + addEnqueueFuture(future, postTransactionAction, message.isPersistent()); postTransactionAction = null; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java new file mode 100644 index 0000000000..1aa91fa98a --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.txn; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.*; + +import java.util.Collections; + +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.txn.AsyncAutoCommitTransaction.FutureRecorder; +import org.apache.qpid.server.txn.ServerTransaction.Action; +import org.apache.qpid.test.utils.QpidTestCase; + +public class AsyncAutoCommitTransactionTest extends QpidTestCase +{ + private static final String STRICT_ORDER_SYSTEM_PROPERTY = AsyncAutoCommitTransaction.QPID_STRICT_ORDER_WITH_MIXED_DELIVERY_MODE; + + private FutureRecorder _futureRecorder = mock(FutureRecorder.class); + private EnqueableMessage _message = mock(EnqueableMessage.class); + private BaseQueue _queue = mock(BaseQueue.class); + private MessageStore _messageStore = mock(MessageStore.class); + private Transaction _storeTransaction = mock(Transaction.class); + private Action _postTransactionAction = mock(Action.class); + private StoreFuture _future = mock(StoreFuture.class); + + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + when(_messageStore.newTransaction()).thenReturn(_storeTransaction); + when(_storeTransaction.commitTranAsync()).thenReturn(_future); + when(_queue.isDurable()).thenReturn(true); + } + + public void testEnqueuePersistentMessagePostCommitNotCalledWhenFutureAlreadyComplete() throws Exception + { + setTestSystemProperty(STRICT_ORDER_SYSTEM_PROPERTY, "false"); + + when(_message.isPersistent()).thenReturn(true); + when(_future.isComplete()).thenReturn(true); + + AsyncAutoCommitTransaction asyncAutoCommitTransaction = + new AsyncAutoCommitTransaction(_messageStore, _futureRecorder); + + asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction); + + verify(_storeTransaction).enqueueMessage(_queue, _message); + verify(_futureRecorder).recordFuture(_future, _postTransactionAction); + verifyZeroInteractions(_postTransactionAction); + } + + public void testEnqueuePersistentMessageOnMultiplQueuesPostCommitNotCalled() throws Exception + { + setTestSystemProperty(STRICT_ORDER_SYSTEM_PROPERTY, "false"); + + when(_message.isPersistent()).thenReturn(true); + when(_future.isComplete()).thenReturn(true); + + AsyncAutoCommitTransaction asyncAutoCommitTransaction = + new AsyncAutoCommitTransaction(_messageStore, _futureRecorder); + + asyncAutoCommitTransaction.enqueue(Collections.singletonList(_queue), _message, _postTransactionAction, System.currentTimeMillis()); + + verify(_storeTransaction).enqueueMessage(_queue, _message); + verify(_futureRecorder).recordFuture(_future, _postTransactionAction); + verifyZeroInteractions(_postTransactionAction); + } + + public void testEnqueuePersistentMessagePostCommitNotCalledWhenFutureNotYetComplete() throws Exception + { + setTestSystemProperty(STRICT_ORDER_SYSTEM_PROPERTY, "false"); + + when(_message.isPersistent()).thenReturn(true); + when(_future.isComplete()).thenReturn(false); + + AsyncAutoCommitTransaction asyncAutoCommitTransaction = + new AsyncAutoCommitTransaction(_messageStore, _futureRecorder); + + asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction); + + verify(_storeTransaction).enqueueMessage(_queue, _message); + verify(_futureRecorder).recordFuture(_future, _postTransactionAction); + verifyZeroInteractions(_postTransactionAction); + } + + public void testEnqueueTransientMessagePostCommitIsCalledWhenNotBehavingStrictly() throws Exception + { + setTestSystemProperty(STRICT_ORDER_SYSTEM_PROPERTY, "false"); + + when(_message.isPersistent()).thenReturn(false); + + AsyncAutoCommitTransaction asyncAutoCommitTransaction = + new AsyncAutoCommitTransaction(_messageStore, _futureRecorder); + + asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction); + + verifyZeroInteractions(_storeTransaction); + verify(_postTransactionAction).postCommit(); + verifyZeroInteractions(_futureRecorder); + } + + public void testEnqueueTransientMessagePostCommitIsCalledWhenBehavingStrictly() throws Exception + { + setTestSystemProperty(STRICT_ORDER_SYSTEM_PROPERTY, "true"); + + when(_message.isPersistent()).thenReturn(false); + + AsyncAutoCommitTransaction asyncAutoCommitTransaction = + new AsyncAutoCommitTransaction(_messageStore, _futureRecorder); + + asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction); + + verifyZeroInteractions(_storeTransaction); + verify(_futureRecorder).recordFuture(StoreFuture.IMMEDIATE_FUTURE, _postTransactionAction); + verifyZeroInteractions(_postTransactionAction); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java index 5a54df144f..d7a9078412 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -254,7 +254,7 @@ public class InternalBrokerBaseCase extends QpidTestCase channel.publishContentHeader(_headerBody); } - + channel.sync(); } public void acknowledge(AMQChannel channel, long deliveryTag) -- cgit v1.2.1