diff options
author | Keith Wall <kwall@apache.org> | 2012-03-30 08:55:05 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2012-03-30 08:55:05 +0000 |
commit | da8070494a06d0b6c37127eb0a3439e394bddd31 (patch) | |
tree | 85ca23ccf051f3157a5f4b3be1b7752dfe576c6d | |
parent | 6e605c53e91d5b9b1bf46985c5cf8bd94a34de4d (diff) | |
download | qpid-python-da8070494a06d0b6c37127eb0a3439e394bddd31.tar.gz |
QPID-3916: Change message store interface to extend DurableConfigurationStore and change VirtualHost contructor
Applied patch from Andrew MacBean <andymacbean@gmail.com> and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1307317 13f79535-47bb-0310-9956-ffa450edef68
45 files changed, 437 insertions, 371 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 589f63f562..402df299fc 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -59,8 +59,10 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreRecoveryHandler; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMemoryMessage; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; @@ -107,7 +109,7 @@ import com.sleepycat.je.TransactionConfig; * dequeue messages to queues. <tr><td> Generate message identifiers. </table> */ @SuppressWarnings({"unchecked"}) -public class BDBMessageStore implements MessageStore, DurableConfigurationStore +public class BDBMessageStore implements MessageStore { private static final Logger _log = Logger.getLogger(BDBMessageStore.class); @@ -217,8 +219,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, - Configuration storeConfiguration, - LogSubject logSubject) throws Exception + TransactionLogRecoveryHandler tlogRecoveryHandler, + Configuration storeConfiguration, LogSubject logSubject) throws Exception { CurrentActor.get().message(logSubject, MessageStoreMessages.CREATED(this.getClass().getName())); @@ -231,29 +233,14 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } recoverMessages(recoveryHandler); - } - public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler, - Configuration storeConfiguration, LogSubject logSubject) throws Exception - { CurrentActor.get().message(logSubject, TransactionLogMessages.CREATED(this.getClass().getName())); - - if(!_configured) - { - _logSubject = logSubject; - configure(name,storeConfiguration); - _configured = true; - stateTransition(State.CONFIGURING, State.CONFIGURED); - } - - recoverQueueEntries(recoveryHandler); - - - + recoverQueueEntries(tlogRecoveryHandler); } - public org.apache.qpid.server.store.MessageStore.Transaction newTransaction() + + public org.apache.qpid.server.store.Transaction newTransaction() { return new BDBTransaction(); } @@ -2222,7 +2209,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore BDBMessageStore.this.commit(txn,true); } - return IMMEDIATE_FUTURE; + return StoreFuture.IMMEDIATE_FUTURE; } public void remove() @@ -2238,7 +2225,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - private class BDBTransaction implements Transaction + private class BDBTransaction implements org.apache.qpid.server.store.Transaction { private com.sleepycat.je.Transaction _txn; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java index 11ae8b89eb..eb5c4677ff 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java @@ -21,25 +21,25 @@ package org.apache.qpid.server.store.berkeleydb.entry; -import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.Transaction; public class PreparedTransaction { - private final MessageStore.Transaction.Record[] _enqueues; - private final MessageStore.Transaction.Record[] _dequeues; + private final Transaction.Record[] _enqueues; + private final Transaction.Record[] _dequeues; - public PreparedTransaction(MessageStore.Transaction.Record[] enqueues, MessageStore.Transaction.Record[] dequeues) + public PreparedTransaction(Transaction.Record[] enqueues, Transaction.Record[] dequeues) { _enqueues = enqueues; _dequeues = dequeues; } - public MessageStore.Transaction.Record[] getEnqueues() + public Transaction.Record[] getEnqueues() { return _enqueues; } - public MessageStore.Transaction.Record[] getDequeues() + public Transaction.Record[] getDequeues() { return _dequeues; } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java index d85bcd361e..33bf269880 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java @@ -25,8 +25,8 @@ import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; import org.apache.qpid.server.message.EnqueableMessage; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; @@ -35,16 +35,16 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction @Override public PreparedTransaction entryToObject(TupleInput input) { - MessageStore.Transaction.Record[] enqueues = readRecords(input); + Transaction.Record[] enqueues = readRecords(input); - MessageStore.Transaction.Record[] dequeues = readRecords(input); + Transaction.Record[] dequeues = readRecords(input); return new PreparedTransaction(enqueues, dequeues); } - private MessageStore.Transaction.Record[] readRecords(TupleInput input) + private Transaction.Record[] readRecords(TupleInput input) { - MessageStore.Transaction.Record[] records = new MessageStore.Transaction.Record[input.readInt()]; + Transaction.Record[] records = new Transaction.Record[input.readInt()]; for(int i = 0; i < records.length; i++) { records[i] = new RecordImpl(input.readString(), input.readLong()); @@ -60,7 +60,7 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction } - private void writeRecords(MessageStore.Transaction.Record[] records, TupleOutput output) + private void writeRecords(Transaction.Record[] records, TupleOutput output) { if(records == null) { @@ -69,7 +69,7 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction else { output.writeInt(records.length); - for(MessageStore.Transaction.Record record : records) + for(Transaction.Record record : records) { output.writeString(record.getQueue().getResourceName()); output.writeLong(record.getMessage().getMessageNumber()); @@ -77,7 +77,7 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction } } - private static class RecordImpl implements MessageStore.Transaction.Record, TransactionLogResource, EnqueableMessage + private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueableMessage { private final String _queueName; diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index 74fba168a9..0ccfbe5a2a 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -37,6 +37,7 @@ import org.apache.qpid.server.store.MessageMetaDataType; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; @@ -419,7 +420,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto } }; - MessageStore.Transaction txn = log.newTransaction(); + Transaction txn = log.newTransaction(); txn.enqueueMessage(mockQueue, new MockMessage(1L)); txn.enqueueMessage(mockQueue, new MockMessage(5L)); @@ -457,7 +458,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto } }; - MessageStore.Transaction txn = log.newTransaction(); + Transaction txn = log.newTransaction(); txn.enqueueMessage(mockQueue, new MockMessage(21L)); txn.abortTran(); @@ -498,7 +499,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto } }; - MessageStore.Transaction txn = log.newTransaction(); + Transaction txn = log.newTransaction(); txn.enqueueMessage(mockQueue, new MockMessage(30L)); txn.commitTran(); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java index b906ab2474..4e201d5473 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.store.berkeleydb; + import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME; import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME; import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_SUB_NAME; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index 265aa7714e..b388def86c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -76,7 +76,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr _queueRegistry = virtualHost.getQueueRegistry(); _exchangeRegistry = virtualHost.getExchangeRegistry(); _defaultExchange = _exchangeRegistry.getDefaultExchange(); - _durableConfig = virtualHost.getDurableConfigurationStore(); + _durableConfig = virtualHost.getMessageStore(); _exchangeFactory = virtualHost.getExchangeFactory(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 22f9544b0c..6910247577 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -83,6 +83,7 @@ import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.RecordDeliveryMethod; @@ -1556,7 +1557,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm } } - public void recordFuture(final MessageStore.StoreFuture future, final ServerTransaction.Action action) + public void recordFuture(final StoreFuture future, final ServerTransaction.Action action) { _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); } @@ -1590,10 +1591,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm private static class AsyncCommand { - private final MessageStore.StoreFuture _future; + private final StoreFuture _future; private ServerTransaction.Action _action; - public AsyncCommand(final MessageStore.StoreFuture future, final ServerTransaction.Action action) + public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action) { _future = future; _action = action; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java index fe66a6d341..250c417ef1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java @@ -192,7 +192,7 @@ public class BindingFactory if (b.isDurable() && !restore) { - _configSource.getDurableConfigurationStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments)); + _configSource.getMessageStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments)); } queue.addQueueDeleteTask(b); @@ -265,7 +265,7 @@ public class BindingFactory if (b.isDurable()) { - _configSource.getDurableConfigurationStore().unbindQueue(exchange, + _configSource.getMessageStore().unbindQueue(exchange, new AMQShortString(bindingKey), queue, FieldTable.convertToFieldTable(arguments)); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index a5fa9f014e..ebe0645bc4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -63,7 +63,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public DurableConfigurationStore getDurableConfigurationStore() { - return _host.getDurableConfigurationStore(); + return _host.getMessageStore(); } public void registerExchange(Exchange exchange) throws AMQException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java index b58802e1ff..4b4bdd4efb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java @@ -132,7 +132,7 @@ public class Bridge implements BridgeConfig { try { - brokerLink.getVirtualHost().getDurableConfigurationStore().createBridge(this); + brokerLink.getVirtualHost().getMessageStore().createBridge(this); } catch (AMQStoreException e) { @@ -220,7 +220,7 @@ public class Bridge implements BridgeConfig { try { - brokerLink.getVirtualHost().getDurableConfigurationStore().createBridge(this); + brokerLink.getVirtualHost().getMessageStore().createBridge(this); } catch (AMQStoreException e) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java index 032df8bb0d..4bcc0d6136 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java @@ -250,7 +250,7 @@ public class BrokerLink implements LinkConfig, ConnectionListener { try { - _virtualHost.getDurableConfigurationStore().createBrokerLink(this); + _virtualHost.getMessageStore().createBrokerLink(this); } catch (AMQStoreException e) { @@ -295,7 +295,7 @@ public class BrokerLink implements LinkConfig, ConnectionListener { try { - _virtualHost.getDurableConfigurationStore().createBrokerLink(this); + _virtualHost.getMessageStore().createBrokerLink(this); } catch (AMQStoreException e) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index 6d55f31ebc..8756409f64 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -106,7 +106,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange if (exchange.isDurable()) { - virtualHost.getDurableConfigurationStore().createExchange(exchange); + virtualHost.getMessageStore().createExchange(exchange); } } catch(AMQUnknownExchangeType e) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index f57f7eb9e6..7d993ae14d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -65,7 +65,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar VirtualHost virtualHost = protocolConnection.getVirtualHost(); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); + DurableConfigurationStore store = virtualHost.getMessageStore(); final AMQShortString queueName; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index cc37259d54..762f090b83 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -62,7 +62,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); VirtualHost virtualHost = protocolConnection.getVirtualHost(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); + DurableConfigurationStore store = virtualHost.getMessageStore(); AMQChannel channel = protocolConnection.getChannel(channelId); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index aca5891d2e..8ff3f0148b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -292,7 +292,7 @@ public class AMQQueueFactory exchangeRegistry.registerExchange(dlExchange); //enter the dle in the persistent store - virtualHost.getDurableConfigurationStore().createExchange(dlExchange); + virtualHost.getMessageStore().createExchange(dlExchange); } } @@ -312,7 +312,7 @@ public class AMQQueueFactory dlQueue = createAMQQueueImpl(dlQueueName, true, owner, false, exclusive, virtualHost, args); //enter the dlq in the persistent store - virtualHost.getDurableConfigurationStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args)); + virtualHost.getMessageStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args)); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index cc1041d9de..1cd7e3505f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -346,7 +346,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes if(isDurable()) { - getVirtualHost().getDurableConfigurationStore().updateQueue(this); + getVirtualHost().getMessageStore().updateQueue(this); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 9951f7d3c8..4ed28b965d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -664,7 +664,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry public VirtualHost createVirtualHost(final VirtualHostConfiguration vhostConfig) throws Exception { - VirtualHostImpl virtualHost = new VirtualHostImpl(this, vhostConfig, null); + VirtualHostImpl virtualHost = new VirtualHostImpl(this, vhostConfig); _virtualHostRegistry.registerVirtualHost(virtualHost); getBroker().addVirtualHost(virtualHost); return virtualHost; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java index fb67500da9..73f127e097 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java @@ -60,13 +60,4 @@ public interface ConfigurationRecoveryHandler void completeBridgeRecoveryForLink(); } - public static interface QueueEntryRecoveryHandler - { - void complete(); - - void queueEntry(String queueName, long messageId); - } - - - } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 4d63136a9d..86304a0984 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -69,7 +69,7 @@ import java.util.concurrent.atomic.AtomicLong; * * TODO extract the SQL statements into a generic JDBC store */ -public class DerbyMessageStore implements MessageStore, DurableConfigurationStore +public class DerbyMessageStore implements MessageStore { private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class); @@ -277,8 +277,8 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, - Configuration storeConfiguration, - LogSubject logSubject) throws Exception + TransactionLogRecoveryHandler tlogRecoveryHandler, + Configuration storeConfiguration, LogSubject logSubject) throws Exception { if(!_configured) { @@ -297,38 +297,13 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor recoverMessages(recoveryHandler); - } - - - - public void configureTransactionLog(String name, - TransactionLogRecoveryHandler recoveryHandler, - Configuration storeConfiguration, - LogSubject logSubject) throws Exception - { - - if(!_configured) - { - _logSubject = logSubject; - } CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName())); - if(!_configured) - { - - _logSubject = logSubject; - - commonConfiguration(name, storeConfiguration, logSubject); - _configured = true; - } - - TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(recoveryHandler); + TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(tlogRecoveryHandler); recoverXids(dtxrh); } - - private void commonConfiguration(String name, Configuration storeConfiguration, LogSubject logSubject) throws ClassNotFoundException, SQLException { @@ -2167,7 +2142,7 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor } } - private static class RecordImpl implements MessageStore.Transaction.Record, TransactionLogResource, EnqueableMessage + private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueableMessage { private final String _queueName; @@ -2650,7 +2625,7 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor } throw new RuntimeException(e); } - return IMMEDIATE_FUTURE; + return StoreFuture.IMMEDIATE_FUTURE; } private synchronized void store(final Connection conn) throws SQLException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index 123ecd8145..ea9621ff41 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -36,14 +36,14 @@ public interface DurableConfigurationStore public static interface Source { - DurableConfigurationStore getDurableConfigurationStore(); + DurableConfigurationStore getMessageStore(); } /** * Called after instantiation in order to configure the message store. A particular implementation can define * whatever parameters it wants. * - * @param name The name to be used by this storem + * @param name The name to be used by this store * @param recoveryHandler Handler to be called as the store recovers on start up * @param config The apache commons configuration object. * @@ -89,7 +89,7 @@ public interface DurableConfigurationStore * @param exchange The exchange to unbind from. * @param routingKey The routing key to unbind. * @param queue The queue to unbind. - * @param args Additonal parameters. + * @param args Additional parameters. * * @throws AMQStoreException If the operation fails for any reason. */ diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index b01e5aa954..b6a5e80640 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -42,7 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; /** A simple message store that stores the messages in a threadsafe structure in memory. */ -public class MemoryMessageStore implements MessageStore, DurableConfigurationStore +public class MemoryMessageStore implements MessageStore { private static final Logger _log = Logger.getLogger(MemoryMessageStore.class); @@ -71,7 +71,7 @@ public class MemoryMessageStore implements MessageStore, DurableConfigurationSto public StoreFuture commitTranAsync() throws AMQStoreException { - return IMMEDIATE_FUTURE; + return StoreFuture.IMMEDIATE_FUTURE; } public void abortTran() throws AMQStoreException @@ -98,8 +98,8 @@ public class MemoryMessageStore implements MessageStore, DurableConfigurationSto public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, - Configuration config, - LogSubject logSubject) throws Exception + TransactionLogRecoveryHandler tlogRecoveryHandler, + Configuration config, LogSubject logSubject) throws Exception { if(_logSubject == null) { @@ -187,14 +187,6 @@ public class MemoryMessageStore implements MessageStore, DurableConfigurationSto } - public void configureTransactionLog(String name, - TransactionLogRecoveryHandler recoveryHandler, - Configuration storeConfiguration, - LogSubject logSubject) throws Exception - { - //To change body of implemented methods use File | Settings | File Templates. - } - public Transaction newTransaction() { return IN_MEMORY_TRANSACTION; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index 00bb0449d6..2114472592 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -21,51 +21,29 @@ package org.apache.qpid.server.store; import org.apache.commons.configuration.Configuration; -import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.message.EnqueableMessage; /** * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages. * */ -public interface MessageStore +public interface MessageStore extends DurableConfigurationStore { - StoreFuture IMMEDIATE_FUTURE = new StoreFuture() - { - public boolean isComplete() - { - return true; - } - - public void waitForCompletion() - { - - } - }; - - /** * Called after instantiation in order to configure the message store. A particular implementation can define * whatever parameters it wants. * - * @param name The name to be used by this storem - * @param recoveryHandler Handler to be called as the store recovers on start up + * @param name The name to be used by this store + * @param messageRecoveryHandler Handler to be called as the store recovers on start up + * @param tlogRecoveryHandler * @param config The apache commons configuration object. - * * @throws Exception If any error occurs that means the store is unable to configure itself. */ void configureMessageStore(String name, - MessageStoreRecoveryHandler recoveryHandler, - Configuration config, - LogSubject logSubject) throws Exception; + MessageStoreRecoveryHandler messageRecoveryHandler, + TransactionLogRecoveryHandler tlogRecoveryHandler, + Configuration config, LogSubject logSubject) throws Exception; - /** - * Called to close and cleanup any resources used by the message store. - * - * @throws Exception If the close fails. - */ - void close() throws Exception; public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData); @@ -78,79 +56,13 @@ public interface MessageStore */ boolean isPersistent(); - - - public static interface Transaction - { - /** - * Places a message onto a specified queue, in a given transactional context. - * - * - * - * @param queue The queue to place the message on. - * @param message - * @throws org.apache.qpid.AMQStoreException If the operation fails for any reason. - */ - void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException; - - /** - * Extracts a message from a specified queue, in a given transactional context. - * - * @param queue The queue to place the message on. - * @param message The message to dequeue. - * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. - */ - void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException; - - - /** - * Commits all operations performed within a given transactional context. - * - * @throws AMQStoreException If the operation fails for any reason. - */ - void commitTran() throws AMQStoreException; - - /** - * Commits all operations performed within a given transactional context. - * - * @throws AMQStoreException If the operation fails for any reason. - */ - StoreFuture commitTranAsync() throws AMQStoreException; - - /** - * Abandons all operations performed within a given transactional context. - * - * @throws AMQStoreException If the operation fails for any reason. - */ - void abortTran() throws AMQStoreException; - - - public static interface Record - { - TransactionLogResource getQueue(); - EnqueableMessage getMessage(); - } - - void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException; - - void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) - throws AMQStoreException; - } - - public void configureTransactionLog(String name, - TransactionLogRecoveryHandler recoveryHandler, - Configuration storeConfiguration, - LogSubject logSubject) throws Exception; - Transaction newTransaction(); - - - public static interface StoreFuture - { - boolean isComplete(); - - void waitForCompletion(); - } + /** + * Called to close and cleanup any resources used by the message store. + * + * @throws Exception If the close fails. + */ + void close() throws Exception; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java new file mode 100644 index 0000000000..3e720d9de1 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java @@ -0,0 +1,41 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store; + +public interface StoreFuture +{ + StoreFuture IMMEDIATE_FUTURE = new StoreFuture() + { + public boolean isComplete() + { + return true; + } + + public void waitForCompletion() + { + + } + }; + + boolean isComplete(); + + void waitForCompletion(); +}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java index 144cc629bd..e7302270bb 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java @@ -122,9 +122,9 @@ public class StoredMemoryMessage implements StoredMessage return buf; } - public MessageStore.StoreFuture flushToStore() + public StoreFuture flushToStore() { - return MessageStore.IMMEDIATE_FUTURE; + return StoreFuture.IMMEDIATE_FUTURE; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java index d4a0381929..7909003855 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java @@ -34,7 +34,7 @@ public interface StoredMessage<M extends StorableMessageMetaData> ByteBuffer getContent(int offsetInMessage, int size); - MessageStore.StoreFuture flushToStore(); + StoreFuture flushToStore(); void remove(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Transaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Transaction.java new file mode 100644 index 0000000000..ed6b89e373 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Transaction.java @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.message.EnqueableMessage; + +public interface Transaction +{ + /** + * Places a message onto a specified queue, in a given transactional context. + * + * + * + * @param queue The queue to place the message on. + * @param message + * @throws org.apache.qpid.AMQStoreException If the operation fails for any reason. + */ + void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException; + + /** + * Extracts a message from a specified queue, in a given transactional context. + * + * @param queue The queue to place the message on. + * @param message The message to dequeue. + * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist. + */ + void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException; + + + /** + * Commits all operations performed within a given transactional context. + * + * @throws AMQStoreException If the operation fails for any reason. + */ + void commitTran() throws AMQStoreException; + + /** + * Commits all operations performed within a given transactional context. + * + * @throws AMQStoreException If the operation fails for any reason. + */ + StoreFuture commitTranAsync() throws AMQStoreException; + + /** + * Abandons all operations performed within a given transactional context. + * + * @throws AMQStoreException If the operation fails for any reason. + */ + void abortTran() throws AMQStoreException; + + + public static interface Record + { + TransactionLogResource getQueue(); + EnqueableMessage getMessage(); + } + + void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException; + + void recordXid(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues, Transaction.Record[] dequeues) + throws AMQStoreException; +}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java index 48ca72718b..b92d5f3e9b 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java @@ -33,7 +33,7 @@ public interface TransactionLogRecoveryHandler public static interface DtxRecordRecoveryHandler { - void dtxRecord(long format, byte[] globalId, byte[] branchId, MessageStore.Transaction.Record[] enqueues, MessageStore.Transaction.Record[] dequeues); + void dtxRecord(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues, Transaction.Record[] dequeues); void completeDtxRecordRecovery(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 462e880e5f..70239b0fee 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -72,6 +72,7 @@ import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; @@ -975,17 +976,17 @@ public class ServerSession extends Session return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast(); } - public void recordFuture(final MessageStore.StoreFuture future, final ServerTransaction.Action action) + public void recordFuture(final StoreFuture future, final ServerTransaction.Action action) { _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); } private static class AsyncCommand { - private final MessageStore.StoreFuture _future; + private final StoreFuture _future; private ServerTransaction.Action _action; - public AsyncCommand(final MessageStore.StoreFuture future, final ServerTransaction.Action action) + public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action) { _future = future; _action = action; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index c94a476712..54b91a0ce2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -48,6 +48,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.subscription.Subscription_0_10; @@ -126,7 +127,7 @@ public class ServerSessionDelegate extends SessionDelegate serverSession.accept(method.getTransfers()); if(!serverSession.isTransactional()) { - serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE, + serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, method)); } } @@ -356,7 +357,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr)); + serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr)); } } @@ -768,7 +769,7 @@ public class ServerSessionDelegate extends SessionDelegate { if (exchange.isDurable()) { - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); + DurableConfigurationStore store = virtualHost.getMessageStore(); store.createExchange(exchange); } @@ -924,7 +925,7 @@ public class ServerSessionDelegate extends SessionDelegate if (exchange.isDurable() && !exchange.isAutoDelete()) { - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); + DurableConfigurationStore store = virtualHost.getMessageStore(); store.removeExchange(exchange); } } @@ -1205,7 +1206,7 @@ public class ServerSessionDelegate extends SessionDelegate { VirtualHost virtualHost = getVirtualHost(session); - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); + DurableConfigurationStore store = virtualHost.getMessageStore(); String queueName = method.getQueue(); AMQQueue queue; @@ -1448,7 +1449,7 @@ public class ServerSessionDelegate extends SessionDelegate queue.delete(); if (queue.isDurable() && !queue.isAutoDelete()) { - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); + DurableConfigurationStore store = virtualHost.getMessageStore(); store.removeQueue(queue); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java index a062c6732f..d446434d24 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java @@ -29,7 +29,8 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStore.StoreFuture; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.Transaction; import java.util.Collection; import java.util.List; @@ -71,16 +72,16 @@ public class AsyncAutoCommitTransaction implements ServerTransaction */ public void addPostTransactionAction(final Action immediateAction) { - addFuture(MessageStore.IMMEDIATE_FUTURE, immediateAction); + addFuture(StoreFuture.IMMEDIATE_FUTURE, immediateAction); } public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) { - MessageStore.Transaction txn = null; + Transaction txn = null; try { - MessageStore.StoreFuture future; + StoreFuture future; if(message.isPersistent() && queue.isDurable()) { if (_logger.isDebugEnabled()) @@ -96,7 +97,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = MessageStore.IMMEDIATE_FUTURE; + future = StoreFuture.IMMEDIATE_FUTURE; } addFuture(future, postTransactionAction); postTransactionAction = null; @@ -113,7 +114,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } - private void addFuture(final MessageStore.StoreFuture future, final Action action) + private void addFuture(final StoreFuture future, final Action action) { if(action != null) { @@ -130,7 +131,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction) { - MessageStore.Transaction txn = null; + Transaction txn = null; try { for(QueueEntry entry : queueEntries) @@ -154,7 +155,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } } - MessageStore.StoreFuture future; + StoreFuture future; if(txn != null) { future = txn.commitTranAsync(); @@ -162,7 +163,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = MessageStore.IMMEDIATE_FUTURE; + future = StoreFuture.IMMEDIATE_FUTURE; } addFuture(future, postTransactionAction); postTransactionAction = null; @@ -182,10 +183,10 @@ public class AsyncAutoCommitTransaction implements ServerTransaction public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) { - MessageStore.Transaction txn = null; + Transaction txn = null; try { - MessageStore.StoreFuture future; + StoreFuture future; if(message.isPersistent() && queue.isDurable()) { if (_logger.isDebugEnabled()) @@ -200,7 +201,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = MessageStore.IMMEDIATE_FUTURE; + future = StoreFuture.IMMEDIATE_FUTURE; } addFuture(future, postTransactionAction); postTransactionAction = null; @@ -220,7 +221,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime) { - MessageStore.Transaction txn = null; + Transaction txn = null; try { @@ -246,7 +247,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } } - MessageStore.StoreFuture future; + StoreFuture future; if (txn != null) { future = txn.commitTranAsync(); @@ -254,7 +255,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = MessageStore.IMMEDIATE_FUTURE; + future = StoreFuture.IMMEDIATE_FUTURE; } addFuture(future, postTransactionAction); postTransactionAction = null; @@ -278,7 +279,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction { if(immediatePostTransactionAction != null) { - addFuture(MessageStore.IMMEDIATE_FUTURE, new Action() + addFuture(StoreFuture.IMMEDIATE_FUTURE, new Action() { public void postCommit() { @@ -305,7 +306,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction return false; } - private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn) + private void rollbackIfNecessary(Action postTransactionAction, Transaction txn) { if (txn != null) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java index 597797b5f8..e5a7df6880 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.Transaction; import java.util.Collection; import java.util.List; @@ -67,7 +68,7 @@ public class AutoCommitTransaction implements ServerTransaction public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) { - MessageStore.Transaction txn = null; + Transaction txn = null; try { if(message.isPersistent() && queue.isDurable()) @@ -99,7 +100,7 @@ public class AutoCommitTransaction implements ServerTransaction public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction) { - MessageStore.Transaction txn = null; + Transaction txn = null; try { for(QueueEntry entry : queueEntries) @@ -146,7 +147,7 @@ public class AutoCommitTransaction implements ServerTransaction public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) { - MessageStore.Transaction txn = null; + Transaction txn = null; try { if(message.isPersistent() && queue.isDurable()) @@ -179,7 +180,7 @@ public class AutoCommitTransaction implements ServerTransaction public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime) { - MessageStore.Transaction txn = null; + Transaction txn = null; try { @@ -247,7 +248,7 @@ public class AutoCommitTransaction implements ServerTransaction return false; } - private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn) + private void rollbackIfNecessary(Action postTransactionAction, Transaction txn) { if (txn != null) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java index 36f5f7b58f..05d0110e9b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Xid; @@ -38,7 +39,7 @@ public class DistributedTransaction implements ServerTransaction private final AutoCommitTransaction _autoCommitTransaction; - private volatile MessageStore.Transaction _transaction; + private volatile Transaction _transaction; private long _txnStartTime = 0L; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java index 99bb639261..3ac71fc6a6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Xid; @@ -48,7 +49,7 @@ public class DtxBranch private final List<Record> _enqueueRecords = new ArrayList<Record>(); private final List<Record> _dequeueRecords = new ArrayList<Record>(); - private MessageStore.Transaction _transaction; + private Transaction _transaction; private long _expiration; private VirtualHost _vhost; private ScheduledFuture<?> _timeoutFuture; @@ -199,7 +200,7 @@ public class DtxBranch public void prepare() throws AMQStoreException { - MessageStore.Transaction txn = _store.newTransaction(); + Transaction txn = _store.newTransaction(); txn.recordXid(_xid.getFormat(), _xid.getGlobalId(), _xid.getBranchId(), @@ -223,7 +224,7 @@ public class DtxBranch { // prepare has previously been called - MessageStore.Transaction txn = _store.newTransaction(); + Transaction txn = _store.newTransaction(); txn.removeXid(_xid.getFormat(), _xid.getGlobalId(), _xid.getBranchId()); txn.commitTran(); @@ -302,7 +303,7 @@ public class DtxBranch _enqueueRecords.add(new Record(queue, message)); } - private static final class Record implements MessageStore.Transaction.Record + private static final class Record implements Transaction.Record { private final BaseQueue _queue; private final EnqueableMessage _message; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 9b61f7543f..11401ebd65 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.Transaction; import java.util.ArrayList; import java.util.Collection; @@ -46,7 +47,7 @@ public class LocalTransaction implements ServerTransaction private final List<Action> _postTransactionActions = new ArrayList<Action>(); - private volatile MessageStore.Transaction _transaction; + private volatile Transaction _transaction; private MessageStore _transactionLog; private long _txnStartTime = 0L; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 5e4a4f2db6..00c8d1ff27 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -57,8 +57,6 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo MessageStore getMessageStore(); - DurableConfigurationStore getDurableConfigurationStore(); - SecurityManager getSecurityManager(); void close(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index 0e965472d5..1da5b8d0c7 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -23,9 +23,7 @@ package org.apache.qpid.server.virtualhost; import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.UUID; @@ -52,6 +50,7 @@ import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreRecoveryHandler; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.DtxBranch; @@ -74,11 +73,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa { private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class); - private final VirtualHost _virtualHost; private MessageStoreLogSubject _logSubject; - private List<ProcessAction> _actions; private MessageStore _store; @@ -201,8 +198,8 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } public void dtxRecord(long format, byte[] globalId, byte[] branchId, - MessageStore.Transaction.Record[] enqueues, - MessageStore.Transaction.Record[] dequeues) + Transaction.Record[] enqueues, + Transaction.Record[] dequeues) { Xid id = new Xid(format, globalId, branchId); DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry(); @@ -212,7 +209,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa branch = new DtxBranch(id, _store, _virtualHost); dtxRegistry.registerBranch(branch); } - for(MessageStore.Transaction.Record record : enqueues) + for(Transaction.Record record : enqueues) { final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName()); if(queue != null) @@ -272,7 +269,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } } - for(MessageStore.Transaction.Record record : dequeues) + for(Transaction.Record record : dequeues) { final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName()); if(queue != null) @@ -385,7 +382,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa public void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf) { - _actions = new ArrayList<ProcessAction>(); try { Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeName); @@ -482,7 +478,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa else { _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queue.getNameShortString() + " is unknown, entry will be discarded"); - MessageStore.Transaction txn = _store.newTransaction(); + Transaction txn = _store.newTransaction(); txn.dequeueMessage(queue, new DummyMessage(messageId)); txn.commitTranAsync(); } @@ -490,7 +486,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa else { _logger.warn("Message id " + messageId + " in log references queue " + queueName + " which is not in the configuration, entry will be discarded"); - MessageStore.Transaction txn = _store.newTransaction(); + Transaction txn = _store.newTransaction(); TransactionLogResource mockQueue = new TransactionLogResource() { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index fcad1550e1..eccaf553cd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -63,8 +63,12 @@ import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; -import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; @@ -118,15 +122,13 @@ public class VirtualHostImpl implements VirtualHost private AMQBrokerManagerMBean _brokerMBean; - - private DurableConfigurationStore _durableConfigurationStore; private BindingFactory _bindingFactory; private boolean _statisticsEnabled = false; private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; - public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception + public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception { if (hostConfig == null) { @@ -166,7 +168,7 @@ public class VirtualHostImpl implements VirtualHost StartupRoutingTable configFileRT = new StartupRoutingTable(); - _durableConfigurationStore = configFileRT; + _messageStore = configFileRT; // This needs to be after the RT has been defined as it creates the default durable exchanges. _exchangeRegistry.initialise(); @@ -175,18 +177,7 @@ public class VirtualHostImpl implements VirtualHost initialiseModel(_configuration); - if (store != null) - { - _messageStore = store; - if(store instanceof DurableConfigurationStore) - { - _durableConfigurationStore = (DurableConfigurationStore) store; - } - } - else - { - initialiseMessageStore(hostConfig); - } + initialiseMessageStore(hostConfig); _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); _brokerMBean.register(); @@ -228,8 +219,8 @@ public class VirtualHostImpl implements VirtualHost /** * Virtual host JMX MBean class. * - * This has some of the methods implemented from management intrerface for exchanges. Any - * implementaion of an Exchange MBean should extend this class. + * This has some of the methods implemented from management interface for exchanges. Any + * Implementation of an Exchange MBean should extend this class. */ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost { @@ -407,27 +398,15 @@ public class VirtualHostImpl implements VirtualHost MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStore); - - if(messageStore instanceof DurableConfigurationStore) - { - DurableConfigurationStore durableConfigurationStore = (DurableConfigurationStore) messageStore; - - durableConfigurationStore.configureConfigStore(this.getName(), - recoveryHandler, - hostConfig.getStoreConfiguration(), - storeLogSubject); - - _durableConfigurationStore = durableConfigurationStore; - } + messageStore.configureConfigStore(this.getName(), + recoveryHandler, + hostConfig.getStoreConfiguration(), + storeLogSubject); messageStore.configureMessageStore(this.getName(), recoveryHandler, - hostConfig.getStoreConfiguration(), - storeLogSubject); - messageStore.configureTransactionLog(this.getName(), recoveryHandler, - hostConfig.getStoreConfiguration(), - storeLogSubject); + hostConfig.getStoreConfiguration(), storeLogSubject); _messageStore = messageStore; @@ -472,7 +451,7 @@ public class VirtualHostImpl implements VirtualHost if (newExchange.isDurable()) { - _durableConfigurationStore.createExchange(newExchange); + _messageStore.createExchange(newExchange); } } } @@ -482,10 +461,10 @@ public class VirtualHostImpl implements VirtualHost AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this); String queueName = queue.getName(); - if (queue.isDurable()) - { - getDurableConfigurationStore().createQueue(queue); - } + if (queue.isDurable()) + { + getMessageStore().createQueue(queue); + } //get the exchange name (returns default exchange name if none was specified) String exchangeName = queueConfiguration.getExchange(); @@ -573,11 +552,6 @@ public class VirtualHostImpl implements VirtualHost return _messageStore; } - public DurableConfigurationStore getDurableConfigurationStore() - { - return _durableConfigurationStore; - } - public SecurityManager getSecurityManager() { return _securityManager; @@ -800,7 +774,7 @@ public class VirtualHostImpl implements VirtualHost * This is so we can replay the creation of queues/exchanges in to the real _RT after it has been loaded. * This should be removed after the _RT has been fully split from the the TL */ - private static class StartupRoutingTable implements DurableConfigurationStore + private static class StartupRoutingTable implements MessageStore { public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, @@ -856,6 +830,37 @@ public class VirtualHostImpl implements VirtualHost public void deleteBridge(final Bridge bridge) throws AMQStoreException { } + + @Override + public void configureMessageStore(String name, + MessageStoreRecoveryHandler recoveryHandler, + TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config, LogSubject logSubject) throws Exception + { + } + + @Override + public void close() throws Exception + { + } + + @Override + public <T extends StorableMessageMetaData> StoredMessage<T> addMessage( + T metaData) + { + return null; + } + + @Override + public boolean isPersistent() + { + return false; + } + + @Override + public Transaction newTransaction() + { + return null; + } } @Override diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 7c7645e9e6..488f251b0a 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -79,7 +79,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase private BindingFactory bindingFactory = new BindingFactory(new DurableConfigurationStore.Source() { - public DurableConfigurationStore getDurableConfigurationStore() + public DurableConfigurationStore getMessageStore() { return _store; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java index fba3851507..950d59bef5 100755 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java @@ -25,7 +25,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.message.MessageMetaData; -import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import java.nio.ByteBuffer; @@ -105,9 +105,9 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData> return buf; } - public MessageStore.StoreFuture flushToStore() + public StoreFuture flushToStore() { - return MessageStore.IMMEDIATE_FUTURE; + return StoreFuture.IMMEDIATE_FUTURE; } public void remove() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 79c744902d..a8676bf4c2 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -61,7 +61,6 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase protected SimpleAMQQueue _queue; protected VirtualHost _virtualHost; - protected TestableMemoryMessageStore _store = new TestableMemoryMessageStore(); protected AMQShortString _qname = new AMQShortString("qname"); protected AMQShortString _owner = new AMQShortString("owner"); protected AMQShortString _routingKey = new AMQShortString("routing key"); @@ -106,7 +105,9 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance(); PropertiesConfiguration env = new PropertiesConfiguration(); - _virtualHost = new VirtualHostImpl(ApplicationRegistry.getInstance(), new VirtualHostConfiguration(getClass().getName(), env), _store); + VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(getClass().getName(), env); + vHostConfig.setMessageStoreClass(TestableMemoryMessageStore.class.getName()); + _virtualHost = new VirtualHostImpl(ApplicationRegistry.getInstance(), vHostConfig); applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost); _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, false, _virtualHost, _arguments); @@ -634,11 +635,12 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase qs.add(_queue); MessageMetaData metaData = msg.headersReceived(System.currentTimeMillis()); - StoredMessage handle = _store.addMessage(metaData); + TestableMemoryMessageStore store = (TestableMemoryMessageStore) _virtualHost.getMessageStore(); + StoredMessage handle = store.addMessage(metaData); msg.setStoredMessage(handle); - ServerTransaction txn = new AutoCommitTransaction(_store); + ServerTransaction txn = new AutoCommitTransaction(store); txn.enqueue(qs, msg, new ServerTransaction.Action() { @@ -653,7 +655,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase }, 0L); // Check that it is enqueued - AMQQueue data = _store.getMessages().get(1L); + AMQQueue data = store.getMessages().get(1L); assertNull(data); // Dequeue message @@ -664,7 +666,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase _queue.dequeue(entry,null); // Check that it is dequeued - data = _store.getMessages().get(1L); + data = store.getMessages().get(1L); assertNull(data); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index d49f0586ba..755d61a260 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -298,7 +298,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase 1, queueRegistry.getQueues().size()); //test that removing the queue means it is not recovered next time - getVirtualHost().getDurableConfigurationStore().removeQueue(queueRegistry.getQueue(durableQueueName)); + getVirtualHost().getMessageStore().removeQueue(queueRegistry.getQueue(durableQueueName)); reloadVirtualHost(); @@ -351,7 +351,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase origExchangeCount + 1, exchangeRegistry.getExchangeNames().size()); //test that removing the exchange means it is not recovered next time - getVirtualHost().getDurableConfigurationStore().removeExchange(exchangeRegistry.getExchange(directExchangeName)); + getVirtualHost().getMessageStore().removeExchange(exchangeRegistry.getExchange(directExchangeName)); reloadVirtualHost(); @@ -707,7 +707,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase if (queue.isDurable() && !queue.isAutoDelete()) { - getVirtualHost().getDurableConfigurationStore().createQueue(queue, queueArguments); + getVirtualHost().getMessageStore().createQueue(queue, queueArguments); } } catch (AMQException e) @@ -751,7 +751,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase getVirtualHost().getExchangeRegistry().registerExchange(exchange); if (durable) { - getVirtualHost().getDurableConfigurationStore().createExchange(exchange); + getVirtualHost().getMessageStore().createExchange(exchange); } } catch (AMQException e) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java index 09d865cb05..38d3fb78fc 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -26,6 +26,8 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.federation.Bridge; +import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.queue.AMQQueue; @@ -45,8 +47,8 @@ public class SkeletonMessageStore implements MessageStore public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, - Configuration config, - LogSubject logSubject) throws Exception + TransactionLogRecoveryHandler tlogRecoveryHandler, + Configuration config, LogSubject logSubject) throws Exception { } @@ -98,14 +100,6 @@ public class SkeletonMessageStore implements MessageStore } - public void configureTransactionLog(String name, - TransactionLogRecoveryHandler recoveryHandler, - Configuration storeConfiguration, - LogSubject logSubject) throws Exception - { - - } - public Transaction newTransaction() { return new Transaction() @@ -162,4 +156,24 @@ public class SkeletonMessageStore implements MessageStore } + @Override + public void createBrokerLink(BrokerLink link) throws AMQStoreException + { + } + + @Override + public void deleteBrokerLink(BrokerLink link) throws AMQStoreException + { + } + + @Override + public void createBridge(Bridge bridge) throws AMQStoreException + { + } + + @Override + public void deleteBridge(Bridge bridge) throws AMQStoreException + { + } + } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java index 801549e561..e9b7ceacc5 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java @@ -24,14 +24,21 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.NotImplementedException; import org.apache.qpid.AMQStoreException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.federation.Bridge; +import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStore.StoreFuture; -import org.apache.qpid.server.store.MessageStore.Transaction; import org.apache.qpid.server.store.MessageStoreRecoveryHandler; import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; @@ -126,30 +133,23 @@ class MockStoreTransaction implements Transaction { public void configureMessageStore(final String name, final MessageStoreRecoveryHandler recoveryHandler, - final Configuration config, - final LogSubject logSubject) throws Exception + TransactionLogRecoveryHandler tlogRecoveryHandler, + final Configuration config, final LogSubject logSubject) throws Exception { - //TODO. } public void close() throws Exception { - //TODO. } public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData) { - return null; //TODO. + return null; } public boolean isPersistent() { - return false; //TODO. - } - - public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler, - Configuration storeConfiguration, LogSubject logSubject) throws Exception - { + return false; } public Transaction newTransaction() @@ -157,6 +157,82 @@ class MockStoreTransaction implements Transaction storeTransaction.setState(TransactionState.STARTED); return storeTransaction; } + + @Override + public void configureConfigStore(String name, + ConfigurationRecoveryHandler recoveryHandler, + Configuration config, LogSubject logSubject) + throws Exception + { + } + + @Override + public void createExchange(Exchange exchange) + throws AMQStoreException + { + } + + @Override + public void removeExchange(Exchange exchange) + throws AMQStoreException + { + } + + @Override + public void bindQueue(Exchange exchange, AMQShortString routingKey, + AMQQueue queue, FieldTable args) throws AMQStoreException + { + } + + @Override + public void unbindQueue(Exchange exchange, + AMQShortString routingKey, AMQQueue queue, FieldTable args) + throws AMQStoreException + { + } + + @Override + public void createQueue(AMQQueue queue) throws AMQStoreException + { + } + + @Override + public void createQueue(AMQQueue queue, FieldTable arguments) + throws AMQStoreException + { + } + + @Override + public void removeQueue(AMQQueue queue) throws AMQStoreException + { + } + + @Override + public void updateQueue(AMQQueue queue) throws AMQStoreException + { + } + + @Override + public void createBrokerLink(BrokerLink link) + throws AMQStoreException + { + } + + @Override + public void deleteBrokerLink(BrokerLink link) + throws AMQStoreException + { + } + + @Override + public void createBridge(Bridge bridge) throws AMQStoreException + { + } + + @Override + public void deleteBridge(Bridge bridge) throws AMQStoreException + { + } }; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index af742532e2..cccf02c9f3 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -38,7 +38,6 @@ import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.stats.StatisticsCounter; -import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.DtxRegistry; @@ -111,11 +110,6 @@ public class MockVirtualHost implements VirtualHost return null; } - public DurableConfigurationStore getDurableConfigurationStore() - { - return null; - } - public ExchangeFactory getExchangeFactory() { return null; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index 8ffc09930e..c3b006f371 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -38,7 +38,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Iterator; -public class SlowMessageStore implements MessageStore, DurableConfigurationStore +public class SlowMessageStore implements MessageStore { private static final Logger _logger = Logger.getLogger(SlowMessageStore.class); private static final String DELAYS = "delays"; @@ -160,11 +160,11 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore public void configureMessageStore(String name, - MessageStoreRecoveryHandler recoveryHandler, - Configuration config, - LogSubject logSubject) throws Exception + MessageStoreRecoveryHandler messageRecoveryHandler, + TransactionLogRecoveryHandler tlogRecoveryHandler, + Configuration config, LogSubject logSubject) throws Exception { - _realStore.configureMessageStore(name, recoveryHandler, config, logSubject); + _realStore.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler, config, logSubject); } public void close() throws Exception @@ -227,14 +227,6 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore doPostDelay("removeQueue"); } - public void configureTransactionLog(String name, - TransactionLogRecoveryHandler recoveryHandler, - Configuration storeConfiguration, LogSubject logSubject) - throws Exception - { - _realStore.configureTransactionLog(name, recoveryHandler, storeConfiguration, logSubject); - } - public Transaction newTransaction() { doPreDelay("beginTran"); |