diff options
author | Robert Gemmell <robbie@apache.org> | 2011-01-28 11:20:35 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2011-01-28 11:20:35 +0000 |
commit | eb257d87dd7420516e481cc38983053b0eb9ebf2 (patch) | |
tree | 3c20127f00bfdd3855a408f57f53cfd5dbf6a351 | |
parent | f2b04d7492de18c00bf2ea67b5ed91f4d19d4719 (diff) | |
download | qpid-python-eb257d87dd7420516e481cc38983053b0eb9ebf2.tar.gz |
QPID-3017: improve error handling for the new transaction classes, add some logging, add unit tests
Applied patches from Keith Wall
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1064629 13f79535-47bb-0310-9956-ffa450edef68
10 files changed, 1581 insertions, 124 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 3e31115dcb..4f86c82578 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -281,7 +281,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel routeCurrentMessage(); - _transaction.addPostCommitAction(new ServerTransaction.Action() + _transaction.addPostTransactionAction(new ServerTransaction.Action() { public void postCommit() @@ -313,7 +313,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel if(!checkMessageUserId(_currentMessage.getContentHeader())) { - _transaction.addPostCommitAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", _currentMessage)); + _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", _currentMessage)); } else { @@ -321,7 +321,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { if (_currentMessage.isMandatory() || _currentMessage.isImmediate()) { - _transaction.addPostCommitAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message", _currentMessage)); + _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message", _currentMessage)); } else { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java index f674741057..db781ead96 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java @@ -20,19 +20,28 @@ */ package org.apache.qpid.server.txn; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.BaseQueue; +import java.util.Collection; +import java.util.List; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.TransactionLog; -import org.apache.qpid.AMQException; - -import java.util.List; -import java.util.Collection; +/** + * An implementation of ServerTransaction where each enqueue/dequeue + * operation takes place within it own transaction. + * + * Since there is no long-lived transaction, the commit and rollback methods of + * this implementation are empty. + */ public class AutoCommitTransaction implements ServerTransaction { + protected static final Logger _logger = Logger.getLogger(AutoCommitTransaction.class); private final TransactionLog _transactionLog; @@ -41,52 +50,69 @@ public class AutoCommitTransaction implements ServerTransaction _transactionLog = transactionLog; } - - public void addPostCommitAction(Action postCommitAction) + /** + * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered + * by the caller are executed immediately. + */ + public void addPostTransactionAction(Action immediateAction) { - postCommitAction.postCommit(); + immediateAction.postCommit(); } - public void dequeue(BaseQueue queue, EnqueableMessage message, Action postCommitAction) + public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) { - + TransactionLog.Transaction txn = null; try { if(message.isPersistent() && queue.isDurable()) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString()); + } - TransactionLog.Transaction txn = _transactionLog.newTransaction(); + txn = _transactionLog.newTransaction(); txn.dequeueMessage(queue, message.getMessageNumber()); - // store.remove enqueue - // store.commit txn.commitTran(); + txn = null; } - postCommitAction.postCommit(); + postTransactionAction.postCommit(); + postTransactionAction = null; } catch (AMQException e) { - //TODO - postCommitAction.onRollback(); - throw new RuntimeException(e); + _logger.error("Error during message dequeue", e); + throw new RuntimeException("Error during message dequeue", e); + } + finally + { + rollbackIfNecessary(postTransactionAction, txn); } + } - public void dequeue(Collection<QueueEntry> ackedMessages, Action postCommitAction) + public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction) { + TransactionLog.Transaction txn = null; try { - TransactionLog.Transaction txn = null; - for(QueueEntry entry : ackedMessages) + for(QueueEntry entry : queueEntries) { ServerMessage message = entry.getMessage(); - AMQQueue queue = entry.getQueue(); + BaseQueue queue = entry.getQueue(); if(message.isPersistent() && queue.isDurable()) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString()); + } + if(txn == null) { txn = _transactionLog.newTransaction(); } + txn.dequeueMessage(queue, message.getMessageNumber()); } @@ -94,78 +120,134 @@ public class AutoCommitTransaction implements ServerTransaction if(txn != null) { txn.commitTran(); + txn = null; } - postCommitAction.postCommit(); + postTransactionAction.postCommit(); + postTransactionAction = null; } catch (AMQException e) { - //TODO - postCommitAction.onRollback(); - throw new RuntimeException(e); + _logger.error("Error during message dequeues", e); + throw new RuntimeException("Error during message dequeues", e); + } + finally + { + rollbackIfNecessary(postTransactionAction, txn); } + } - public void enqueue(BaseQueue queue, EnqueableMessage message, Action postCommitAction) + public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) { + TransactionLog.Transaction txn = null; try { if(message.isPersistent() && queue.isDurable()) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString()); + } - TransactionLog.Transaction txn = _transactionLog.newTransaction(); + txn = _transactionLog.newTransaction(); txn.enqueueMessage(queue, message.getMessageNumber()); txn.commitTran(); + txn = null; } - postCommitAction.postCommit(); + postTransactionAction.postCommit(); + postTransactionAction = null; } catch (AMQException e) { - //TODO - e.printStackTrace(); - postCommitAction.onRollback(); - throw new RuntimeException(e); + _logger.error("Error during message enqueue", e); + throw new RuntimeException("Error during message enqueue", e); } + finally + { + rollbackIfNecessary(postTransactionAction, txn); + } + } - public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postCommitAction) + public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction) { + TransactionLog.Transaction txn = null; try { if(message.isPersistent()) { - TransactionLog.Transaction txn = _transactionLog.newTransaction(); Long id = message.getMessageNumber(); - for(BaseQueue q : queues) + for(BaseQueue queue : queues) { - if(q.isDurable()) + if (queue.isDurable()) { - txn.enqueueMessage(q, id); + if (_logger.isDebugEnabled()) + { + _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString()); + } + if (txn == null) + { + txn = _transactionLog.newTransaction(); + } + + txn.enqueueMessage(queue, id); } } - txn.commitTran(); + + if (txn != null) + { + txn.commitTran(); + txn = null; + } } - postCommitAction.postCommit(); + postTransactionAction.postCommit(); + postTransactionAction = null; } catch (AMQException e) { - //TODO - postCommitAction.onRollback(); - throw new RuntimeException(e); + _logger.error("Error during message enqueues", e); + throw new RuntimeException("Error during message enqueues", e); + } + finally + { + rollbackIfNecessary(postTransactionAction, txn); } } + public void commit() { - } public void rollback() { + } + private void rollbackIfNecessary(Action postTransactionAction, TransactionLog.Transaction txn) + { + if (txn != null) + { + try + { + txn.abortTran(); + } + catch (AMQStoreException e) + { + _logger.error("Abort transaction failed", e); + // Deliberate decision not to re-throw this exception. Rationale: we can only reach here if a previous + // TransactionLog method has ended in Exception. If we were to re-throw here, we would prevent + // our caller from receiving the original exception (which is likely to be more revealing of the underlying error). + } + } + if (postTransactionAction != null) + { + postTransactionAction.onRollback(); + } } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 7c9276dbdc..a04c743be1 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -21,21 +21,29 @@ package org.apache.qpid.server.txn; */ -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.BaseQueue; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.TransactionLog; -import org.apache.qpid.AMQException; - -import java.util.List; -import java.util.ArrayList; -import java.util.Collection; +/** + * A concrete implementation of ServerTransaction where enqueue/dequeue + * operations share a single long-lived transaction. + * + * The caller is responsible for invoking commit() (or rollback()) as necessary. + */ public class LocalTransaction implements ServerTransaction { - private final List<Action> _postCommitActions = new ArrayList<Action>(); + protected static final Logger _logger = Logger.getLogger(LocalTransaction.class); + + private final List<Action> _postTransactionActions = new ArrayList<Action>(); private volatile TransactionLog.Transaction _transaction; private TransactionLog _transactionLog; @@ -45,17 +53,23 @@ public class LocalTransaction implements ServerTransaction _transactionLog = transactionLog; } - public void addPostCommitAction(Action postCommitAction) + public void addPostTransactionAction(Action postTransactionAction) { - _postCommitActions.add(postCommitAction); + _postTransactionActions.add(postTransactionAction); } - public void dequeue(BaseQueue queue, EnqueableMessage message, Action postCommitAction) + public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) { + _postTransactionActions.add(postTransactionAction); + if(message.isPersistent() && queue.isDurable()) { try { + if (_logger.isDebugEnabled()) + { + _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString()); + } beginTranIfNecessary(); _transaction.dequeueMessage(queue, message.getMessageNumber()); @@ -63,23 +77,31 @@ public class LocalTransaction implements ServerTransaction } catch(AMQException e) { + _logger.error("Error during message dequeues", e); tidyUpOnError(e); } } - _postCommitActions.add(postCommitAction); } - public void dequeue(Collection<QueueEntry> queueEntries, Action postCommitAction) + public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction) { + _postTransactionActions.add(postTransactionAction); + try { for(QueueEntry entry : queueEntries) { ServerMessage message = entry.getMessage(); - AMQQueue queue = entry.getQueue(); + BaseQueue queue = entry.getQueue(); + if(message.isPersistent() && queue.isDurable()) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString()); + } + beginTranIfNecessary(); _transaction.dequeueMessage(queue, message.getMessageNumber()); } @@ -88,9 +110,9 @@ public class LocalTransaction implements ServerTransaction } catch(AMQException e) { + _logger.error("Error during message dequeues", e); tidyUpOnError(e); } - _postCommitActions.add(postCommitAction); } @@ -98,7 +120,7 @@ public class LocalTransaction implements ServerTransaction { try { - for(Action action : _postCommitActions) + for(Action action : _postTransactionActions) { action.onRollback(); } @@ -107,14 +129,20 @@ public class LocalTransaction implements ServerTransaction { try { - _transaction.abortTran(); + if (_transaction != null) + { + _transaction.abortTran(); + } } - catch (Exception e1) + catch (Exception abortException) { - // TODO could try to chain the information to the original error + _logger.error("Abort transaction failed while trying to handle previous error", abortException); + } + finally + { + _transaction = null; + _postTransactionActions.clear(); } - _transaction = null; - _postCommitActions.clear(); } throw new RuntimeException(e); @@ -122,6 +150,7 @@ public class LocalTransaction implements ServerTransaction private void beginTranIfNecessary() { + if(_transaction == null) { try @@ -135,52 +164,50 @@ public class LocalTransaction implements ServerTransaction } } - public void enqueue(BaseQueue queue, EnqueableMessage message, Action postCommitAction) + public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) { + _postTransactionActions.add(postTransactionAction); + if(message.isPersistent() && queue.isDurable()) { - beginTranIfNecessary(); try { + if (_logger.isDebugEnabled()) + { + _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString()); + } + + beginTranIfNecessary(); _transaction.enqueueMessage(queue, message.getMessageNumber()); } catch (Exception e) { + _logger.error("Error during message enqueue", e); + tidyUpOnError(e); } } - _postCommitActions.add(postCommitAction); - - } - public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postCommitAction) + public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction) { - + _postTransactionActions.add(postTransactionAction); if(message.isPersistent()) { - if(_transaction == null) - { - for(BaseQueue queue : queues) - { - if(queue.isDurable()) - { - beginTranIfNecessary(); - break; - } - } - - - } - - try { for(BaseQueue queue : queues) { if(queue.isDurable()) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString() ); + } + + + beginTranIfNecessary(); _transaction.enqueueMessage(queue, message.getMessageNumber()); } } @@ -188,12 +215,11 @@ public class LocalTransaction implements ServerTransaction } catch (Exception e) { + _logger.error("Error during message enqueue", e); + tidyUpOnError(e); } } - _postCommitActions.add(postCommitAction); - - } public void commit() @@ -202,55 +228,52 @@ public class LocalTransaction implements ServerTransaction { if(_transaction != null) { - _transaction.commitTran(); } - for(Action action : _postCommitActions) + for(Action action : _postTransactionActions) { action.postCommit(); } } catch (Exception e) { - for(Action action : _postCommitActions) + _logger.error("Failed to commit transaction", e); + + for(Action action : _postTransactionActions) { action.onRollback(); } - //TODO - throw new RuntimeException(e); + throw new RuntimeException("Failed to commit transaction", e); } finally { _transaction = null; - _postCommitActions.clear(); + _postTransactionActions.clear(); } } public void rollback() { - try { if(_transaction != null) { - _transaction.abortTran(); } } catch (AMQException e) { - //TODO - e.printStackTrace(); - throw new RuntimeException(e); + _logger.error("Failed to rollback transaction", e); + throw new RuntimeException("Failed to rollback transaction", e); } finally { try { - for(Action action : _postCommitActions) + for(Action action : _postTransactionActions) { action.onRollback(); } @@ -258,7 +281,7 @@ public class LocalTransaction implements ServerTransaction finally { _transaction = null; - _postCommitActions.clear(); + _postTransactionActions.clear(); } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java index f3ef6569f3..b61b8a5c64 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java @@ -27,14 +27,24 @@ import org.apache.qpid.server.queue.QueueEntry; import java.util.Collection; import java.util.List; + +/** + * The ServerTransaction interface allows a set enqueue/dequeue operations to be + * performed against the transaction belonging the underlying TransactionLog object. + * + * Typically all ServerTransaction implementations decide if a message should be enlisted + * into a store transaction by examining the durable property of the queue, and the persistence + * property of the message. + * + * A caller may register a list of post transaction Actions to be + * performed on commit() (or rollback()). + * + */ public interface ServerTransaction { - - void addPostCommitAction(Action postCommitAction); - - - - + /** + * Represents an action to be performed on transaction commit or rollback + */ public static interface Action { public void postCommit(); @@ -42,16 +52,52 @@ public interface ServerTransaction public void onRollback(); } - void dequeue(BaseQueue queue, EnqueableMessage message, Action postCommitAction); - - void dequeue(Collection<QueueEntry> ackedMessages, Action postCommitAction); - - void enqueue(BaseQueue queue, EnqueableMessage message, Action postCommitAction); - - void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postCommitAction); - - + /** + * Register an Action for execution after transaction commit or rollback. Actions + * will be executed in the order in which they are registered. + */ + void addPostTransactionAction(Action postTransactionAction); + + /** + * Dequeue a message from a queue registering a post transaction action. + * + * A store operation will result only for a persistent message on a durable queue. + */ + void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction); + + /** + * Dequeue a message(s) from queue(s) registering a post transaction action. + * + * Store operations will result only for a persistent messages on durable queues. + */ + void dequeue(Collection<QueueEntry> messages, Action postTransactionAction); + + /** + * Enqueue a message to a queue registering a post transaction action. + * + * A store operation will result only for a persistent message on a durable queue. + */ + void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction); + + /** + * Enqueue a message(s) to queue(s) registering a post transaction action. + * + * Store operations will result only for a persistent messages on durable queues. + */ + void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction); + + /** + * Commit the transaction represented by this object. + * + * If the caller has registered one or more Actions, the postCommit() method on each will + * be executed immediately after the underlying transaction has committed. + */ void commit(); + /** Rollback the transaction represented by this object. + * + * If the caller has registered one or more Actions, the onRollback() method on each will + * be executed immediately after the underlying transaction has rolled-back. + */ void rollback(); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 8b894c9629..5bdbe2c68e 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.ServerMessage; public class MockQueueEntry implements QueueEntry { @@ -100,7 +101,7 @@ public class MockQueueEntry implements QueueEntry return false; } - public AMQMessage getMessage() + public ServerMessage getMessage() { return _message; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java new file mode 100644 index 0000000000..9afed49922 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java @@ -0,0 +1,442 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.txn; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MockAMQQueue; +import org.apache.qpid.server.queue.MockQueueEntry; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.TransactionLog; +import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState; +import org.apache.qpid.test.utils.QpidTestCase; + +/** + * A unit test ensuring that AutoCommitTransaction creates a separate transaction for + * each dequeue/enqueue operation that involves enlistable messages. Verifies + * that the transaction is properly committed (or rolled-back in the case of exception), + * and that post transaction actions are correctly fired. + * + */ +public class AutoCommitTransactionTest extends QpidTestCase +{ + private ServerTransaction _transaction = null; // Class under test + + private TransactionLog _transactionLog; + private AMQQueue _queue; + private List<AMQQueue> _queues; + private Collection<QueueEntry> _queueEntries; + private ServerMessage _message; + private MockAction _action; + private MockStoreTransaction _storeTransaction; + + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + _storeTransaction = createTestStoreTransaction(false); + _transactionLog = MockStoreTransaction.createTestTransactionLog(_storeTransaction); + _action = new MockAction(); + + _transaction = new AutoCommitTransaction(_transactionLog); + } + + /** + * Tests the enqueue of a non persistent message to a single non durable queue. + * Asserts that a store transaction has not been started and commit action fired. + */ + public void testEnqueueToNonDurableQueueOfNonPersistentMessage() throws Exception + { + _message = createTestMessage(false); + _queue = createTestAMQQueue(false); + + _transaction.enqueue(_queue, _message, _action); + + assertEquals("Enqueue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages()); + assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); + assertFalse("Rollback action must not be fired", _action.isRollbackActionFired()); + assertTrue("Post commit action must be fired", _action.isPostCommitActionFired()); + + } + + /** + * Tests the enqueue of a persistent message to a durable queue. + * Asserts that a store transaction has been committed and commit action fired. + */ + public void testEnqueueToDurableQueueOfPersistentMessage() throws Exception + { + _message = createTestMessage(true); + _queue = createTestAMQQueue(true); + + _transaction.enqueue(_queue, _message, _action); + + assertEquals("Enqueue of persistent message to durable queue must cause message to be enqueued", 1, _storeTransaction.getNumberOfEnqueuedMessages()); + assertEquals("Unexpected transaction state", TransactionState.COMMITTED, _storeTransaction.getState()); + assertFalse("Rollback action must not be fired", _action.isRollbackActionFired()); + assertTrue("Post commit action must be fired", _action.isPostCommitActionFired()); + } + + /** + * Tests the case where the store operation throws an exception. + * Asserts that the transaction is aborted and rollback action is fired. + */ + public void testStoreEnqueueCausesException() throws Exception + { + _message = createTestMessage(true); + _queue = createTestAMQQueue(true); + + _storeTransaction = createTestStoreTransaction(true); + _transactionLog = MockStoreTransaction.createTestTransactionLog(_storeTransaction); + _transaction = new AutoCommitTransaction(_transactionLog); + + try + { + _transaction.enqueue(_queue, _message, _action); + fail("Exception not thrown"); + } + catch (RuntimeException re) + { + // PASS + } + + assertEquals("Unexpected transaction state", TransactionState.ABORTED, _storeTransaction.getState()); + assertTrue("Rollback action must be fired", _action.isRollbackActionFired()); + assertFalse("Post commit action must be fired", _action.isPostCommitActionFired()); + } + + /** + * Tests the enqueue of a non persistent message to a many non durable queues. + * Asserts that a store transaction has not been started and post commit action fired. + */ + public void testEnqueueToManyNonDurableQueuesOfNonPersistentMessage() throws Exception + { + _message = createTestMessage(false); + _queues = createTestBaseQueues(new boolean[] {false, false, false}); + + _transaction.enqueue(_queues, _message, _action); + + assertEquals("Enqueue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages()); + assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); + assertFalse("Rollback action must not be fired", _action.isRollbackActionFired()); + assertTrue("Post commit action must be fired", _action.isPostCommitActionFired()); + + } + + + /** + * Tests the enqueue of a persistent message to a many non durable queues. + * Asserts that a store transaction has not been started and post commit action + * fired. + */ + public void testEnqueueToManyNonDurableQueuesOfPersistentMessage() throws Exception + { + _message = createTestMessage(true); + _queues = createTestBaseQueues(new boolean[] {false, false, false}); + + _transaction.enqueue(_queues, _message, _action); + + assertEquals("Enqueue of persistent message to non-durable queues must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages()); + assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); + assertFalse("Rollback action must not be fired", _action.isRollbackActionFired()); + assertTrue("Post commit action must be fired", _action.isPostCommitActionFired()); + + } + + /** + * Tests the enqueue of a persistent message to many queues, some durable others not. + * Asserts that a store transaction has been committed and post commit action fired. + */ + public void testEnqueueToDurableAndNonDurableQueuesOfPersistentMessage() throws Exception + { + _message = createTestMessage(true); + _queues = createTestBaseQueues(new boolean[] {false, true, false, true}); + + _transaction.enqueue(_queues, _message, _action); + + assertEquals("Enqueue of persistent message to durable/non-durable queues must cause messages to be enqueued", 2, _storeTransaction.getNumberOfEnqueuedMessages()); + assertEquals("Unexpected transaction state", TransactionState.COMMITTED, _storeTransaction.getState()); + assertFalse("Rollback action must not be fired", _action.isRollbackActionFired()); + assertTrue("Post commit action must be fired", _action.isPostCommitActionFired()); + } + + /** + * Tests the case where the store operation throws an exception. + * Asserts that the transaction is aborted and rollback action fired. + */ + public void testStoreEnqueuesCausesExceptions() throws Exception + { + _message = createTestMessage(true); + _queues = createTestBaseQueues(new boolean[] {true, true}); + + _storeTransaction = createTestStoreTransaction(true); + _transactionLog = MockStoreTransaction.createTestTransactionLog(_storeTransaction); + _transaction = new AutoCommitTransaction(_transactionLog); + + try + { + _transaction.enqueue(_queues, _message, _action); + fail("Exception not thrown"); + } + catch (RuntimeException re) + { + // PASS + } + + assertEquals("Unexpected transaction state", TransactionState.ABORTED, _storeTransaction.getState()); + assertTrue("Rollback action must be fired", _action.isRollbackActionFired()); + assertFalse("Post commit action must not be fired", _action.isPostCommitActionFired()); + } + + /** + * Tests the dequeue of a non persistent message from a single non durable queue. + * Asserts that a store transaction has not been started and post commit action + * fired. + */ + public void testDequeueFromNonDurableQueueOfNonPersistentMessage() throws Exception + { + _message = createTestMessage(false); + _queue = createTestAMQQueue(false); + + _transaction.dequeue(_queue, _message, _action); + + assertEquals("Dequeue of non-persistent message must not cause message to be dequeued", 0, _storeTransaction.getNumberOfDequeuedMessages()); + assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); + assertFalse("Rollback action must not be fired", _action.isRollbackActionFired()); + assertTrue("Post commit action must be fired", _action.isPostCommitActionFired()); + + } + + /** + * Tests the dequeue of a persistent message from a single non durable queue. + * Asserts that a store transaction has not been started and post commit + * action fired. + */ + public void testDequeueFromDurableQueueOfPersistentMessage() throws Exception + { + _message = createTestMessage(true); + _queue = createTestAMQQueue(true); + + _transaction.dequeue(_queue, _message, _action); + + assertEquals("Dequeue of persistent message to durable queue must cause message to be dequeued",1, _storeTransaction.getNumberOfDequeuedMessages()); + assertEquals("Unexpected transaction state", TransactionState.COMMITTED, _storeTransaction.getState()); + assertFalse("Rollback action must not be fired", _action.isRollbackActionFired()); + assertTrue("Post commit action must be fired", _action.isPostCommitActionFired()); + } + + /** + * Tests the case where the store operation throws an exception. + * Asserts that the transaction is aborted and post rollback action + * fired. + */ + public void testStoreDequeueCausesException() throws Exception + { + _message = createTestMessage(true); + _queue = createTestAMQQueue(true); + + _storeTransaction = createTestStoreTransaction(true); + _transactionLog = MockStoreTransaction.createTestTransactionLog(_storeTransaction); + _transaction = new AutoCommitTransaction(_transactionLog); + + try + { + _transaction.dequeue(_queue, _message, _action); + fail("Exception not thrown"); + } + catch (RuntimeException re) + { + // PASS + } + + assertEquals("Unexpected transaction state", TransactionState.ABORTED, _storeTransaction.getState()); + + assertTrue("Rollback action must be fired", _action.isRollbackActionFired()); + assertFalse("Post commit action must not be fired", _action.isPostCommitActionFired()); + } + + /** + * Tests the dequeue of a non persistent message from many non durable queues. + * Asserts that a store transaction has not been started and post commit action + * fired. + */ + public void testDequeueFromManyNonDurableQueuesOfNonPersistentMessage() throws Exception + { + _queueEntries = createTestQueueEntries(new boolean[] {false, false, false}, new boolean[] {false, false, false}); + + _transaction.dequeue(_queueEntries, _action); + + assertEquals("Dequeue of non-persistent messages must not cause message to be dequeued", 0, _storeTransaction.getNumberOfDequeuedMessages()); + assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); + assertEquals("Rollback action must not be fired", false, _action.isRollbackActionFired()); + assertEquals("Post commit action must be fired", true, _action.isPostCommitActionFired()); + + } + + + /** + * Tests the dequeue of a persistent message from a many non durable queues. + * Asserts that a store transaction has not been started and post commit action + * fired. + */ + public void testDequeueFromManyNonDurableQueuesOfPersistentMessage() throws Exception + { + _queueEntries = createTestQueueEntries(new boolean[] {false, false, false}, new boolean[] {true, true, true}); + + _transaction.dequeue(_queueEntries, _action); + + assertEquals("Dequeue of persistent message from non-durable queues must not cause message to be enqueued", 0, _storeTransaction.getNumberOfDequeuedMessages()); + assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); + assertFalse("Rollback action must not be fired", _action.isRollbackActionFired()); + assertTrue("Post commit action must be fired", _action.isPostCommitActionFired()); + } + + /** + * Tests the dequeue of a persistent message from many queues, some durable others not. + * Asserts that a store transaction has not been started and post commit action fired. + */ + public void testDequeueFromDurableAndNonDurableQueuesOfPersistentMessage() throws Exception + { + // A transaction will exist owing to the 1st and 3rd. + _queueEntries = createTestQueueEntries(new boolean[] {true, false, true, true}, new boolean[] {true, true, true, false}); + + _transaction.dequeue(_queueEntries, _action); + + assertEquals("Dequeue of persistent messages from durable/non-durable queues must cause messages to be dequeued", 2, _storeTransaction.getNumberOfDequeuedMessages()); + assertEquals("Unexpected transaction state", TransactionState.COMMITTED, _storeTransaction.getState()); + assertFalse("Rollback action must not be fired", _action.isRollbackActionFired()); + assertTrue("Post commit action must be fired", _action.isPostCommitActionFired()); + } + + /** + * Tests the case where the store operation throws an exception. + * Asserts that the transaction is aborted and post rollback action fired. + */ + public void testStoreDequeuesCauseExceptions() throws Exception + { + // Transactions will exist owing to the 1st and 3rd queue entries in the collection + _queueEntries = createTestQueueEntries(new boolean[] {true}, new boolean[] {true}); + + _storeTransaction = createTestStoreTransaction(true); + _transactionLog = MockStoreTransaction.createTestTransactionLog(_storeTransaction); + _transaction = new AutoCommitTransaction(_transactionLog); + + try + { + _transaction.dequeue(_queueEntries, _action); + fail("Exception not thrown"); + } + catch (RuntimeException re) + { + // PASS + } + + assertEquals("Unexpected transaction state", TransactionState.ABORTED, _storeTransaction.getState()); + + assertTrue("Rollback action must be fired", _action.isRollbackActionFired()); + assertFalse("Post commit action must not be fired", _action.isPostCommitActionFired()); + } + + /** + * Tests the add of a post-commit action. Since AutoCommitTranctions + * have no long lived transactions, the post commit action is fired immediately. + */ + public void testPostCommitActionFiredImmediately() throws Exception + { + + _transaction.addPostTransactionAction(_action); + + assertTrue("Post commit action must be fired", _action.isPostCommitActionFired()); + assertFalse("Rollback action must be fired", _action.isRollbackActionFired()); + } + + private Collection<QueueEntry> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags) + { + Collection<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); + + assertTrue("Boolean arrays must be the same length", queueDurableFlags.length == messagePersistentFlags.length); + + for(int i = 0; i < queueDurableFlags.length; i++) + { + final AMQQueue queue = createTestAMQQueue(queueDurableFlags[i]); + final ServerMessage message = createTestMessage(messagePersistentFlags[i]); + + queueEntries.add(new MockQueueEntry() + { + + @Override + public ServerMessage getMessage() + { + return message; + } + + @Override + public AMQQueue getQueue() + { + return queue; + } + + }); + } + + return queueEntries; + } + + private MockStoreTransaction createTestStoreTransaction(boolean throwException) + { + return new MockStoreTransaction(throwException); + } + + private List<AMQQueue> createTestBaseQueues(boolean[] durableFlags) + { + List<AMQQueue> queues = new ArrayList<AMQQueue>(); + for (boolean b: durableFlags) + { + queues.add(createTestAMQQueue(b)); + } + + return queues; + } + + private AMQQueue createTestAMQQueue(final boolean durable) + { + return new MockAMQQueue("mockQueue") + { + @Override + public boolean isDurable() + { + return durable; + } + + }; + } + + private ServerMessage createTestMessage(final boolean persistent) + { + return new MockServerMessage(persistent); + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java new file mode 100644 index 0000000000..e81fd8e3f1 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java @@ -0,0 +1,557 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.txn; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MockAMQQueue; +import org.apache.qpid.server.queue.MockQueueEntry; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.TransactionLog; +import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState; +import org.apache.qpid.test.utils.QpidTestCase; + +/** + * A unit test ensuring that LocalTransactionTest creates a long-lived store transaction + * that spans many dequeue/enqueue operations of enlistable messages. Verifies + * that the long-lived transaction is properly committed and rolled back, and that + * post transaction actions are correctly fired. + * + */ +public class LocalTransactionTest extends QpidTestCase +{ + private ServerTransaction _transaction = null; // Class under test + + private AMQQueue _queue; + private List<AMQQueue> _queues; + private Collection<QueueEntry> _queueEntries; + private ServerMessage _message; + private MockAction _action1; + private MockAction _action2; + private MockStoreTransaction _storeTransaction; + private TransactionLog _transactionLog; + + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + _storeTransaction = createTestStoreTransaction(false); + _transactionLog = MockStoreTransaction.createTestTransactionLog(_storeTransaction); + _action1 = new MockAction(); + _action2 = new MockAction(); + + _transaction = new LocalTransaction(_transactionLog); + + } + + + /** + * Tests the enqueue of a non persistent message to a single non durable queue. + * Asserts that a store transaction has not been started. + */ + public void testEnqueueToNonDurableQueueOfNonPersistentMessage() throws Exception + { + _message = createTestMessage(false); + _queue = createTestAMQQueue(false); + + _transaction.enqueue(_queue, _message, _action1); + + assertEquals("Enqueue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages()); + assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); + assertNotFired(_action1); + } + + /** + * Tests the enqueue of a persistent message to a durable queue. + * Asserts that a store transaction has been started. + */ + public void testEnqueueToDurableQueueOfPersistentMessage() throws Exception + { + _message = createTestMessage(true); + _queue = createTestAMQQueue(true); + + _transaction.enqueue(_queue, _message, _action1); + + assertEquals("Enqueue of persistent message to durable queue must cause message to be enqueued", 1, _storeTransaction.getNumberOfEnqueuedMessages()); + assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState()); + assertNotFired(_action1); + } + + /** + * Tests the case where the store operation throws an exception. + * Asserts that the transaction is aborted. + */ + public void testStoreEnqueueCausesException() throws Exception + { + _message = createTestMessage(true); + _queue = createTestAMQQueue(true); + + _storeTransaction = createTestStoreTransaction(true); + _transactionLog = MockStoreTransaction.createTestTransactionLog(_storeTransaction); + _transaction = new LocalTransaction(_transactionLog); + + try + { + _transaction.enqueue(_queue, _message, _action1); + fail("Exception not thrown"); + } + catch (RuntimeException re) + { + // PASS + } + + assertTrue("Rollback action must be fired", _action1.isRollbackActionFired()); + assertEquals("Unexpected transaction state", TransactionState.ABORTED, _storeTransaction.getState()); + + assertFalse("Post commit action must not be fired", _action1.isPostCommitActionFired()); + + } + + /** + * Tests the enqueue of a non persistent message to a many non durable queues. + * Asserts that a store transaction has not been started. + */ + public void testEnqueueToManyNonDurableQueuesOfNonPersistentMessage() throws Exception + { + _message = createTestMessage(false); + _queues = createTestBaseQueues(new boolean[] {false, false, false}); + + _transaction.enqueue(_queues, _message, _action1); + + assertEquals("Enqueue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages()); + assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); + assertNotFired(_action1); + } + + /** + * Tests the enqueue of a persistent message to a many non durable queues. + * Asserts that a store transaction has not been started. + */ + public void testEnqueueToManyNonDurableQueuesOfPersistentMessage() throws Exception + { + _message = createTestMessage(true); + _queues = createTestBaseQueues(new boolean[] {false, false, false}); + + _transaction.enqueue(_queues, _message, _action1); + + assertEquals("Enqueue of persistent message to non-durable queues must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages()); + assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); + assertNotFired(_action1); + + } + + /** + * Tests the enqueue of a persistent message to many queues, some durable others not. + * Asserts that a store transaction has been started. + */ + public void testEnqueueToDurableAndNonDurableQueuesOfPersistentMessage() throws Exception + { + _message = createTestMessage(true); + _queues = createTestBaseQueues(new boolean[] {false, true, false, true}); + + _transaction.enqueue(_queues, _message, _action1); + + assertEquals("Enqueue of persistent message to durable/non-durable queues must cause messages to be enqueued", 2, _storeTransaction.getNumberOfEnqueuedMessages()); + assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState()); + assertNotFired(_action1); + + } + + /** + * Tests the case where the store operation throws an exception. + * Asserts that the transaction is aborted. + */ + public void testStoreEnqueuesCausesExceptions() throws Exception + { + _message = createTestMessage(true); + _queues = createTestBaseQueues(new boolean[] {true, true}); + + _storeTransaction = createTestStoreTransaction(true); + _transactionLog = MockStoreTransaction.createTestTransactionLog(_storeTransaction); + _transaction = new LocalTransaction(_transactionLog); + + try + { + _transaction.enqueue(_queues, _message, _action1); + fail("Exception not thrown"); + } + catch (RuntimeException re) + { + // PASS + } + + assertTrue("Rollback action must be fired", _action1.isRollbackActionFired()); + assertEquals("Unexpected transaction state", TransactionState.ABORTED, _storeTransaction.getState()); + assertFalse("Post commit action must not be fired", _action1.isPostCommitActionFired()); + } + + /** + * Tests the dequeue of a non persistent message from a single non durable queue. + * Asserts that a store transaction has not been started. + */ + public void testDequeueFromNonDurableQueueOfNonPersistentMessage() throws Exception + { + _message = createTestMessage(false); + _queue = createTestAMQQueue(false); + + _transaction.dequeue(_queue, _message, _action1); + + assertEquals("Dequeue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages()); + assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); + assertNotFired(_action1); + + } + + /** + * Tests the dequeue of a persistent message from a single non durable queue. + * Asserts that a store transaction has not been started. + */ + public void testDequeueFromDurableQueueOfPersistentMessage() throws Exception + { + _message = createTestMessage(true); + _queue = createTestAMQQueue(true); + + _transaction.dequeue(_queue, _message, _action1); + + assertEquals("Dequeue of non-persistent message must cause message to be dequeued", 1, _storeTransaction.getNumberOfDequeuedMessages()); + assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState()); + assertNotFired(_action1); + } + + /** + * Tests the case where the store operation throws an exception. + * Asserts that the transaction is aborted. + */ + public void testStoreDequeueCausesException() throws Exception + { + _message = createTestMessage(true); + _queue = createTestAMQQueue(true); + + _storeTransaction = createTestStoreTransaction(true); + _transactionLog = MockStoreTransaction.createTestTransactionLog(_storeTransaction); + _transaction = new LocalTransaction(_transactionLog); + + try + { + _transaction.dequeue(_queue, _message, _action1); + fail("Exception not thrown"); + } + catch (RuntimeException re) + { + // PASS + } + + assertTrue("Rollback action must be fired", _action1.isRollbackActionFired()); + assertEquals("Unexpected transaction state", TransactionState.ABORTED, _storeTransaction.getState()); + assertFalse("Post commit action must not be fired", _action1.isPostCommitActionFired()); + + } + + /** + * Tests the dequeue of a non persistent message from many non durable queues. + * Asserts that a store transaction has not been started. + */ + public void testDequeueFromManyNonDurableQueuesOfNonPersistentMessage() throws Exception + { + _queueEntries = createTestQueueEntries(new boolean[] {false, false, false}, new boolean[] {false, false, false}); + + _transaction.dequeue(_queueEntries, _action1); + + assertEquals("Dequeue of non-persistent messages must not cause message to be dequeued", 0, _storeTransaction.getNumberOfDequeuedMessages()); + assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); + assertNotFired(_action1); + + } + + /** + * Tests the dequeue of a persistent message from a many non durable queues. + * Asserts that a store transaction has not been started. + */ + public void testDequeueFromManyNonDurableQueuesOfPersistentMessage() throws Exception + { + _queueEntries = createTestQueueEntries(new boolean[] {false, false, false}, new boolean[] {true, true, true}); + + _transaction.dequeue(_queueEntries, _action1); + + assertEquals("Dequeue of persistent message from non-durable queues must not cause message to be enqueued", 0, _storeTransaction.getNumberOfDequeuedMessages()); + assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); + assertNotFired(_action1); + } + + /** + * Tests the dequeue of a persistent message from many queues, some durable others not. + * Asserts that a store transaction has not been started. + */ + public void testDequeueFromDurableAndNonDurableQueuesOfPersistentMessage() throws Exception + { + // A transaction will exist owing to the 1st and 3rd. + _queueEntries = createTestQueueEntries(new boolean[] {true, false, true, true}, new boolean[] {true, true, true, false}); + + _transaction.dequeue(_queueEntries, _action1); + + assertEquals("Dequeue of persistent messages from durable/non-durable queues must cause messages to be dequeued", 2, _storeTransaction.getNumberOfDequeuedMessages()); + assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState()); + assertNotFired(_action1); + } + + /** + * Tests the case where the store operation throws an exception. + * Asserts that the transaction is aborted. + */ + public void testStoreDequeuesCauseExceptions() throws Exception + { + // Transactions will exist owing to the 1st and 3rd queue entries in the collection + _queueEntries = createTestQueueEntries(new boolean[] {true}, new boolean[] {true}); + + _storeTransaction = createTestStoreTransaction(true); + _transactionLog = MockStoreTransaction.createTestTransactionLog(_storeTransaction); + _transaction = new LocalTransaction(_transactionLog); + + try + { + _transaction.dequeue(_queueEntries, _action1); + fail("Exception not thrown"); + } + catch (RuntimeException re) + { + // PASS + } + + assertEquals("Unexpected transaction state", TransactionState.ABORTED, _storeTransaction.getState()); + assertTrue("Rollback action must be fired", _action1.isRollbackActionFired()); + assertFalse("Post commit action must not be fired", _action1.isPostCommitActionFired()); + } + + /** + * Tests the add of a post-commit action. Unlike AutoCommitTranctions, the post transaction actions + * is added to a list to be fired on commit or rollback. + */ + public void testAddingPostCommitActionNotFiredImmediately() throws Exception + { + + _transaction.addPostTransactionAction(_action1); + + assertNotFired(_action1); + } + + + /** + * Tests committing a transaction without work accepted without error and without causing store + * enqueues or dequeues. + */ + public void testCommitNoWork() throws Exception + { + + _transaction.commit(); + + assertEquals("Unexpected number of store dequeues", 0, _storeTransaction.getNumberOfDequeuedMessages()); + assertEquals("Unexpected number of store enqueues", 0, _storeTransaction.getNumberOfEnqueuedMessages()); + assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); + } + + /** + * Tests rolling back a transaction without work accepted without error and without causing store + * enqueues or dequeues. + */ + public void testRollbackNoWork() throws Exception + { + + _transaction.rollback(); + + assertEquals("Unexpected number of store dequeues", 0, _storeTransaction.getNumberOfDequeuedMessages()); + assertEquals("Unexpected number of store enqueues", 0, _storeTransaction.getNumberOfEnqueuedMessages()); + assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); + } + + /** + * Tests the dequeuing of a message with a commit. Test ensures that the underlying store transaction is + * correctly controlled and the post commit action is fired. + */ + public void testCommitWork() throws Exception + { + + _message = createTestMessage(true); + _queue = createTestAMQQueue(true); + + assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); + assertFalse("Post commit action must not be fired yet", _action1.isPostCommitActionFired()); + + _transaction.dequeue(_queue, _message, _action1); + assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState()); + assertFalse("Post commit action must not be fired yet", _action1.isPostCommitActionFired()); + + _transaction.commit(); + + assertEquals("Unexpected transaction state", TransactionState.COMMITTED, _storeTransaction.getState()); + assertTrue("Post commit action must be fired", _action1.isPostCommitActionFired()); + } + + /** + * Tests the dequeuing of a message with a rollback. Test ensures that the underlying store transaction is + * correctly controlled and the post rollback action is fired. + */ + public void testRollbackWork() throws Exception + { + + _message = createTestMessage(true); + _queue = createTestAMQQueue(true); + + + assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); + assertFalse("Rollback action must not be fired yet", _action1.isRollbackActionFired()); + + _transaction.dequeue(_queue, _message, _action1); + + assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState()); + assertFalse("Rollback action must not be fired yet", _action1.isRollbackActionFired()); + + _transaction.rollback(); + + assertEquals("Unexpected transaction state", TransactionState.ABORTED, _storeTransaction.getState()); + assertTrue("Rollback action must be fired", _action1.isRollbackActionFired()); + + } + + /** + * Variation of testCommitWork with an additional post transaction action. + * + */ + public void testCommitWorkWithAdditionalPostAction() throws Exception + { + + _message = createTestMessage(true); + _queue = createTestAMQQueue(true); + + _transaction.addPostTransactionAction(_action1); + _transaction.dequeue(_queue, _message, _action2); + _transaction.commit(); + + assertEquals("Unexpected transaction state", TransactionState.COMMITTED, _storeTransaction.getState()); + + assertTrue("Post commit action1 must be fired", _action1.isPostCommitActionFired()); + assertTrue("Post commit action2 must be fired", _action2.isPostCommitActionFired()); + + assertFalse("Rollback action1 must not be fired", _action1.isRollbackActionFired()); + assertFalse("Rollback action2 must not be fired", _action1.isRollbackActionFired()); + } + + /** + * Variation of testRollbackWork with an additional post transaction action. + * + */ + public void testRollbackWorkWithAdditionalPostAction() throws Exception + { + + _message = createTestMessage(true); + _queue = createTestAMQQueue(true); + + _transaction.addPostTransactionAction(_action1); + _transaction.dequeue(_queue, _message, _action2); + _transaction.rollback(); + + assertEquals("Unexpected transaction state", TransactionState.ABORTED, _storeTransaction.getState()); + + assertFalse("Post commit action1 must not be fired", _action1.isPostCommitActionFired()); + assertFalse("Post commit action2 must not be fired", _action2.isPostCommitActionFired()); + + assertTrue("Rollback action1 must be fired", _action1.isRollbackActionFired()); + assertTrue("Rollback action2 must be fired", _action1.isRollbackActionFired()); + } + + private Collection<QueueEntry> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags) + { + Collection<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); + + assertTrue("Boolean arrays must be the same length", queueDurableFlags.length == messagePersistentFlags.length); + + for(int i = 0; i < queueDurableFlags.length; i++) + { + final AMQQueue queue = createTestAMQQueue(queueDurableFlags[i]); + final ServerMessage message = createTestMessage(messagePersistentFlags[i]); + + queueEntries.add(new MockQueueEntry() + { + + @Override + public ServerMessage getMessage() + { + return message; + } + + @Override + public AMQQueue getQueue() + { + return queue; + } + + }); + } + + return queueEntries; + } + + private MockStoreTransaction createTestStoreTransaction(boolean throwException) + { + return new MockStoreTransaction(throwException); + } + + private List<AMQQueue> createTestBaseQueues(boolean[] durableFlags) + { + List<AMQQueue> queues = new ArrayList<AMQQueue>(); + for (boolean b: durableFlags) + { + queues.add(createTestAMQQueue(b)); + } + + return queues; + } + + private AMQQueue createTestAMQQueue(final boolean durable) + { + return new MockAMQQueue("mockQueue") + { + @Override + public boolean isDurable() + { + return durable; + } + + }; + } + + private ServerMessage createTestMessage(final boolean persistent) + { + return new MockServerMessage(persistent); + } + + private void assertNotFired(MockAction action) + { + assertFalse("Rollback action must not be fired", action.isRollbackActionFired()); + assertFalse("Post commit action must not be fired", action.isPostCommitActionFired()); + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockAction.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockAction.java new file mode 100644 index 0000000000..975e3e91b9 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockAction.java @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.txn; + +import org.apache.qpid.server.txn.ServerTransaction.Action; + +/** + * Mock implementation of a ServerTranaction Action + * allowing its state to be observed. + * + */ +class MockAction implements Action +{ + private boolean _rollbackFired = false; + private boolean _postCommitFired = false; + + @Override + public void postCommit() + { + _postCommitFired = true; + } + + @Override + public void onRollback() + { + _rollbackFired = true; + } + + public boolean isRollbackActionFired() + { + return _rollbackFired; + } + + public boolean isPostCommitActionFired() + { + return _postCommitFired; + } +}
\ No newline at end of file diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java new file mode 100644 index 0000000000..64c62fd029 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java @@ -0,0 +1,114 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.txn; + +import java.nio.ByteBuffer; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.qpid.server.configuration.SessionConfig; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; + +/** + * Mock Server Message allowing its persistent flag to be controlled from test. + */ +class MockServerMessage implements ServerMessage +{ + /** + * + */ + private final boolean persistent; + + /** + * @param persistent + */ + MockServerMessage(boolean persistent) + { + this.persistent = persistent; + } + + @Override + public boolean isPersistent() + { + return persistent; + } + + @Override + public MessageReference newReference() + { + throw new NotImplementedException(); + } + + @Override + public boolean isImmediate() + { + throw new NotImplementedException(); + } + + @Override + public long getSize() + { + throw new NotImplementedException(); + } + + @Override + public SessionConfig getSessionConfig() + { + throw new NotImplementedException(); + } + + @Override + public String getRoutingKey() + { + throw new NotImplementedException(); + } + + @Override + public AMQMessageHeader getMessageHeader() + { + throw new NotImplementedException(); + } + + @Override + public long getExpiration() + { + throw new NotImplementedException(); + } + + @Override + public int getContent(ByteBuffer buf, int offset) + { + throw new NotImplementedException(); + } + + @Override + public long getArrivalTime() + { + throw new NotImplementedException(); + } + + @Override + public Long getMessageNumber() + { + return 0L; + } +}
\ No newline at end of file diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java new file mode 100644 index 0000000000..5700bba9f8 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java @@ -0,0 +1,136 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.txn; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.NotImplementedException; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.store.TransactionLog; +import org.apache.qpid.server.store.TransactionLogRecoveryHandler; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.TransactionLog.StoreFuture; +import org.apache.qpid.server.store.TransactionLog.Transaction; + +/** + * Mock implementation of a (Store) Transaction allow its state to be observed. + * Also provide a factory method to produce TestTransactionLog objects suitable + * for unit test use. + * + */ +class MockStoreTransaction implements Transaction +{ + enum TransactionState {NOT_STARTED, STARTED, COMMITTED, ABORTED}; + + private TransactionState _state = TransactionState.NOT_STARTED; + + private int _numberOfEnqueuedMessages = 0; + private int _numberOfDequeuedMessages = 0; + private boolean _throwExceptionOnQueueOp; + + public MockStoreTransaction(boolean throwExceptionOnQueueOp) + { + _throwExceptionOnQueueOp = throwExceptionOnQueueOp; + } + + public void setState(TransactionState state) + { + _state = state; + } + + public TransactionState getState() + { + return _state; + } + + @Override + public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + { + if (_throwExceptionOnQueueOp) + { + + throw new AMQStoreException("Mocked exception"); + } + + _numberOfEnqueuedMessages++; + } + + public int getNumberOfDequeuedMessages() + { + return _numberOfDequeuedMessages; + } + + public int getNumberOfEnqueuedMessages() + { + return _numberOfEnqueuedMessages; + } + + + @Override + public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException + { + if (_throwExceptionOnQueueOp) + { + throw new AMQStoreException("Mocked exception"); + } + + _numberOfDequeuedMessages++; + } + + @Override + public void commitTran() throws AMQStoreException + { + _state = TransactionState.COMMITTED; + } + + @Override + public StoreFuture commitTranAsync() throws AMQStoreException + { + throw new NotImplementedException(); + } + + @Override + public void abortTran() throws AMQStoreException + { + _state = TransactionState.ABORTED; + } + + public static TransactionLog createTestTransactionLog(final MockStoreTransaction storeTransaction) + { + return new TransactionLog() + { + + @Override + public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler, + Configuration storeConfiguration, LogSubject logSubject) throws Exception + { + } + + @Override + public Transaction newTransaction() + { + storeTransaction.setState(TransactionState.STARTED); + return storeTransaction; + } + + }; + } +}
\ No newline at end of file |