From da8070494a06d0b6c37127eb0a3439e394bddd31 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Fri, 30 Mar 2012 08:55:05 +0000 Subject: QPID-3916: Change message store interface to extend DurableConfigurationStore and change VirtualHost contructor Applied patch from Andrew MacBean and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1307317 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/store/berkeleydb/BDBMessageStore.java | 33 ++---- .../berkeleydb/entry/PreparedTransaction.java | 12 +-- .../tuple/PreparedTransactionBinding.java | 16 +-- .../store/berkeleydb/BDBMessageStoreTest.java | 7 +- .../server/store/berkeleydb/BDBUpgradeTest.java | 1 + .../apache/qpid/server/AMQBrokerManagerMBean.java | 2 +- .../java/org/apache/qpid/server/AMQChannel.java | 7 +- .../apache/qpid/server/binding/BindingFactory.java | 4 +- .../server/exchange/DefaultExchangeRegistry.java | 2 +- .../org/apache/qpid/server/federation/Bridge.java | 4 +- .../apache/qpid/server/federation/BrokerLink.java | 4 +- .../server/handler/ExchangeDeclareHandler.java | 2 +- .../qpid/server/handler/QueueDeclareHandler.java | 2 +- .../qpid/server/handler/QueueDeleteHandler.java | 2 +- .../apache/qpid/server/queue/AMQQueueFactory.java | 4 +- .../apache/qpid/server/queue/SimpleAMQQueue.java | 2 +- .../qpid/server/registry/ApplicationRegistry.java | 2 +- .../server/store/ConfigurationRecoveryHandler.java | 9 -- .../qpid/server/store/DerbyMessageStore.java | 37 ++----- .../server/store/DurableConfigurationStore.java | 6 +- .../qpid/server/store/MemoryMessageStore.java | 16 +-- .../org/apache/qpid/server/store/MessageStore.java | 114 +++------------------ .../org/apache/qpid/server/store/StoreFuture.java | 41 ++++++++ .../qpid/server/store/StoredMemoryMessage.java | 4 +- .../apache/qpid/server/store/StoredMessage.java | 2 +- .../org/apache/qpid/server/store/Transaction.java | 81 +++++++++++++++ .../store/TransactionLogRecoveryHandler.java | 2 +- .../qpid/server/transport/ServerSession.java | 7 +- .../server/transport/ServerSessionDelegate.java | 13 +-- .../server/txn/AsyncAutoCommitTransaction.java | 35 ++++--- .../qpid/server/txn/AutoCommitTransaction.java | 11 +- .../qpid/server/txn/DistributedTransaction.java | 3 +- .../java/org/apache/qpid/server/txn/DtxBranch.java | 9 +- .../apache/qpid/server/txn/LocalTransaction.java | 3 +- .../qpid/server/virtualhost/VirtualHost.java | 2 - .../VirtualHostConfigRecoveryHandler.java | 18 ++-- .../qpid/server/virtualhost/VirtualHostImpl.java | 99 +++++++++--------- .../exchange/AbstractHeadersExchangeTestBase.java | 2 +- .../qpid/server/queue/MockStoredMessage.java | 6 +- .../qpid/server/queue/SimpleAMQQueueTest.java | 14 +-- .../apache/qpid/server/store/MessageStoreTest.java | 8 +- .../qpid/server/store/SkeletonMessageStore.java | 34 ++++-- .../qpid/server/txn/MockStoreTransaction.java | 102 +++++++++++++++--- .../qpid/server/virtualhost/MockVirtualHost.java | 6 -- .../apache/qpid/server/store/SlowMessageStore.java | 18 +--- 45 files changed, 437 insertions(+), 371 deletions(-) create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/store/Transaction.java diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 589f63f562..402df299fc 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -59,8 +59,10 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreRecoveryHandler; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMemoryMessage; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; @@ -107,7 +109,7 @@ import com.sleepycat.je.TransactionConfig; * dequeue messages to queues. Generate message identifiers. */ @SuppressWarnings({"unchecked"}) -public class BDBMessageStore implements MessageStore, DurableConfigurationStore +public class BDBMessageStore implements MessageStore { private static final Logger _log = Logger.getLogger(BDBMessageStore.class); @@ -217,8 +219,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, - Configuration storeConfiguration, - LogSubject logSubject) throws Exception + TransactionLogRecoveryHandler tlogRecoveryHandler, + Configuration storeConfiguration, LogSubject logSubject) throws Exception { CurrentActor.get().message(logSubject, MessageStoreMessages.CREATED(this.getClass().getName())); @@ -231,29 +233,14 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } recoverMessages(recoveryHandler); - } - public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler, - Configuration storeConfiguration, LogSubject logSubject) throws Exception - { CurrentActor.get().message(logSubject, TransactionLogMessages.CREATED(this.getClass().getName())); - - if(!_configured) - { - _logSubject = logSubject; - configure(name,storeConfiguration); - _configured = true; - stateTransition(State.CONFIGURING, State.CONFIGURED); - } - - recoverQueueEntries(recoveryHandler); - - - + recoverQueueEntries(tlogRecoveryHandler); } - public org.apache.qpid.server.store.MessageStore.Transaction newTransaction() + + public org.apache.qpid.server.store.Transaction newTransaction() { return new BDBTransaction(); } @@ -2222,7 +2209,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore BDBMessageStore.this.commit(txn,true); } - return IMMEDIATE_FUTURE; + return StoreFuture.IMMEDIATE_FUTURE; } public void remove() @@ -2238,7 +2225,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - private class BDBTransaction implements Transaction + private class BDBTransaction implements org.apache.qpid.server.store.Transaction { private com.sleepycat.je.Transaction _txn; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java index 11ae8b89eb..eb5c4677ff 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java @@ -21,25 +21,25 @@ package org.apache.qpid.server.store.berkeleydb.entry; -import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.Transaction; public class PreparedTransaction { - private final MessageStore.Transaction.Record[] _enqueues; - private final MessageStore.Transaction.Record[] _dequeues; + private final Transaction.Record[] _enqueues; + private final Transaction.Record[] _dequeues; - public PreparedTransaction(MessageStore.Transaction.Record[] enqueues, MessageStore.Transaction.Record[] dequeues) + public PreparedTransaction(Transaction.Record[] enqueues, Transaction.Record[] dequeues) { _enqueues = enqueues; _dequeues = dequeues; } - public MessageStore.Transaction.Record[] getEnqueues() + public Transaction.Record[] getEnqueues() { return _enqueues; } - public MessageStore.Transaction.Record[] getDequeues() + public Transaction.Record[] getDequeues() { return _dequeues; } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java index d85bcd361e..33bf269880 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java @@ -25,8 +25,8 @@ import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; import org.apache.qpid.server.message.EnqueableMessage; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; @@ -35,16 +35,16 @@ public class PreparedTransactionBinding extends TupleBinding StoredMessage addMessage(T metaData); @@ -78,79 +56,13 @@ public interface MessageStore */ boolean isPersistent(); - - - public static interface Transaction - { - /** - * Places a message onto a specified queue, in a given transactional context. - * - * - * - * @param queue The queue to place the message on. - * @param message - * @throws org.apache.qpid.AMQStoreException If the operation fails for any reason. - */ - void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException; - - /** - * Extracts a message from a specified queue, in a given transactional context. - * - * @param queue The queue to place the message on. - * @param message The message to dequeue. - * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. - */ - void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException; - - - /** - * Commits all operations performed within a given transactional context. - * - * @throws AMQStoreException If the operation fails for any reason. - */ - void commitTran() throws AMQStoreException; - - /** - * Commits all operations performed within a given transactional context. - * - * @throws AMQStoreException If the operation fails for any reason. - */ - StoreFuture commitTranAsync() throws AMQStoreException; - - /** - * Abandons all operations performed within a given transactional context. - * - * @throws AMQStoreException If the operation fails for any reason. - */ - void abortTran() throws AMQStoreException; - - - public static interface Record - { - TransactionLogResource getQueue(); - EnqueableMessage getMessage(); - } - - void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException; - - void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) - throws AMQStoreException; - } - - public void configureTransactionLog(String name, - TransactionLogRecoveryHandler recoveryHandler, - Configuration storeConfiguration, - LogSubject logSubject) throws Exception; - Transaction newTransaction(); - - - public static interface StoreFuture - { - boolean isComplete(); - - void waitForCompletion(); - } + /** + * Called to close and cleanup any resources used by the message store. + * + * @throws Exception If the close fails. + */ + void close() throws Exception; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java new file mode 100644 index 0000000000..3e720d9de1 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java @@ -0,0 +1,41 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store; + +public interface StoreFuture +{ + StoreFuture IMMEDIATE_FUTURE = new StoreFuture() + { + public boolean isComplete() + { + return true; + } + + public void waitForCompletion() + { + + } + }; + + boolean isComplete(); + + void waitForCompletion(); +} \ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java index 144cc629bd..e7302270bb 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java @@ -122,9 +122,9 @@ public class StoredMemoryMessage implements StoredMessage return buf; } - public MessageStore.StoreFuture flushToStore() + public StoreFuture flushToStore() { - return MessageStore.IMMEDIATE_FUTURE; + return StoreFuture.IMMEDIATE_FUTURE; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java index d4a0381929..7909003855 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java @@ -34,7 +34,7 @@ public interface StoredMessage ByteBuffer getContent(int offsetInMessage, int size); - MessageStore.StoreFuture flushToStore(); + StoreFuture flushToStore(); void remove(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Transaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Transaction.java new file mode 100644 index 0000000000..ed6b89e373 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Transaction.java @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.message.EnqueableMessage; + +public interface Transaction +{ + /** + * Places a message onto a specified queue, in a given transactional context. + * + * + * + * @param queue The queue to place the message on. + * @param message + * @throws org.apache.qpid.AMQStoreException If the operation fails for any reason. + */ + void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException; + + /** + * Extracts a message from a specified queue, in a given transactional context. + * + * @param queue The queue to place the message on. + * @param message The message to dequeue. + * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. + */ + void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException; + + + /** + * Commits all operations performed within a given transactional context. + * + * @throws AMQStoreException If the operation fails for any reason. + */ + void commitTran() throws AMQStoreException; + + /** + * Commits all operations performed within a given transactional context. + * + * @throws AMQStoreException If the operation fails for any reason. + */ + StoreFuture commitTranAsync() throws AMQStoreException; + + /** + * Abandons all operations performed within a given transactional context. + * + * @throws AMQStoreException If the operation fails for any reason. + */ + void abortTran() throws AMQStoreException; + + + public static interface Record + { + TransactionLogResource getQueue(); + EnqueableMessage getMessage(); + } + + void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException; + + void recordXid(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues, Transaction.Record[] dequeues) + throws AMQStoreException; +} \ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java index 48ca72718b..b92d5f3e9b 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java @@ -33,7 +33,7 @@ public interface TransactionLogRecoveryHandler public static interface DtxRecordRecoveryHandler { - void dtxRecord(long format, byte[] globalId, byte[] branchId, MessageStore.Transaction.Record[] enqueues, MessageStore.Transaction.Record[] dequeues); + void dtxRecord(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues, Transaction.Record[] dequeues); void completeDtxRecordRecovery(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 462e880e5f..70239b0fee 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -72,6 +72,7 @@ import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; @@ -975,17 +976,17 @@ public class ServerSession extends Session return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast(); } - public void recordFuture(final MessageStore.StoreFuture future, final ServerTransaction.Action action) + public void recordFuture(final StoreFuture future, final ServerTransaction.Action action) { _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); } private static class AsyncCommand { - private final MessageStore.StoreFuture _future; + private final StoreFuture _future; private ServerTransaction.Action _action; - public AsyncCommand(final MessageStore.StoreFuture future, final ServerTransaction.Action action) + public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action) { _future = future; _action = action; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index c94a476712..54b91a0ce2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -48,6 +48,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.subscription.Subscription_0_10; @@ -126,7 +127,7 @@ public class ServerSessionDelegate extends SessionDelegate serverSession.accept(method.getTransfers()); if(!serverSession.isTransactional()) { - serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE, + serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, method)); } } @@ -356,7 +357,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr)); + serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr)); } } @@ -768,7 +769,7 @@ public class ServerSessionDelegate extends SessionDelegate { if (exchange.isDurable()) { - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); + DurableConfigurationStore store = virtualHost.getMessageStore(); store.createExchange(exchange); } @@ -924,7 +925,7 @@ public class ServerSessionDelegate extends SessionDelegate if (exchange.isDurable() && !exchange.isAutoDelete()) { - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); + DurableConfigurationStore store = virtualHost.getMessageStore(); store.removeExchange(exchange); } } @@ -1205,7 +1206,7 @@ public class ServerSessionDelegate extends SessionDelegate { VirtualHost virtualHost = getVirtualHost(session); - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); + DurableConfigurationStore store = virtualHost.getMessageStore(); String queueName = method.getQueue(); AMQQueue queue; @@ -1448,7 +1449,7 @@ public class ServerSessionDelegate extends SessionDelegate queue.delete(); if (queue.isDurable() && !queue.isAutoDelete()) { - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); + DurableConfigurationStore store = virtualHost.getMessageStore(); store.removeQueue(queue); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java index a062c6732f..d446434d24 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java @@ -29,7 +29,8 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStore.StoreFuture; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.Transaction; import java.util.Collection; import java.util.List; @@ -71,16 +72,16 @@ public class AsyncAutoCommitTransaction implements ServerTransaction */ public void addPostTransactionAction(final Action immediateAction) { - addFuture(MessageStore.IMMEDIATE_FUTURE, immediateAction); + addFuture(StoreFuture.IMMEDIATE_FUTURE, immediateAction); } public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) { - MessageStore.Transaction txn = null; + Transaction txn = null; try { - MessageStore.StoreFuture future; + StoreFuture future; if(message.isPersistent() && queue.isDurable()) { if (_logger.isDebugEnabled()) @@ -96,7 +97,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = MessageStore.IMMEDIATE_FUTURE; + future = StoreFuture.IMMEDIATE_FUTURE; } addFuture(future, postTransactionAction); postTransactionAction = null; @@ -113,7 +114,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } - private void addFuture(final MessageStore.StoreFuture future, final Action action) + private void addFuture(final StoreFuture future, final Action action) { if(action != null) { @@ -130,7 +131,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction public void dequeue(Collection queueEntries, Action postTransactionAction) { - MessageStore.Transaction txn = null; + Transaction txn = null; try { for(QueueEntry entry : queueEntries) @@ -154,7 +155,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } } - MessageStore.StoreFuture future; + StoreFuture future; if(txn != null) { future = txn.commitTranAsync(); @@ -162,7 +163,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = MessageStore.IMMEDIATE_FUTURE; + future = StoreFuture.IMMEDIATE_FUTURE; } addFuture(future, postTransactionAction); postTransactionAction = null; @@ -182,10 +183,10 @@ public class AsyncAutoCommitTransaction implements ServerTransaction public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) { - MessageStore.Transaction txn = null; + Transaction txn = null; try { - MessageStore.StoreFuture future; + StoreFuture future; if(message.isPersistent() && queue.isDurable()) { if (_logger.isDebugEnabled()) @@ -200,7 +201,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = MessageStore.IMMEDIATE_FUTURE; + future = StoreFuture.IMMEDIATE_FUTURE; } addFuture(future, postTransactionAction); postTransactionAction = null; @@ -220,7 +221,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction public void enqueue(List queues, EnqueableMessage message, Action postTransactionAction, long currentTime) { - MessageStore.Transaction txn = null; + Transaction txn = null; try { @@ -246,7 +247,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } } - MessageStore.StoreFuture future; + StoreFuture future; if (txn != null) { future = txn.commitTranAsync(); @@ -254,7 +255,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = MessageStore.IMMEDIATE_FUTURE; + future = StoreFuture.IMMEDIATE_FUTURE; } addFuture(future, postTransactionAction); postTransactionAction = null; @@ -278,7 +279,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction { if(immediatePostTransactionAction != null) { - addFuture(MessageStore.IMMEDIATE_FUTURE, new Action() + addFuture(StoreFuture.IMMEDIATE_FUTURE, new Action() { public void postCommit() { @@ -305,7 +306,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction return false; } - private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn) + private void rollbackIfNecessary(Action postTransactionAction, Transaction txn) { if (txn != null) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java index 597797b5f8..e5a7df6880 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.Transaction; import java.util.Collection; import java.util.List; @@ -67,7 +68,7 @@ public class AutoCommitTransaction implements ServerTransaction public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) { - MessageStore.Transaction txn = null; + Transaction txn = null; try { if(message.isPersistent() && queue.isDurable()) @@ -99,7 +100,7 @@ public class AutoCommitTransaction implements ServerTransaction public void dequeue(Collection queueEntries, Action postTransactionAction) { - MessageStore.Transaction txn = null; + Transaction txn = null; try { for(QueueEntry entry : queueEntries) @@ -146,7 +147,7 @@ public class AutoCommitTransaction implements ServerTransaction public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) { - MessageStore.Transaction txn = null; + Transaction txn = null; try { if(message.isPersistent() && queue.isDurable()) @@ -179,7 +180,7 @@ public class AutoCommitTransaction implements ServerTransaction public void enqueue(List queues, EnqueableMessage message, Action postTransactionAction, long currentTime) { - MessageStore.Transaction txn = null; + Transaction txn = null; try { @@ -247,7 +248,7 @@ public class AutoCommitTransaction implements ServerTransaction return false; } - private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn) + private void rollbackIfNecessary(Action postTransactionAction, Transaction txn) { if (txn != null) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java index 36f5f7b58f..05d0110e9b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Xid; @@ -38,7 +39,7 @@ public class DistributedTransaction implements ServerTransaction private final AutoCommitTransaction _autoCommitTransaction; - private volatile MessageStore.Transaction _transaction; + private volatile Transaction _transaction; private long _txnStartTime = 0L; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java index 99bb639261..3ac71fc6a6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Xid; @@ -48,7 +49,7 @@ public class DtxBranch private final List _enqueueRecords = new ArrayList(); private final List _dequeueRecords = new ArrayList(); - private MessageStore.Transaction _transaction; + private Transaction _transaction; private long _expiration; private VirtualHost _vhost; private ScheduledFuture _timeoutFuture; @@ -199,7 +200,7 @@ public class DtxBranch public void prepare() throws AMQStoreException { - MessageStore.Transaction txn = _store.newTransaction(); + Transaction txn = _store.newTransaction(); txn.recordXid(_xid.getFormat(), _xid.getGlobalId(), _xid.getBranchId(), @@ -223,7 +224,7 @@ public class DtxBranch { // prepare has previously been called - MessageStore.Transaction txn = _store.newTransaction(); + Transaction txn = _store.newTransaction(); txn.removeXid(_xid.getFormat(), _xid.getGlobalId(), _xid.getBranchId()); txn.commitTran(); @@ -302,7 +303,7 @@ public class DtxBranch _enqueueRecords.add(new Record(queue, message)); } - private static final class Record implements MessageStore.Transaction.Record + private static final class Record implements Transaction.Record { private final BaseQueue _queue; private final EnqueableMessage _message; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 9b61f7543f..11401ebd65 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.Transaction; import java.util.ArrayList; import java.util.Collection; @@ -46,7 +47,7 @@ public class LocalTransaction implements ServerTransaction private final List _postTransactionActions = new ArrayList(); - private volatile MessageStore.Transaction _transaction; + private volatile Transaction _transaction; private MessageStore _transactionLog; private long _txnStartTime = 0L; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 5e4a4f2db6..00c8d1ff27 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -57,8 +57,6 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo MessageStore getMessageStore(); - DurableConfigurationStore getDurableConfigurationStore(); - SecurityManager getSecurityManager(); void close(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index 0e965472d5..1da5b8d0c7 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -23,9 +23,7 @@ package org.apache.qpid.server.virtualhost; import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.UUID; @@ -52,6 +50,7 @@ import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreRecoveryHandler; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.DtxBranch; @@ -74,11 +73,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa { private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class); - private final VirtualHost _virtualHost; private MessageStoreLogSubject _logSubject; - private List _actions; private MessageStore _store; @@ -201,8 +198,8 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } public void dtxRecord(long format, byte[] globalId, byte[] branchId, - MessageStore.Transaction.Record[] enqueues, - MessageStore.Transaction.Record[] dequeues) + Transaction.Record[] enqueues, + Transaction.Record[] dequeues) { Xid id = new Xid(format, globalId, branchId); DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry(); @@ -212,7 +209,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa branch = new DtxBranch(id, _store, _virtualHost); dtxRegistry.registerBranch(branch); } - for(MessageStore.Transaction.Record record : enqueues) + for(Transaction.Record record : enqueues) { final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName()); if(queue != null) @@ -272,7 +269,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } } - for(MessageStore.Transaction.Record record : dequeues) + for(Transaction.Record record : dequeues) { final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName()); if(queue != null) @@ -385,7 +382,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa public void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf) { - _actions = new ArrayList(); try { Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeName); @@ -482,7 +478,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa else { _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queue.getNameShortString() + " is unknown, entry will be discarded"); - MessageStore.Transaction txn = _store.newTransaction(); + Transaction txn = _store.newTransaction(); txn.dequeueMessage(queue, new DummyMessage(messageId)); txn.commitTranAsync(); } @@ -490,7 +486,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa else { _logger.warn("Message id " + messageId + " in log references queue " + queueName + " which is not in the configuration, entry will be discarded"); - MessageStore.Transaction txn = _store.newTransaction(); + Transaction txn = _store.newTransaction(); TransactionLogResource mockQueue = new TransactionLogResource() { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index fcad1550e1..eccaf553cd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -63,8 +63,12 @@ import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; -import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; @@ -118,15 +122,13 @@ public class VirtualHostImpl implements VirtualHost private AMQBrokerManagerMBean _brokerMBean; - - private DurableConfigurationStore _durableConfigurationStore; private BindingFactory _bindingFactory; private boolean _statisticsEnabled = false; private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; - public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception + public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception { if (hostConfig == null) { @@ -166,7 +168,7 @@ public class VirtualHostImpl implements VirtualHost StartupRoutingTable configFileRT = new StartupRoutingTable(); - _durableConfigurationStore = configFileRT; + _messageStore = configFileRT; // This needs to be after the RT has been defined as it creates the default durable exchanges. _exchangeRegistry.initialise(); @@ -175,18 +177,7 @@ public class VirtualHostImpl implements VirtualHost initialiseModel(_configuration); - if (store != null) - { - _messageStore = store; - if(store instanceof DurableConfigurationStore) - { - _durableConfigurationStore = (DurableConfigurationStore) store; - } - } - else - { - initialiseMessageStore(hostConfig); - } + initialiseMessageStore(hostConfig); _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); _brokerMBean.register(); @@ -228,8 +219,8 @@ public class VirtualHostImpl implements VirtualHost /** * Virtual host JMX MBean class. * - * This has some of the methods implemented from management intrerface for exchanges. Any - * implementaion of an Exchange MBean should extend this class. + * This has some of the methods implemented from management interface for exchanges. Any + * Implementation of an Exchange MBean should extend this class. */ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost { @@ -407,27 +398,15 @@ public class VirtualHostImpl implements VirtualHost MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStore); - - if(messageStore instanceof DurableConfigurationStore) - { - DurableConfigurationStore durableConfigurationStore = (DurableConfigurationStore) messageStore; - - durableConfigurationStore.configureConfigStore(this.getName(), - recoveryHandler, - hostConfig.getStoreConfiguration(), - storeLogSubject); - - _durableConfigurationStore = durableConfigurationStore; - } + messageStore.configureConfigStore(this.getName(), + recoveryHandler, + hostConfig.getStoreConfiguration(), + storeLogSubject); messageStore.configureMessageStore(this.getName(), recoveryHandler, - hostConfig.getStoreConfiguration(), - storeLogSubject); - messageStore.configureTransactionLog(this.getName(), recoveryHandler, - hostConfig.getStoreConfiguration(), - storeLogSubject); + hostConfig.getStoreConfiguration(), storeLogSubject); _messageStore = messageStore; @@ -472,7 +451,7 @@ public class VirtualHostImpl implements VirtualHost if (newExchange.isDurable()) { - _durableConfigurationStore.createExchange(newExchange); + _messageStore.createExchange(newExchange); } } } @@ -482,10 +461,10 @@ public class VirtualHostImpl implements VirtualHost AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this); String queueName = queue.getName(); - if (queue.isDurable()) - { - getDurableConfigurationStore().createQueue(queue); - } + if (queue.isDurable()) + { + getMessageStore().createQueue(queue); + } //get the exchange name (returns default exchange name if none was specified) String exchangeName = queueConfiguration.getExchange(); @@ -573,11 +552,6 @@ public class VirtualHostImpl implements VirtualHost return _messageStore; } - public DurableConfigurationStore getDurableConfigurationStore() - { - return _durableConfigurationStore; - } - public SecurityManager getSecurityManager() { return _securityManager; @@ -800,7 +774,7 @@ public class VirtualHostImpl implements VirtualHost * This is so we can replay the creation of queues/exchanges in to the real _RT after it has been loaded. * This should be removed after the _RT has been fully split from the the TL */ - private static class StartupRoutingTable implements DurableConfigurationStore + private static class StartupRoutingTable implements MessageStore { public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, @@ -856,6 +830,37 @@ public class VirtualHostImpl implements VirtualHost public void deleteBridge(final Bridge bridge) throws AMQStoreException { } + + @Override + public void configureMessageStore(String name, + MessageStoreRecoveryHandler recoveryHandler, + TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config, LogSubject logSubject) throws Exception + { + } + + @Override + public void close() throws Exception + { + } + + @Override + public StoredMessage addMessage( + T metaData) + { + return null; + } + + @Override + public boolean isPersistent() + { + return false; + } + + @Override + public Transaction newTransaction() + { + return null; + } } @Override diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 7c7645e9e6..488f251b0a 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -79,7 +79,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase private BindingFactory bindingFactory = new BindingFactory(new DurableConfigurationStore.Source() { - public DurableConfigurationStore getDurableConfigurationStore() + public DurableConfigurationStore getMessageStore() { return _store; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java index fba3851507..950d59bef5 100755 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java @@ -25,7 +25,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.message.MessageMetaData; -import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import java.nio.ByteBuffer; @@ -105,9 +105,9 @@ public class MockStoredMessage implements StoredMessage return buf; } - public MessageStore.StoreFuture flushToStore() + public StoreFuture flushToStore() { - return MessageStore.IMMEDIATE_FUTURE; + return StoreFuture.IMMEDIATE_FUTURE; } public void remove() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 79c744902d..a8676bf4c2 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -61,7 +61,6 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase protected SimpleAMQQueue _queue; protected VirtualHost _virtualHost; - protected TestableMemoryMessageStore _store = new TestableMemoryMessageStore(); protected AMQShortString _qname = new AMQShortString("qname"); protected AMQShortString _owner = new AMQShortString("owner"); protected AMQShortString _routingKey = new AMQShortString("routing key"); @@ -106,7 +105,9 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance(); PropertiesConfiguration env = new PropertiesConfiguration(); - _virtualHost = new VirtualHostImpl(ApplicationRegistry.getInstance(), new VirtualHostConfiguration(getClass().getName(), env), _store); + VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(getClass().getName(), env); + vHostConfig.setMessageStoreClass(TestableMemoryMessageStore.class.getName()); + _virtualHost = new VirtualHostImpl(ApplicationRegistry.getInstance(), vHostConfig); applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost); _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, false, _virtualHost, _arguments); @@ -634,11 +635,12 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase qs.add(_queue); MessageMetaData metaData = msg.headersReceived(System.currentTimeMillis()); - StoredMessage handle = _store.addMessage(metaData); + TestableMemoryMessageStore store = (TestableMemoryMessageStore) _virtualHost.getMessageStore(); + StoredMessage handle = store.addMessage(metaData); msg.setStoredMessage(handle); - ServerTransaction txn = new AutoCommitTransaction(_store); + ServerTransaction txn = new AutoCommitTransaction(store); txn.enqueue(qs, msg, new ServerTransaction.Action() { @@ -653,7 +655,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase }, 0L); // Check that it is enqueued - AMQQueue data = _store.getMessages().get(1L); + AMQQueue data = store.getMessages().get(1L); assertNull(data); // Dequeue message @@ -664,7 +666,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase _queue.dequeue(entry,null); // Check that it is dequeued - data = _store.getMessages().get(1L); + data = store.getMessages().get(1L); assertNull(data); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index d49f0586ba..755d61a260 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -298,7 +298,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase 1, queueRegistry.getQueues().size()); //test that removing the queue means it is not recovered next time - getVirtualHost().getDurableConfigurationStore().removeQueue(queueRegistry.getQueue(durableQueueName)); + getVirtualHost().getMessageStore().removeQueue(queueRegistry.getQueue(durableQueueName)); reloadVirtualHost(); @@ -351,7 +351,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase origExchangeCount + 1, exchangeRegistry.getExchangeNames().size()); //test that removing the exchange means it is not recovered next time - getVirtualHost().getDurableConfigurationStore().removeExchange(exchangeRegistry.getExchange(directExchangeName)); + getVirtualHost().getMessageStore().removeExchange(exchangeRegistry.getExchange(directExchangeName)); reloadVirtualHost(); @@ -707,7 +707,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase if (queue.isDurable() && !queue.isAutoDelete()) { - getVirtualHost().getDurableConfigurationStore().createQueue(queue, queueArguments); + getVirtualHost().getMessageStore().createQueue(queue, queueArguments); } } catch (AMQException e) @@ -751,7 +751,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase getVirtualHost().getExchangeRegistry().registerExchange(exchange); if (durable) { - getVirtualHost().getDurableConfigurationStore().createExchange(exchange); + getVirtualHost().getMessageStore().createExchange(exchange); } } catch (AMQException e) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java index 09d865cb05..38d3fb78fc 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -26,6 +26,8 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.federation.Bridge; +import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.queue.AMQQueue; @@ -45,8 +47,8 @@ public class SkeletonMessageStore implements MessageStore public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, - Configuration config, - LogSubject logSubject) throws Exception + TransactionLogRecoveryHandler tlogRecoveryHandler, + Configuration config, LogSubject logSubject) throws Exception { } @@ -98,14 +100,6 @@ public class SkeletonMessageStore implements MessageStore } - public void configureTransactionLog(String name, - TransactionLogRecoveryHandler recoveryHandler, - Configuration storeConfiguration, - LogSubject logSubject) throws Exception - { - - } - public Transaction newTransaction() { return new Transaction() @@ -162,4 +156,24 @@ public class SkeletonMessageStore implements MessageStore } + @Override + public void createBrokerLink(BrokerLink link) throws AMQStoreException + { + } + + @Override + public void deleteBrokerLink(BrokerLink link) throws AMQStoreException + { + } + + @Override + public void createBridge(Bridge bridge) throws AMQStoreException + { + } + + @Override + public void deleteBridge(Bridge bridge) throws AMQStoreException + { + } + } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java index 801549e561..e9b7ceacc5 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java @@ -24,14 +24,21 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.NotImplementedException; import org.apache.qpid.AMQStoreException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.federation.Bridge; +import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStore.StoreFuture; -import org.apache.qpid.server.store.MessageStore.Transaction; import org.apache.qpid.server.store.MessageStoreRecoveryHandler; import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; @@ -126,30 +133,23 @@ class MockStoreTransaction implements Transaction { public void configureMessageStore(final String name, final MessageStoreRecoveryHandler recoveryHandler, - final Configuration config, - final LogSubject logSubject) throws Exception + TransactionLogRecoveryHandler tlogRecoveryHandler, + final Configuration config, final LogSubject logSubject) throws Exception { - //TODO. } public void close() throws Exception { - //TODO. } public StoredMessage addMessage(final T metaData) { - return null; //TODO. + return null; } public boolean isPersistent() { - return false; //TODO. - } - - public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler, - Configuration storeConfiguration, LogSubject logSubject) throws Exception - { + return false; } public Transaction newTransaction() @@ -157,6 +157,82 @@ class MockStoreTransaction implements Transaction storeTransaction.setState(TransactionState.STARTED); return storeTransaction; } + + @Override + public void configureConfigStore(String name, + ConfigurationRecoveryHandler recoveryHandler, + Configuration config, LogSubject logSubject) + throws Exception + { + } + + @Override + public void createExchange(Exchange exchange) + throws AMQStoreException + { + } + + @Override + public void removeExchange(Exchange exchange) + throws AMQStoreException + { + } + + @Override + public void bindQueue(Exchange exchange, AMQShortString routingKey, + AMQQueue queue, FieldTable args) throws AMQStoreException + { + } + + @Override + public void unbindQueue(Exchange exchange, + AMQShortString routingKey, AMQQueue queue, FieldTable args) + throws AMQStoreException + { + } + + @Override + public void createQueue(AMQQueue queue) throws AMQStoreException + { + } + + @Override + public void createQueue(AMQQueue queue, FieldTable arguments) + throws AMQStoreException + { + } + + @Override + public void removeQueue(AMQQueue queue) throws AMQStoreException + { + } + + @Override + public void updateQueue(AMQQueue queue) throws AMQStoreException + { + } + + @Override + public void createBrokerLink(BrokerLink link) + throws AMQStoreException + { + } + + @Override + public void deleteBrokerLink(BrokerLink link) + throws AMQStoreException + { + } + + @Override + public void createBridge(Bridge bridge) throws AMQStoreException + { + } + + @Override + public void deleteBridge(Bridge bridge) throws AMQStoreException + { + } }; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index af742532e2..cccf02c9f3 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -38,7 +38,6 @@ import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.stats.StatisticsCounter; -import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.DtxRegistry; @@ -111,11 +110,6 @@ public class MockVirtualHost implements VirtualHost return null; } - public DurableConfigurationStore getDurableConfigurationStore() - { - return null; - } - public ExchangeFactory getExchangeFactory() { return null; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index 8ffc09930e..c3b006f371 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -38,7 +38,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Iterator; -public class SlowMessageStore implements MessageStore, DurableConfigurationStore +public class SlowMessageStore implements MessageStore { private static final Logger _logger = Logger.getLogger(SlowMessageStore.class); private static final String DELAYS = "delays"; @@ -160,11 +160,11 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore public void configureMessageStore(String name, - MessageStoreRecoveryHandler recoveryHandler, - Configuration config, - LogSubject logSubject) throws Exception + MessageStoreRecoveryHandler messageRecoveryHandler, + TransactionLogRecoveryHandler tlogRecoveryHandler, + Configuration config, LogSubject logSubject) throws Exception { - _realStore.configureMessageStore(name, recoveryHandler, config, logSubject); + _realStore.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler, config, logSubject); } public void close() throws Exception @@ -227,14 +227,6 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore doPostDelay("removeQueue"); } - public void configureTransactionLog(String name, - TransactionLogRecoveryHandler recoveryHandler, - Configuration storeConfiguration, LogSubject logSubject) - throws Exception - { - _realStore.configureTransactionLog(name, recoveryHandler, storeConfiguration, logSubject); - } - public Transaction newTransaction() { doPreDelay("beginTran"); -- cgit v1.2.1