summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-03-30 08:55:05 +0000
committerKeith Wall <kwall@apache.org>2012-03-30 08:55:05 +0000
commitda8070494a06d0b6c37127eb0a3439e394bddd31 (patch)
tree85ca23ccf051f3157a5f4b3be1b7752dfe576c6d
parent6e605c53e91d5b9b1bf46985c5cf8bd94a34de4d (diff)
downloadqpid-python-da8070494a06d0b6c37127eb0a3439e394bddd31.tar.gz
QPID-3916: Change message store interface to extend DurableConfigurationStore and change VirtualHost contructor
Applied patch from Andrew MacBean <andymacbean@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1307317 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java33
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java12
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java16
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java7
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java2
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java37
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java114
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java41
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java4
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/Transaction.java81
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java13
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java35
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java9
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java3
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java2
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java18
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java99
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java2
-rwxr-xr-xqpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java6
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java14
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java8
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java34
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java102
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java6
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java18
45 files changed, 437 insertions, 371 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 589f63f562..402df299fc 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -59,8 +59,10 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMemoryMessage;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -107,7 +109,7 @@ import com.sleepycat.je.TransactionConfig;
* dequeue messages to queues. <tr><td> Generate message identifiers. </table>
*/
@SuppressWarnings({"unchecked"})
-public class BDBMessageStore implements MessageStore, DurableConfigurationStore
+public class BDBMessageStore implements MessageStore
{
private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
@@ -217,8 +219,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
public void configureMessageStore(String name,
MessageStoreRecoveryHandler recoveryHandler,
- Configuration storeConfiguration,
- LogSubject logSubject) throws Exception
+ TransactionLogRecoveryHandler tlogRecoveryHandler,
+ Configuration storeConfiguration, LogSubject logSubject) throws Exception
{
CurrentActor.get().message(logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
@@ -231,29 +233,14 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
recoverMessages(recoveryHandler);
- }
- public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler,
- Configuration storeConfiguration, LogSubject logSubject) throws Exception
- {
CurrentActor.get().message(logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
-
- if(!_configured)
- {
- _logSubject = logSubject;
- configure(name,storeConfiguration);
- _configured = true;
- stateTransition(State.CONFIGURING, State.CONFIGURED);
- }
-
- recoverQueueEntries(recoveryHandler);
-
-
-
+ recoverQueueEntries(tlogRecoveryHandler);
}
- public org.apache.qpid.server.store.MessageStore.Transaction newTransaction()
+
+ public org.apache.qpid.server.store.Transaction newTransaction()
{
return new BDBTransaction();
}
@@ -2222,7 +2209,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
BDBMessageStore.this.commit(txn,true);
}
- return IMMEDIATE_FUTURE;
+ return StoreFuture.IMMEDIATE_FUTURE;
}
public void remove()
@@ -2238,7 +2225,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
- private class BDBTransaction implements Transaction
+ private class BDBTransaction implements org.apache.qpid.server.store.Transaction
{
private com.sleepycat.je.Transaction _txn;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java
index 11ae8b89eb..eb5c4677ff 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java
@@ -21,25 +21,25 @@
package org.apache.qpid.server.store.berkeleydb.entry;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.Transaction;
public class PreparedTransaction
{
- private final MessageStore.Transaction.Record[] _enqueues;
- private final MessageStore.Transaction.Record[] _dequeues;
+ private final Transaction.Record[] _enqueues;
+ private final Transaction.Record[] _dequeues;
- public PreparedTransaction(MessageStore.Transaction.Record[] enqueues, MessageStore.Transaction.Record[] dequeues)
+ public PreparedTransaction(Transaction.Record[] enqueues, Transaction.Record[] dequeues)
{
_enqueues = enqueues;
_dequeues = dequeues;
}
- public MessageStore.Transaction.Record[] getEnqueues()
+ public Transaction.Record[] getEnqueues()
{
return _enqueues;
}
- public MessageStore.Transaction.Record[] getDequeues()
+ public Transaction.Record[] getDequeues()
{
return _dequeues;
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
index d85bcd361e..33bf269880 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
@@ -25,8 +25,8 @@ import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import org.apache.qpid.server.message.EnqueableMessage;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
@@ -35,16 +35,16 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
@Override
public PreparedTransaction entryToObject(TupleInput input)
{
- MessageStore.Transaction.Record[] enqueues = readRecords(input);
+ Transaction.Record[] enqueues = readRecords(input);
- MessageStore.Transaction.Record[] dequeues = readRecords(input);
+ Transaction.Record[] dequeues = readRecords(input);
return new PreparedTransaction(enqueues, dequeues);
}
- private MessageStore.Transaction.Record[] readRecords(TupleInput input)
+ private Transaction.Record[] readRecords(TupleInput input)
{
- MessageStore.Transaction.Record[] records = new MessageStore.Transaction.Record[input.readInt()];
+ Transaction.Record[] records = new Transaction.Record[input.readInt()];
for(int i = 0; i < records.length; i++)
{
records[i] = new RecordImpl(input.readString(), input.readLong());
@@ -60,7 +60,7 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
}
- private void writeRecords(MessageStore.Transaction.Record[] records, TupleOutput output)
+ private void writeRecords(Transaction.Record[] records, TupleOutput output)
{
if(records == null)
{
@@ -69,7 +69,7 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
else
{
output.writeInt(records.length);
- for(MessageStore.Transaction.Record record : records)
+ for(Transaction.Record record : records)
{
output.writeString(record.getQueue().getResourceName());
output.writeLong(record.getMessage().getMessageNumber());
@@ -77,7 +77,7 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
}
}
- private static class RecordImpl implements MessageStore.Transaction.Record, TransactionLogResource, EnqueableMessage
+ private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueableMessage
{
private final String _queueName;
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index 74fba168a9..0ccfbe5a2a 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -37,6 +37,7 @@ import org.apache.qpid.server.store.MessageMetaDataType;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
@@ -419,7 +420,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
}
};
- MessageStore.Transaction txn = log.newTransaction();
+ Transaction txn = log.newTransaction();
txn.enqueueMessage(mockQueue, new MockMessage(1L));
txn.enqueueMessage(mockQueue, new MockMessage(5L));
@@ -457,7 +458,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
}
};
- MessageStore.Transaction txn = log.newTransaction();
+ Transaction txn = log.newTransaction();
txn.enqueueMessage(mockQueue, new MockMessage(21L));
txn.abortTran();
@@ -498,7 +499,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
}
};
- MessageStore.Transaction txn = log.newTransaction();
+ Transaction txn = log.newTransaction();
txn.enqueueMessage(mockQueue, new MockMessage(30L));
txn.commitTran();
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
index b906ab2474..4e201d5473 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_SUB_NAME;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
index 265aa7714e..b388def86c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
@@ -76,7 +76,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
_queueRegistry = virtualHost.getQueueRegistry();
_exchangeRegistry = virtualHost.getExchangeRegistry();
_defaultExchange = _exchangeRegistry.getDefaultExchange();
- _durableConfig = virtualHost.getDurableConfigurationStore();
+ _durableConfig = virtualHost.getMessageStore();
_exchangeFactory = virtualHost.getExchangeFactory();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 22f9544b0c..6910247577 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -83,6 +83,7 @@ import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
@@ -1556,7 +1557,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
}
}
- public void recordFuture(final MessageStore.StoreFuture future, final ServerTransaction.Action action)
+ public void recordFuture(final StoreFuture future, final ServerTransaction.Action action)
{
_unfinishedCommandsQueue.add(new AsyncCommand(future, action));
}
@@ -1590,10 +1591,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
private static class AsyncCommand
{
- private final MessageStore.StoreFuture _future;
+ private final StoreFuture _future;
private ServerTransaction.Action _action;
- public AsyncCommand(final MessageStore.StoreFuture future, final ServerTransaction.Action action)
+ public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action)
{
_future = future;
_action = action;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
index fe66a6d341..250c417ef1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
@@ -192,7 +192,7 @@ public class BindingFactory
if (b.isDurable() && !restore)
{
- _configSource.getDurableConfigurationStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments));
+ _configSource.getMessageStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments));
}
queue.addQueueDeleteTask(b);
@@ -265,7 +265,7 @@ public class BindingFactory
if (b.isDurable())
{
- _configSource.getDurableConfigurationStore().unbindQueue(exchange,
+ _configSource.getMessageStore().unbindQueue(exchange,
new AMQShortString(bindingKey),
queue,
FieldTable.convertToFieldTable(arguments));
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index a5fa9f014e..ebe0645bc4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -63,7 +63,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
public DurableConfigurationStore getDurableConfigurationStore()
{
- return _host.getDurableConfigurationStore();
+ return _host.getMessageStore();
}
public void registerExchange(Exchange exchange) throws AMQException
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
index b58802e1ff..4b4bdd4efb 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
@@ -132,7 +132,7 @@ public class Bridge implements BridgeConfig
{
try
{
- brokerLink.getVirtualHost().getDurableConfigurationStore().createBridge(this);
+ brokerLink.getVirtualHost().getMessageStore().createBridge(this);
}
catch (AMQStoreException e)
{
@@ -220,7 +220,7 @@ public class Bridge implements BridgeConfig
{
try
{
- brokerLink.getVirtualHost().getDurableConfigurationStore().createBridge(this);
+ brokerLink.getVirtualHost().getMessageStore().createBridge(this);
}
catch (AMQStoreException e)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
index 032df8bb0d..4bcc0d6136 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
@@ -250,7 +250,7 @@ public class BrokerLink implements LinkConfig, ConnectionListener
{
try
{
- _virtualHost.getDurableConfigurationStore().createBrokerLink(this);
+ _virtualHost.getMessageStore().createBrokerLink(this);
}
catch (AMQStoreException e)
{
@@ -295,7 +295,7 @@ public class BrokerLink implements LinkConfig, ConnectionListener
{
try
{
- _virtualHost.getDurableConfigurationStore().createBrokerLink(this);
+ _virtualHost.getMessageStore().createBrokerLink(this);
}
catch (AMQStoreException e)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
index 6d55f31ebc..8756409f64 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
@@ -106,7 +106,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
if (exchange.isDurable())
{
- virtualHost.getDurableConfigurationStore().createExchange(exchange);
+ virtualHost.getMessageStore().createExchange(exchange);
}
}
catch(AMQUnknownExchangeType e)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index f57f7eb9e6..7d993ae14d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -65,7 +65,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
VirtualHost virtualHost = protocolConnection.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+ DurableConfigurationStore store = virtualHost.getMessageStore();
final AMQShortString queueName;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index cc37259d54..762f090b83 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -62,7 +62,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
VirtualHost virtualHost = protocolConnection.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+ DurableConfigurationStore store = virtualHost.getMessageStore();
AMQChannel channel = protocolConnection.getChannel(channelId);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index aca5891d2e..8ff3f0148b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -292,7 +292,7 @@ public class AMQQueueFactory
exchangeRegistry.registerExchange(dlExchange);
//enter the dle in the persistent store
- virtualHost.getDurableConfigurationStore().createExchange(dlExchange);
+ virtualHost.getMessageStore().createExchange(dlExchange);
}
}
@@ -312,7 +312,7 @@ public class AMQQueueFactory
dlQueue = createAMQQueueImpl(dlQueueName, true, owner, false, exclusive, virtualHost, args);
//enter the dlq in the persistent store
- virtualHost.getDurableConfigurationStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args));
+ virtualHost.getMessageStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args));
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index cc1041d9de..1cd7e3505f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -346,7 +346,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if(isDurable())
{
- getVirtualHost().getDurableConfigurationStore().updateQueue(this);
+ getVirtualHost().getMessageStore().updateQueue(this);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 9951f7d3c8..4ed28b965d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -664,7 +664,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
public VirtualHost createVirtualHost(final VirtualHostConfiguration vhostConfig) throws Exception
{
- VirtualHostImpl virtualHost = new VirtualHostImpl(this, vhostConfig, null);
+ VirtualHostImpl virtualHost = new VirtualHostImpl(this, vhostConfig);
_virtualHostRegistry.registerVirtualHost(virtualHost);
getBroker().addVirtualHost(virtualHost);
return virtualHost;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
index fb67500da9..73f127e097 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
@@ -60,13 +60,4 @@ public interface ConfigurationRecoveryHandler
void completeBridgeRecoveryForLink();
}
- public static interface QueueEntryRecoveryHandler
- {
- void complete();
-
- void queueEntry(String queueName, long messageId);
- }
-
-
-
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index 4d63136a9d..86304a0984 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -69,7 +69,7 @@ import java.util.concurrent.atomic.AtomicLong;
*
* TODO extract the SQL statements into a generic JDBC store
*/
-public class DerbyMessageStore implements MessageStore, DurableConfigurationStore
+public class DerbyMessageStore implements MessageStore
{
private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
@@ -277,8 +277,8 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
public void configureMessageStore(String name,
MessageStoreRecoveryHandler recoveryHandler,
- Configuration storeConfiguration,
- LogSubject logSubject) throws Exception
+ TransactionLogRecoveryHandler tlogRecoveryHandler,
+ Configuration storeConfiguration, LogSubject logSubject) throws Exception
{
if(!_configured)
{
@@ -297,38 +297,13 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
recoverMessages(recoveryHandler);
- }
-
-
-
- public void configureTransactionLog(String name,
- TransactionLogRecoveryHandler recoveryHandler,
- Configuration storeConfiguration,
- LogSubject logSubject) throws Exception
- {
-
- if(!_configured)
- {
- _logSubject = logSubject;
- }
CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
- if(!_configured)
- {
-
- _logSubject = logSubject;
-
- commonConfiguration(name, storeConfiguration, logSubject);
- _configured = true;
- }
-
- TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(recoveryHandler);
+ TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(tlogRecoveryHandler);
recoverXids(dtxrh);
}
-
-
private void commonConfiguration(String name, Configuration storeConfiguration, LogSubject logSubject)
throws ClassNotFoundException, SQLException
{
@@ -2167,7 +2142,7 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
}
}
- private static class RecordImpl implements MessageStore.Transaction.Record, TransactionLogResource, EnqueableMessage
+ private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueableMessage
{
private final String _queueName;
@@ -2650,7 +2625,7 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
}
throw new RuntimeException(e);
}
- return IMMEDIATE_FUTURE;
+ return StoreFuture.IMMEDIATE_FUTURE;
}
private synchronized void store(final Connection conn) throws SQLException
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
index 123ecd8145..ea9621ff41 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -36,14 +36,14 @@ public interface DurableConfigurationStore
public static interface Source
{
- DurableConfigurationStore getDurableConfigurationStore();
+ DurableConfigurationStore getMessageStore();
}
/**
* Called after instantiation in order to configure the message store. A particular implementation can define
* whatever parameters it wants.
*
- * @param name The name to be used by this storem
+ * @param name The name to be used by this store
* @param recoveryHandler Handler to be called as the store recovers on start up
* @param config The apache commons configuration object.
*
@@ -89,7 +89,7 @@ public interface DurableConfigurationStore
* @param exchange The exchange to unbind from.
* @param routingKey The routing key to unbind.
* @param queue The queue to unbind.
- * @param args Additonal parameters.
+ * @param args Additional parameters.
*
* @throws AMQStoreException If the operation fails for any reason.
*/
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index b01e5aa954..b6a5e80640 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -42,7 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/** A simple message store that stores the messages in a threadsafe structure in memory. */
-public class MemoryMessageStore implements MessageStore, DurableConfigurationStore
+public class MemoryMessageStore implements MessageStore
{
private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
@@ -71,7 +71,7 @@ public class MemoryMessageStore implements MessageStore, DurableConfigurationSto
public StoreFuture commitTranAsync() throws AMQStoreException
{
- return IMMEDIATE_FUTURE;
+ return StoreFuture.IMMEDIATE_FUTURE;
}
public void abortTran() throws AMQStoreException
@@ -98,8 +98,8 @@ public class MemoryMessageStore implements MessageStore, DurableConfigurationSto
public void configureMessageStore(String name,
MessageStoreRecoveryHandler recoveryHandler,
- Configuration config,
- LogSubject logSubject) throws Exception
+ TransactionLogRecoveryHandler tlogRecoveryHandler,
+ Configuration config, LogSubject logSubject) throws Exception
{
if(_logSubject == null)
{
@@ -187,14 +187,6 @@ public class MemoryMessageStore implements MessageStore, DurableConfigurationSto
}
- public void configureTransactionLog(String name,
- TransactionLogRecoveryHandler recoveryHandler,
- Configuration storeConfiguration,
- LogSubject logSubject) throws Exception
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
public Transaction newTransaction()
{
return IN_MEMORY_TRANSACTION;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
index 00bb0449d6..2114472592 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -21,51 +21,29 @@
package org.apache.qpid.server.store;
import org.apache.commons.configuration.Configuration;
-import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.message.EnqueableMessage;
/**
* MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
*
*/
-public interface MessageStore
+public interface MessageStore extends DurableConfigurationStore
{
- StoreFuture IMMEDIATE_FUTURE = new StoreFuture()
- {
- public boolean isComplete()
- {
- return true;
- }
-
- public void waitForCompletion()
- {
-
- }
- };
-
-
/**
* Called after instantiation in order to configure the message store. A particular implementation can define
* whatever parameters it wants.
*
- * @param name The name to be used by this storem
- * @param recoveryHandler Handler to be called as the store recovers on start up
+ * @param name The name to be used by this store
+ * @param messageRecoveryHandler Handler to be called as the store recovers on start up
+ * @param tlogRecoveryHandler
* @param config The apache commons configuration object.
- *
* @throws Exception If any error occurs that means the store is unable to configure itself.
*/
void configureMessageStore(String name,
- MessageStoreRecoveryHandler recoveryHandler,
- Configuration config,
- LogSubject logSubject) throws Exception;
+ MessageStoreRecoveryHandler messageRecoveryHandler,
+ TransactionLogRecoveryHandler tlogRecoveryHandler,
+ Configuration config, LogSubject logSubject) throws Exception;
- /**
- * Called to close and cleanup any resources used by the message store.
- *
- * @throws Exception If the close fails.
- */
- void close() throws Exception;
public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData);
@@ -78,79 +56,13 @@ public interface MessageStore
*/
boolean isPersistent();
-
-
- public static interface Transaction
- {
- /**
- * Places a message onto a specified queue, in a given transactional context.
- *
- *
- *
- * @param queue The queue to place the message on.
- * @param message
- * @throws org.apache.qpid.AMQStoreException If the operation fails for any reason.
- */
- void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException;
-
- /**
- * Extracts a message from a specified queue, in a given transactional context.
- *
- * @param queue The queue to place the message on.
- * @param message The message to dequeue.
- * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
- */
- void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException;
-
-
- /**
- * Commits all operations performed within a given transactional context.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- void commitTran() throws AMQStoreException;
-
- /**
- * Commits all operations performed within a given transactional context.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- StoreFuture commitTranAsync() throws AMQStoreException;
-
- /**
- * Abandons all operations performed within a given transactional context.
- *
- * @throws AMQStoreException If the operation fails for any reason.
- */
- void abortTran() throws AMQStoreException;
-
-
- public static interface Record
- {
- TransactionLogResource getQueue();
- EnqueableMessage getMessage();
- }
-
- void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException;
-
- void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
- throws AMQStoreException;
- }
-
- public void configureTransactionLog(String name,
- TransactionLogRecoveryHandler recoveryHandler,
- Configuration storeConfiguration,
- LogSubject logSubject) throws Exception;
-
Transaction newTransaction();
-
-
- public static interface StoreFuture
- {
- boolean isComplete();
-
- void waitForCompletion();
- }
+ /**
+ * Called to close and cleanup any resources used by the message store.
+ *
+ * @throws Exception If the close fails.
+ */
+ void close() throws Exception;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java
new file mode 100644
index 0000000000..3e720d9de1
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreFuture.java
@@ -0,0 +1,41 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store;
+
+public interface StoreFuture
+{
+ StoreFuture IMMEDIATE_FUTURE = new StoreFuture()
+ {
+ public boolean isComplete()
+ {
+ return true;
+ }
+
+ public void waitForCompletion()
+ {
+
+ }
+ };
+
+ boolean isComplete();
+
+ void waitForCompletion();
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index 144cc629bd..e7302270bb 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -122,9 +122,9 @@ public class StoredMemoryMessage implements StoredMessage
return buf;
}
- public MessageStore.StoreFuture flushToStore()
+ public StoreFuture flushToStore()
{
- return MessageStore.IMMEDIATE_FUTURE;
+ return StoreFuture.IMMEDIATE_FUTURE;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java
index d4a0381929..7909003855 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java
@@ -34,7 +34,7 @@ public interface StoredMessage<M extends StorableMessageMetaData>
ByteBuffer getContent(int offsetInMessage, int size);
- MessageStore.StoreFuture flushToStore();
+ StoreFuture flushToStore();
void remove();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Transaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Transaction.java
new file mode 100644
index 0000000000..ed6b89e373
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Transaction.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.message.EnqueableMessage;
+
+public interface Transaction
+{
+ /**
+ * Places a message onto a specified queue, in a given transactional context.
+ *
+ *
+ *
+ * @param queue The queue to place the message on.
+ * @param message
+ * @throws org.apache.qpid.AMQStoreException If the operation fails for any reason.
+ */
+ void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException;
+
+ /**
+ * Extracts a message from a specified queue, in a given transactional context.
+ *
+ * @param queue The queue to place the message on.
+ * @param message The message to dequeue.
+ * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException;
+
+
+ /**
+ * Commits all operations performed within a given transactional context.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ void commitTran() throws AMQStoreException;
+
+ /**
+ * Commits all operations performed within a given transactional context.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ StoreFuture commitTranAsync() throws AMQStoreException;
+
+ /**
+ * Abandons all operations performed within a given transactional context.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ void abortTran() throws AMQStoreException;
+
+
+ public static interface Record
+ {
+ TransactionLogResource getQueue();
+ EnqueableMessage getMessage();
+ }
+
+ void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException;
+
+ void recordXid(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues, Transaction.Record[] dequeues)
+ throws AMQStoreException;
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
index 48ca72718b..b92d5f3e9b 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
@@ -33,7 +33,7 @@ public interface TransactionLogRecoveryHandler
public static interface DtxRecordRecoveryHandler
{
- void dtxRecord(long format, byte[] globalId, byte[] branchId, MessageStore.Transaction.Record[] enqueues, MessageStore.Transaction.Record[] dequeues);
+ void dtxRecord(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues, Transaction.Record[] dequeues);
void completeDtxRecordRecovery();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 462e880e5f..70239b0fee 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -72,6 +72,7 @@ import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
@@ -975,17 +976,17 @@ public class ServerSession extends Session
return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast();
}
- public void recordFuture(final MessageStore.StoreFuture future, final ServerTransaction.Action action)
+ public void recordFuture(final StoreFuture future, final ServerTransaction.Action action)
{
_unfinishedCommandsQueue.add(new AsyncCommand(future, action));
}
private static class AsyncCommand
{
- private final MessageStore.StoreFuture _future;
+ private final StoreFuture _future;
private ServerTransaction.Action _action;
- public AsyncCommand(final MessageStore.StoreFuture future, final ServerTransaction.Action action)
+ public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action)
{
_future = future;
_action = action;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index c94a476712..54b91a0ce2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -48,6 +48,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.Subscription_0_10;
@@ -126,7 +127,7 @@ public class ServerSessionDelegate extends SessionDelegate
serverSession.accept(method.getTransfers());
if(!serverSession.isTransactional())
{
- serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE,
+ serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE,
new CommandProcessedAction(serverSession, method));
}
}
@@ -356,7 +357,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr));
+ serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr));
}
}
@@ -768,7 +769,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
if (exchange.isDurable())
{
- DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+ DurableConfigurationStore store = virtualHost.getMessageStore();
store.createExchange(exchange);
}
@@ -924,7 +925,7 @@ public class ServerSessionDelegate extends SessionDelegate
if (exchange.isDurable() && !exchange.isAutoDelete())
{
- DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+ DurableConfigurationStore store = virtualHost.getMessageStore();
store.removeExchange(exchange);
}
}
@@ -1205,7 +1206,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
VirtualHost virtualHost = getVirtualHost(session);
- DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+ DurableConfigurationStore store = virtualHost.getMessageStore();
String queueName = method.getQueue();
AMQQueue queue;
@@ -1448,7 +1449,7 @@ public class ServerSessionDelegate extends SessionDelegate
queue.delete();
if (queue.isDurable() && !queue.isAutoDelete())
{
- DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+ DurableConfigurationStore store = virtualHost.getMessageStore();
store.removeQueue(queue);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
index a062c6732f..d446434d24 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
@@ -29,7 +29,8 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStore.StoreFuture;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.Transaction;
import java.util.Collection;
import java.util.List;
@@ -71,16 +72,16 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
*/
public void addPostTransactionAction(final Action immediateAction)
{
- addFuture(MessageStore.IMMEDIATE_FUTURE, immediateAction);
+ addFuture(StoreFuture.IMMEDIATE_FUTURE, immediateAction);
}
public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
{
- MessageStore.Transaction txn = null;
+ Transaction txn = null;
try
{
- MessageStore.StoreFuture future;
+ StoreFuture future;
if(message.isPersistent() && queue.isDurable())
{
if (_logger.isDebugEnabled())
@@ -96,7 +97,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
else
{
- future = MessageStore.IMMEDIATE_FUTURE;
+ future = StoreFuture.IMMEDIATE_FUTURE;
}
addFuture(future, postTransactionAction);
postTransactionAction = null;
@@ -113,7 +114,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
- private void addFuture(final MessageStore.StoreFuture future, final Action action)
+ private void addFuture(final StoreFuture future, final Action action)
{
if(action != null)
{
@@ -130,7 +131,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
{
- MessageStore.Transaction txn = null;
+ Transaction txn = null;
try
{
for(QueueEntry entry : queueEntries)
@@ -154,7 +155,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
}
- MessageStore.StoreFuture future;
+ StoreFuture future;
if(txn != null)
{
future = txn.commitTranAsync();
@@ -162,7 +163,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
else
{
- future = MessageStore.IMMEDIATE_FUTURE;
+ future = StoreFuture.IMMEDIATE_FUTURE;
}
addFuture(future, postTransactionAction);
postTransactionAction = null;
@@ -182,10 +183,10 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
{
- MessageStore.Transaction txn = null;
+ Transaction txn = null;
try
{
- MessageStore.StoreFuture future;
+ StoreFuture future;
if(message.isPersistent() && queue.isDurable())
{
if (_logger.isDebugEnabled())
@@ -200,7 +201,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
else
{
- future = MessageStore.IMMEDIATE_FUTURE;
+ future = StoreFuture.IMMEDIATE_FUTURE;
}
addFuture(future, postTransactionAction);
postTransactionAction = null;
@@ -220,7 +221,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
{
- MessageStore.Transaction txn = null;
+ Transaction txn = null;
try
{
@@ -246,7 +247,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
}
- MessageStore.StoreFuture future;
+ StoreFuture future;
if (txn != null)
{
future = txn.commitTranAsync();
@@ -254,7 +255,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
else
{
- future = MessageStore.IMMEDIATE_FUTURE;
+ future = StoreFuture.IMMEDIATE_FUTURE;
}
addFuture(future, postTransactionAction);
postTransactionAction = null;
@@ -278,7 +279,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
{
if(immediatePostTransactionAction != null)
{
- addFuture(MessageStore.IMMEDIATE_FUTURE, new Action()
+ addFuture(StoreFuture.IMMEDIATE_FUTURE, new Action()
{
public void postCommit()
{
@@ -305,7 +306,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
return false;
}
- private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn)
+ private void rollbackIfNecessary(Action postTransactionAction, Transaction txn)
{
if (txn != null)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
index 597797b5f8..e5a7df6880 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.Transaction;
import java.util.Collection;
import java.util.List;
@@ -67,7 +68,7 @@ public class AutoCommitTransaction implements ServerTransaction
public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
{
- MessageStore.Transaction txn = null;
+ Transaction txn = null;
try
{
if(message.isPersistent() && queue.isDurable())
@@ -99,7 +100,7 @@ public class AutoCommitTransaction implements ServerTransaction
public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
{
- MessageStore.Transaction txn = null;
+ Transaction txn = null;
try
{
for(QueueEntry entry : queueEntries)
@@ -146,7 +147,7 @@ public class AutoCommitTransaction implements ServerTransaction
public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
{
- MessageStore.Transaction txn = null;
+ Transaction txn = null;
try
{
if(message.isPersistent() && queue.isDurable())
@@ -179,7 +180,7 @@ public class AutoCommitTransaction implements ServerTransaction
public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
{
- MessageStore.Transaction txn = null;
+ Transaction txn = null;
try
{
@@ -247,7 +248,7 @@ public class AutoCommitTransaction implements ServerTransaction
return false;
}
- private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn)
+ private void rollbackIfNecessary(Action postTransactionAction, Transaction txn)
{
if (txn != null)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
index 36f5f7b58f..05d0110e9b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
@@ -26,6 +26,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Xid;
@@ -38,7 +39,7 @@ public class DistributedTransaction implements ServerTransaction
private final AutoCommitTransaction _autoCommitTransaction;
- private volatile MessageStore.Transaction _transaction;
+ private volatile Transaction _transaction;
private long _txnStartTime = 0L;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
index 99bb639261..3ac71fc6a6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Xid;
@@ -48,7 +49,7 @@ public class DtxBranch
private final List<Record> _enqueueRecords = new ArrayList<Record>();
private final List<Record> _dequeueRecords = new ArrayList<Record>();
- private MessageStore.Transaction _transaction;
+ private Transaction _transaction;
private long _expiration;
private VirtualHost _vhost;
private ScheduledFuture<?> _timeoutFuture;
@@ -199,7 +200,7 @@ public class DtxBranch
public void prepare() throws AMQStoreException
{
- MessageStore.Transaction txn = _store.newTransaction();
+ Transaction txn = _store.newTransaction();
txn.recordXid(_xid.getFormat(),
_xid.getGlobalId(),
_xid.getBranchId(),
@@ -223,7 +224,7 @@ public class DtxBranch
{
// prepare has previously been called
- MessageStore.Transaction txn = _store.newTransaction();
+ Transaction txn = _store.newTransaction();
txn.removeXid(_xid.getFormat(), _xid.getGlobalId(), _xid.getBranchId());
txn.commitTran();
@@ -302,7 +303,7 @@ public class DtxBranch
_enqueueRecords.add(new Record(queue, message));
}
- private static final class Record implements MessageStore.Transaction.Record
+ private static final class Record implements Transaction.Record
{
private final BaseQueue _queue;
private final EnqueableMessage _message;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 9b61f7543f..11401ebd65 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.Transaction;
import java.util.ArrayList;
import java.util.Collection;
@@ -46,7 +47,7 @@ public class LocalTransaction implements ServerTransaction
private final List<Action> _postTransactionActions = new ArrayList<Action>();
- private volatile MessageStore.Transaction _transaction;
+ private volatile Transaction _transaction;
private MessageStore _transactionLog;
private long _txnStartTime = 0L;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 5e4a4f2db6..00c8d1ff27 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -57,8 +57,6 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo
MessageStore getMessageStore();
- DurableConfigurationStore getDurableConfigurationStore();
-
SecurityManager getSecurityManager();
void close();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index 0e965472d5..1da5b8d0c7 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -23,9 +23,7 @@ package org.apache.qpid.server.virtualhost;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
@@ -52,6 +50,7 @@ import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.DtxBranch;
@@ -74,11 +73,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
{
private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class);
-
private final VirtualHost _virtualHost;
private MessageStoreLogSubject _logSubject;
- private List<ProcessAction> _actions;
private MessageStore _store;
@@ -201,8 +198,8 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
}
public void dtxRecord(long format, byte[] globalId, byte[] branchId,
- MessageStore.Transaction.Record[] enqueues,
- MessageStore.Transaction.Record[] dequeues)
+ Transaction.Record[] enqueues,
+ Transaction.Record[] dequeues)
{
Xid id = new Xid(format, globalId, branchId);
DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry();
@@ -212,7 +209,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
branch = new DtxBranch(id, _store, _virtualHost);
dtxRegistry.registerBranch(branch);
}
- for(MessageStore.Transaction.Record record : enqueues)
+ for(Transaction.Record record : enqueues)
{
final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName());
if(queue != null)
@@ -272,7 +269,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
}
}
- for(MessageStore.Transaction.Record record : dequeues)
+ for(Transaction.Record record : dequeues)
{
final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName());
if(queue != null)
@@ -385,7 +382,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
public void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf)
{
- _actions = new ArrayList<ProcessAction>();
try
{
Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeName);
@@ -482,7 +478,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
else
{
_logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queue.getNameShortString() + " is unknown, entry will be discarded");
- MessageStore.Transaction txn = _store.newTransaction();
+ Transaction txn = _store.newTransaction();
txn.dequeueMessage(queue, new DummyMessage(messageId));
txn.commitTranAsync();
}
@@ -490,7 +486,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
else
{
_logger.warn("Message id " + messageId + " in log references queue " + queueName + " which is not in the configuration, entry will be discarded");
- MessageStore.Transaction txn = _store.newTransaction();
+ Transaction txn = _store.newTransaction();
TransactionLogResource mockQueue =
new TransactionLogResource()
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index fcad1550e1..eccaf553cd 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -63,8 +63,12 @@ import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
-import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
@@ -118,15 +122,13 @@ public class VirtualHostImpl implements VirtualHost
private AMQBrokerManagerMBean _brokerMBean;
-
- private DurableConfigurationStore _durableConfigurationStore;
private BindingFactory _bindingFactory;
private boolean _statisticsEnabled = false;
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
- public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
+ public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception
{
if (hostConfig == null)
{
@@ -166,7 +168,7 @@ public class VirtualHostImpl implements VirtualHost
StartupRoutingTable configFileRT = new StartupRoutingTable();
- _durableConfigurationStore = configFileRT;
+ _messageStore = configFileRT;
// This needs to be after the RT has been defined as it creates the default durable exchanges.
_exchangeRegistry.initialise();
@@ -175,18 +177,7 @@ public class VirtualHostImpl implements VirtualHost
initialiseModel(_configuration);
- if (store != null)
- {
- _messageStore = store;
- if(store instanceof DurableConfigurationStore)
- {
- _durableConfigurationStore = (DurableConfigurationStore) store;
- }
- }
- else
- {
- initialiseMessageStore(hostConfig);
- }
+ initialiseMessageStore(hostConfig);
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
@@ -228,8 +219,8 @@ public class VirtualHostImpl implements VirtualHost
/**
* Virtual host JMX MBean class.
*
- * This has some of the methods implemented from management intrerface for exchanges. Any
- * implementaion of an Exchange MBean should extend this class.
+ * This has some of the methods implemented from management interface for exchanges. Any
+ * Implementation of an Exchange MBean should extend this class.
*/
public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
{
@@ -407,27 +398,15 @@ public class VirtualHostImpl implements VirtualHost
MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStore);
-
- if(messageStore instanceof DurableConfigurationStore)
- {
- DurableConfigurationStore durableConfigurationStore = (DurableConfigurationStore) messageStore;
-
- durableConfigurationStore.configureConfigStore(this.getName(),
- recoveryHandler,
- hostConfig.getStoreConfiguration(),
- storeLogSubject);
-
- _durableConfigurationStore = durableConfigurationStore;
- }
+ messageStore.configureConfigStore(this.getName(),
+ recoveryHandler,
+ hostConfig.getStoreConfiguration(),
+ storeLogSubject);
messageStore.configureMessageStore(this.getName(),
recoveryHandler,
- hostConfig.getStoreConfiguration(),
- storeLogSubject);
- messageStore.configureTransactionLog(this.getName(),
recoveryHandler,
- hostConfig.getStoreConfiguration(),
- storeLogSubject);
+ hostConfig.getStoreConfiguration(), storeLogSubject);
_messageStore = messageStore;
@@ -472,7 +451,7 @@ public class VirtualHostImpl implements VirtualHost
if (newExchange.isDurable())
{
- _durableConfigurationStore.createExchange(newExchange);
+ _messageStore.createExchange(newExchange);
}
}
}
@@ -482,10 +461,10 @@ public class VirtualHostImpl implements VirtualHost
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this);
String queueName = queue.getName();
- if (queue.isDurable())
- {
- getDurableConfigurationStore().createQueue(queue);
- }
+ if (queue.isDurable())
+ {
+ getMessageStore().createQueue(queue);
+ }
//get the exchange name (returns default exchange name if none was specified)
String exchangeName = queueConfiguration.getExchange();
@@ -573,11 +552,6 @@ public class VirtualHostImpl implements VirtualHost
return _messageStore;
}
- public DurableConfigurationStore getDurableConfigurationStore()
- {
- return _durableConfigurationStore;
- }
-
public SecurityManager getSecurityManager()
{
return _securityManager;
@@ -800,7 +774,7 @@ public class VirtualHostImpl implements VirtualHost
* This is so we can replay the creation of queues/exchanges in to the real _RT after it has been loaded.
* This should be removed after the _RT has been fully split from the the TL
*/
- private static class StartupRoutingTable implements DurableConfigurationStore
+ private static class StartupRoutingTable implements MessageStore
{
public void configureConfigStore(String name,
ConfigurationRecoveryHandler recoveryHandler,
@@ -856,6 +830,37 @@ public class VirtualHostImpl implements VirtualHost
public void deleteBridge(final Bridge bridge) throws AMQStoreException
{
}
+
+ @Override
+ public void configureMessageStore(String name,
+ MessageStoreRecoveryHandler recoveryHandler,
+ TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config, LogSubject logSubject) throws Exception
+ {
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ }
+
+ @Override
+ public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(
+ T metaData)
+ {
+ return null;
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return false;
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return null;
+ }
}
@Override
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 7c7645e9e6..488f251b0a 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -79,7 +79,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
private BindingFactory bindingFactory = new BindingFactory(new DurableConfigurationStore.Source()
{
- public DurableConfigurationStore getDurableConfigurationStore()
+ public DurableConfigurationStore getMessageStore()
{
return _store;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
index fba3851507..950d59bef5 100755
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
@@ -25,7 +25,7 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import java.nio.ByteBuffer;
@@ -105,9 +105,9 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData>
return buf;
}
- public MessageStore.StoreFuture flushToStore()
+ public StoreFuture flushToStore()
{
- return MessageStore.IMMEDIATE_FUTURE;
+ return StoreFuture.IMMEDIATE_FUTURE;
}
public void remove()
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 79c744902d..a8676bf4c2 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -61,7 +61,6 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
protected SimpleAMQQueue _queue;
protected VirtualHost _virtualHost;
- protected TestableMemoryMessageStore _store = new TestableMemoryMessageStore();
protected AMQShortString _qname = new AMQShortString("qname");
protected AMQShortString _owner = new AMQShortString("owner");
protected AMQShortString _routingKey = new AMQShortString("routing key");
@@ -106,7 +105,9 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance();
PropertiesConfiguration env = new PropertiesConfiguration();
- _virtualHost = new VirtualHostImpl(ApplicationRegistry.getInstance(), new VirtualHostConfiguration(getClass().getName(), env), _store);
+ VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(getClass().getName(), env);
+ vHostConfig.setMessageStoreClass(TestableMemoryMessageStore.class.getName());
+ _virtualHost = new VirtualHostImpl(ApplicationRegistry.getInstance(), vHostConfig);
applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
_queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, false, _virtualHost, _arguments);
@@ -634,11 +635,12 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
qs.add(_queue);
MessageMetaData metaData = msg.headersReceived(System.currentTimeMillis());
- StoredMessage handle = _store.addMessage(metaData);
+ TestableMemoryMessageStore store = (TestableMemoryMessageStore) _virtualHost.getMessageStore();
+ StoredMessage handle = store.addMessage(metaData);
msg.setStoredMessage(handle);
- ServerTransaction txn = new AutoCommitTransaction(_store);
+ ServerTransaction txn = new AutoCommitTransaction(store);
txn.enqueue(qs, msg, new ServerTransaction.Action()
{
@@ -653,7 +655,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
}, 0L);
// Check that it is enqueued
- AMQQueue data = _store.getMessages().get(1L);
+ AMQQueue data = store.getMessages().get(1L);
assertNull(data);
// Dequeue message
@@ -664,7 +666,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
_queue.dequeue(entry,null);
// Check that it is dequeued
- data = _store.getMessages().get(1L);
+ data = store.getMessages().get(1L);
assertNull(data);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index d49f0586ba..755d61a260 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -298,7 +298,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
1, queueRegistry.getQueues().size());
//test that removing the queue means it is not recovered next time
- getVirtualHost().getDurableConfigurationStore().removeQueue(queueRegistry.getQueue(durableQueueName));
+ getVirtualHost().getMessageStore().removeQueue(queueRegistry.getQueue(durableQueueName));
reloadVirtualHost();
@@ -351,7 +351,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
origExchangeCount + 1, exchangeRegistry.getExchangeNames().size());
//test that removing the exchange means it is not recovered next time
- getVirtualHost().getDurableConfigurationStore().removeExchange(exchangeRegistry.getExchange(directExchangeName));
+ getVirtualHost().getMessageStore().removeExchange(exchangeRegistry.getExchange(directExchangeName));
reloadVirtualHost();
@@ -707,7 +707,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
if (queue.isDurable() && !queue.isAutoDelete())
{
- getVirtualHost().getDurableConfigurationStore().createQueue(queue, queueArguments);
+ getVirtualHost().getMessageStore().createQueue(queue, queueArguments);
}
}
catch (AMQException e)
@@ -751,7 +751,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
getVirtualHost().getExchangeRegistry().registerExchange(exchange);
if (durable)
{
- getVirtualHost().getDurableConfigurationStore().createExchange(exchange);
+ getVirtualHost().getMessageStore().createExchange(exchange);
}
}
catch (AMQException e)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index 09d865cb05..38d3fb78fc 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -26,6 +26,8 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.queue.AMQQueue;
@@ -45,8 +47,8 @@ public class SkeletonMessageStore implements MessageStore
public void configureMessageStore(String name,
MessageStoreRecoveryHandler recoveryHandler,
- Configuration config,
- LogSubject logSubject) throws Exception
+ TransactionLogRecoveryHandler tlogRecoveryHandler,
+ Configuration config, LogSubject logSubject) throws Exception
{
}
@@ -98,14 +100,6 @@ public class SkeletonMessageStore implements MessageStore
}
- public void configureTransactionLog(String name,
- TransactionLogRecoveryHandler recoveryHandler,
- Configuration storeConfiguration,
- LogSubject logSubject) throws Exception
- {
-
- }
-
public Transaction newTransaction()
{
return new Transaction()
@@ -162,4 +156,24 @@ public class SkeletonMessageStore implements MessageStore
}
+ @Override
+ public void createBrokerLink(BrokerLink link) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void deleteBrokerLink(BrokerLink link) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void createBridge(Bridge bridge) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void deleteBridge(Bridge bridge) throws AMQStoreException
+ {
+ }
+
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
index 801549e561..e9b7ceacc5 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
@@ -24,14 +24,21 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.NotImplementedException;
import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStore.StoreFuture;
-import org.apache.qpid.server.store.MessageStore.Transaction;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -126,30 +133,23 @@ class MockStoreTransaction implements Transaction
{
public void configureMessageStore(final String name,
final MessageStoreRecoveryHandler recoveryHandler,
- final Configuration config,
- final LogSubject logSubject) throws Exception
+ TransactionLogRecoveryHandler tlogRecoveryHandler,
+ final Configuration config, final LogSubject logSubject) throws Exception
{
- //TODO.
}
public void close() throws Exception
{
- //TODO.
}
public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData)
{
- return null; //TODO.
+ return null;
}
public boolean isPersistent()
{
- return false; //TODO.
- }
-
- public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler,
- Configuration storeConfiguration, LogSubject logSubject) throws Exception
- {
+ return false;
}
public Transaction newTransaction()
@@ -157,6 +157,82 @@ class MockStoreTransaction implements Transaction
storeTransaction.setState(TransactionState.STARTED);
return storeTransaction;
}
+
+ @Override
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler,
+ Configuration config, LogSubject logSubject)
+ throws Exception
+ {
+ }
+
+ @Override
+ public void createExchange(Exchange exchange)
+ throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void removeExchange(Exchange exchange)
+ throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void bindQueue(Exchange exchange, AMQShortString routingKey,
+ AMQQueue queue, FieldTable args) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void unbindQueue(Exchange exchange,
+ AMQShortString routingKey, AMQQueue queue, FieldTable args)
+ throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void createQueue(AMQQueue queue) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void createQueue(AMQQueue queue, FieldTable arguments)
+ throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void removeQueue(AMQQueue queue) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void updateQueue(AMQQueue queue) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void createBrokerLink(BrokerLink link)
+ throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void deleteBrokerLink(BrokerLink link)
+ throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void createBridge(Bridge bridge) throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void deleteBridge(Bridge bridge) throws AMQStoreException
+ {
+ }
};
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index af742532e2..cccf02c9f3 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -38,7 +38,6 @@ import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.DtxRegistry;
@@ -111,11 +110,6 @@ public class MockVirtualHost implements VirtualHost
return null;
}
- public DurableConfigurationStore getDurableConfigurationStore()
- {
- return null;
- }
-
public ExchangeFactory getExchangeFactory()
{
return null;
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index 8ffc09930e..c3b006f371 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -38,7 +38,7 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
-public class SlowMessageStore implements MessageStore, DurableConfigurationStore
+public class SlowMessageStore implements MessageStore
{
private static final Logger _logger = Logger.getLogger(SlowMessageStore.class);
private static final String DELAYS = "delays";
@@ -160,11 +160,11 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
public void configureMessageStore(String name,
- MessageStoreRecoveryHandler recoveryHandler,
- Configuration config,
- LogSubject logSubject) throws Exception
+ MessageStoreRecoveryHandler messageRecoveryHandler,
+ TransactionLogRecoveryHandler tlogRecoveryHandler,
+ Configuration config, LogSubject logSubject) throws Exception
{
- _realStore.configureMessageStore(name, recoveryHandler, config, logSubject);
+ _realStore.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler, config, logSubject);
}
public void close() throws Exception
@@ -227,14 +227,6 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
doPostDelay("removeQueue");
}
- public void configureTransactionLog(String name,
- TransactionLogRecoveryHandler recoveryHandler,
- Configuration storeConfiguration, LogSubject logSubject)
- throws Exception
- {
- _realStore.configureTransactionLog(name, recoveryHandler, storeConfiguration, logSubject);
- }
-
public Transaction newTransaction()
{
doPreDelay("beginTran");