summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-02-13 17:22:17 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-02-13 17:22:17 +0000
commit93ce4ab1ba3922db54f031faa8dfc1b62d8bfae1 (patch)
tree667db6ad79b604bbdfc6aa4e342da98bc130dd2b
parent53af5c535c9bc90d803393009ac1bf01efd38eac (diff)
downloadqpid-python-93ce4ab1ba3922db54f031faa8dfc1b62d8bfae1.tar.gz
QPID-1631 : Convert MessageStore to TransactionLog and RoutingTable. Updated all references and provided a test to ensure that whilst we are transitioning the configuration we can use the old MessageStore classes that now implement both interfaces without any config updates. Updates to the configuration can come when the store are renamed *TransactionLog
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@744184 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java18
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java26
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java18
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java36
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java123
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java45
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java24
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java (renamed from qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java)78
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java91
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java17
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java15
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java14
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java12
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java18
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java6
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java6
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java1
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java16
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java8
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java10
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java14
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java52
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java6
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java61
36 files changed, 492 insertions, 333 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
index fc6057afd2..b3c843ebaa 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
@@ -42,12 +42,8 @@ import javax.management.MBeanException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
-import org.apache.commons.configuration.Configuration;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -59,8 +55,9 @@ import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.routing.RoutingTable;
/**
* This MBean implements the broker management interface and exposes the
@@ -72,7 +69,8 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
private final QueueRegistry _queueRegistry;
private final ExchangeRegistry _exchangeRegistry;
private final ExchangeFactory _exchangeFactory;
- private final MessageStore _messageStore;
+ private final TransactionLog _tranasctionLog;
+ private final RoutingTable _routingTable;
private final VirtualHost.VirtualHostMBean _virtualHostMBean;
@@ -86,8 +84,9 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
_queueRegistry = virtualHost.getQueueRegistry();
_exchangeRegistry = virtualHost.getExchangeRegistry();
- _messageStore = virtualHost.getMessageStore();
+ _tranasctionLog = virtualHost.getTransactionLog();
_exchangeFactory = virtualHost.getExchangeFactory();
+ _routingTable = virtualHost.getRoutingTable();
}
public String getObjectInstanceName()
@@ -180,7 +179,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
null);
if (queue.isDurable() && !queue.isAutoDelete())
{
- _messageStore.createQueue(queue);
+ _routingTable.createQueue(queue);
}
_queueRegistry.registerQueue(queue);
@@ -215,8 +214,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
try
{
queue.delete();
- _messageStore.removeQueue(queue);
-
+ _routingTable.removeQueue(queue);
}
catch (AMQException ex)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 341aae2875..2d0589c223 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -44,11 +44,11 @@ import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import java.util.Collection;
import java.util.LinkedList;
@@ -91,7 +91,7 @@ public class AMQChannel
/** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
protected final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
- private final MessageStore _messageStore;
+ private final TransactionLog _transactionLog;
private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
@@ -109,9 +109,9 @@ public class AMQChannel
// Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
- private boolean _closing;
+ private boolean _closing;
- public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
+ public AMQChannel(AMQProtocolSession session, int channelId, TransactionLog transactionLog)
throws AMQException
{
//Set values from configuration
@@ -122,10 +122,10 @@ public class AMQChannel
_storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId);
- _messageStore = messageStore;
+ _transactionLog = transactionLog;
// by default the session is non-transactional
- _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
+ _txnContext = new NonTransactionalContext(_transactionLog, _storeContext, this, _returnMessages);
}
/** Sets this channel to be part of a local transaction */
@@ -150,7 +150,7 @@ public class AMQChannel
public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQException
{
- _currentMessage = new IncomingMessage(info, _txnContext, _session, _messageStore);
+ _currentMessage = new IncomingMessage(info, _txnContext, _session, _transactionLog);
_currentMessage.setExchange(e);
}
@@ -174,7 +174,7 @@ public class AMQChannel
routeCurrentMessage();
- _currentMessage.routingComplete(_messageStore);
+ _currentMessage.routingComplete(_transactionLog);
deliverCurrentMessageIfComplete();
@@ -474,7 +474,7 @@ public class AMQChannel
{
deliveryContext =
- new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
+ new NonTransactionalContext(_transactionLog, _storeContext, this, _returnMessages);
}
else
{
@@ -534,7 +534,7 @@ public class AMQChannel
{
deliveryContext =
- new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
+ new NonTransactionalContext(_transactionLog, _storeContext, this, _returnMessages);
}
else
@@ -669,7 +669,7 @@ public class AMQChannel
{
deliveryContext =
- new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
+ new NonTransactionalContext(_transactionLog, _storeContext, this, _returnMessages);
}
else
{
@@ -870,9 +870,9 @@ public class AMQChannel
return _returnMessages;
}
- public MessageStore getMessageStore()
+ public TransactionLog getTransactionLog()
{
- return _messageStore;
+ return _transactionLog;
}
private final ClientDeliveryMethod _clientDeliveryMethod = new ClientDeliveryMethod()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index 705e84752b..02fb57c730 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -38,8 +38,9 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.routing.RoutingTable;
public class VirtualHostConfiguration
{
@@ -101,8 +102,6 @@ public class VirtualHostConfiguration
exchangeConfiguration.addConfiguration(configuration.subset("exchanges.exchange."+ exchangeNameString));
exchangeConfiguration.addConfiguration(configuration.subset("exchanges"));
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- MessageStore messageStore = virtualHost.getMessageStore();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
@@ -159,7 +158,7 @@ public class VirtualHostConfiguration
queueConfiguration.addConfiguration(configuration.subset("queues"));
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- MessageStore messageStore = virtualHost.getMessageStore();
+ RoutingTable routingTable = virtualHost.getRoutingTable();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
@@ -205,7 +204,7 @@ public class VirtualHostConfiguration
if (queue.isDurable())
{
- messageStore.createQueue(queue);
+ routingTable.createQueue(queue);
}
queueRegistry.registerQueue(queue);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index f28ba0d0de..3e930364df 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -25,8 +25,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.protocol.ExchangeInitialiser;
import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.routing.RoutingTable;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
@@ -56,9 +56,9 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this);
}
- public MessageStore getMessageStore()
+ public RoutingTable getRoutingTable()
{
- return _host.getMessageStore();
+ return _host.getRoutingTable();
}
public void registerExchange(Exchange exchange) throws AMQException
@@ -66,7 +66,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
_exchangeMap.put(exchange.getName(), exchange);
if (exchange.isDurable())
{
- getMessageStore().createExchange(exchange);
+ getRoutingTable().createExchange(exchange);
}
}
@@ -93,7 +93,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
{
if (e.isDurable())
{
- getMessageStore().removeExchange(e);
+ getRoutingTable().removeExchange(e);
}
e.close();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index 054674aed4..0a7d5cfe6d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -54,7 +54,7 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore()
+ final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getTransactionLog()
);
session.addChannel(channel);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 71f38cb28a..b1e02aef7a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -30,20 +30,17 @@ import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.security.access.Permission;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.AMQChannel;
-import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.routing.RoutingTable;
public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
{
@@ -73,7 +70,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
VirtualHost virtualHost = session.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- MessageStore store = virtualHost.getMessageStore();
+ RoutingTable routingTable = virtualHost.getRoutingTable();
if (!body.getPassive())
@@ -120,7 +117,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
queue = createQueue(queueName, body, virtualHost, session);
if (queue.isDurable() && !queue.isAutoDelete())
{
- store.createQueue(queue, body.getArguments());
+ routingTable.createQueue(queue, body.getArguments());
}
queueRegistry.registerQueue(queue);
if (autoRegister)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index b397db9246..85a697f3b7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -30,10 +30,9 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.security.access.Permission;
+import org.apache.qpid.server.routing.RoutingTable;
public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody>
{
@@ -62,7 +61,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- MessageStore store = virtualHost.getMessageStore();
+ RoutingTable routingTable = virtualHost.getRoutingTable();
AMQQueue queue;
if (body.getQueue() == null)
@@ -115,7 +114,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
if (queue.isDurable())
{
- store.removeQueue(queue);
+ routingTable.removeQueue(queue);
}
MethodRegistry methodRegistry = session.getMethodRegistry();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index 20926b99d7..94580a00ac 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -27,10 +27,10 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
@@ -67,7 +67,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
private ArrayList<AMQQueue> _destinationQueues;
private AMQProtocolSession _publisher;
- private MessageStore _messageStore;
+ private TransactionLog _messageStore;
private long _expiration;
private Exchange _exchange;
@@ -76,7 +76,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
public IncomingMessage(final MessagePublishInfo info,
final TransactionalContext txnContext,
final AMQProtocolSession publisher,
- MessageStore messasgeStore)
+ TransactionLog messasgeStore)
{
_messagePublishInfo = info;
_txnContext = txnContext;
@@ -121,7 +121,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
}
- public void routingComplete(final MessageStore store) throws AMQException
+ public void routingComplete(final TransactionLog transactionLog) throws AMQException
{
if (isPersistent())
@@ -134,7 +134,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
{
for (int i = 0; i < _destinationQueues.size(); i++)
{
- store.enqueueMessage(_txnContext.getStoreContext(),
+ transactionLog.enqueueMessage(_txnContext.getStoreContext(),
_destinationQueues.get(i), getMessageId());
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
index 2f6e05963c..e5e0b6e312 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import java.util.concurrent.atomic.AtomicLong;
@@ -58,27 +58,27 @@ public class MessageFactory
/**
* Normal message creation path
- * @param store
+ * @param transactionLog
* @param persistent
* @return
*/
- public AMQMessage createMessage(MessageStore store, boolean persistent)
+ public AMQMessage createMessage(TransactionLog transactionLog, boolean persistent)
{
if (_state != State.OPEN)
{
_state = State.OPEN;
}
- return createNextMessage(_messageId.incrementAndGet(), store, persistent);
+ return createNextMessage(_messageId.incrementAndGet(), transactionLog, persistent);
}
/**
* Used for message recovery only and so only creates persistent messages.
* @param messageId the id that this message must have
- * @param store
+ * @param transactionLog
* @return
*/
- public AMQMessage createMessage(Long messageId, MessageStore store)
+ public AMQMessage createMessage(Long messageId, TransactionLog transactionLog)
{
if (_state != State.RECOVER)
{
@@ -96,14 +96,14 @@ public class MessageFactory
_messageId.set(messageId);
}
- return createNextMessage(messageId, store, true);
+ return createNextMessage(messageId, transactionLog, true);
}
- private AMQMessage createNextMessage(Long messageId, MessageStore store, boolean persistent)
+ private AMQMessage createNextMessage(Long messageId, TransactionLog transactionLog, boolean persistent)
{
if (persistent)
{
- return new PersistentAMQMessage(messageId, store);
+ return new PersistentAMQMessage(messageId, transactionLog);
}
else
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
index 04e3635f92..804bb29ecd 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
@@ -24,17 +24,17 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.transactionlog.TransactionLog;
public class PersistentAMQMessage extends TransientAMQMessage
{
- protected MessageStore _messageStore;
+ protected TransactionLog _transactionLog;
- public PersistentAMQMessage(Long messageId, MessageStore store)
+ public PersistentAMQMessage(Long messageId, TransactionLog transactionLog)
{
super(messageId);
- _messageStore = store;
+ _transactionLog = transactionLog;
}
@Override
@@ -42,7 +42,7 @@ public class PersistentAMQMessage extends TransientAMQMessage
throws AMQException
{
super.addContentBodyFrame(storeContext, contentChunk, isLastContentBody);
- _messageStore.storeContentBodyChunk(storeContext, _messageId, _contentBodies.size() - 1,
+ _transactionLog.storeContentBodyChunk(storeContext, _messageId, _contentBodies.size() - 1,
contentChunk, isLastContentBody);
}
@@ -54,13 +54,13 @@ public class PersistentAMQMessage extends TransientAMQMessage
super.setPublishAndContentHeaderBody(storeContext, messagePublishInfo, contentHeaderBody);
MessageMetaData mmd = new MessageMetaData(messagePublishInfo, contentHeaderBody, _contentBodies == null ? 0 : _contentBodies.size(), _arrivalTime);
- _messageStore.storeMessageMetaData(storeContext, _messageId, mmd);
+ _transactionLog.storeMessageMetaData(storeContext, _messageId, mmd);
}
@Override
public void removeMessage(StoreContext storeContext) throws AMQException
{
- _messageStore.removeMessage(storeContext, _messageId);
+ _transactionLog.removeMessage(storeContext, _messageId);
}
@Override
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 547df7856d..a08e4e2667 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -6,11 +6,11 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.AMQException;
import org.apache.qpid.pool.ReadWriteRunnable;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
@@ -210,7 +210,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
exchange.registerQueue(routingKey, this, arguments);
if (isDurable() && exchange.isDurable())
{
- _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
+ _virtualHost.getRoutingTable().bindQueue(exchange, routingKey, this, arguments);
}
_bindings.addBinding(routingKey, arguments, exchange);
@@ -221,7 +221,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
exchange.deregisterQueue(routingKey, this, arguments);
if (isDurable() && exchange.isDurable())
{
- _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
+ _virtualHost.getRoutingTable().unbindQueue(exchange, routingKey, this, arguments);
}
boolean removed = _bindings.remove(routingKey, arguments, exchange);
@@ -581,7 +581,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
AMQMessage msg = entry.getMessage();
if (msg.isPersistent())
{
- _virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageId());
+ _virtualHost.getTransactionLog().dequeueMessage(storeContext, this, msg.getMessageId());
}
//entry.dispose(storeContext);
@@ -826,7 +826,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
- MessageStore store = getVirtualHost().getMessageStore();
+ TransactionLog transactionLog = getVirtualHost().getTransactionLog();
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
{
@@ -847,16 +847,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
try
{
- store.beginTran(storeContext);
+ transactionLog.beginTran(storeContext);
- // Move the messages in on the message store.
+ // Move the messages in on the transaction log.
for (QueueEntry entry : entries)
{
AMQMessage message = entry.getMessage();
if (message.isPersistent() && toQueue.isDurable())
{
- store.enqueueMessage(storeContext, toQueue, message.getMessageId());
+ transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId());
}
// dequeue does not decrement the refence count
entry.dequeue(storeContext);
@@ -865,18 +865,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// Commit and flush the move transcations.
try
{
- store.commitTran(storeContext);
+ transactionLog.commitTran(storeContext);
}
catch (AMQException e)
{
- throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+ throw new RuntimeException("Failed to commit transaction whilst moving messages on transaction log.", e);
}
}
catch (AMQException e)
{
try
{
- store.abortTran(storeContext);
+ transactionLog.abortTran(storeContext);
}
catch (AMQException rollbackEx)
{
@@ -910,7 +910,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
final StoreContext storeContext)
{
AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
- MessageStore store = getVirtualHost().getMessageStore();
+ TransactionLog transactionLog = getVirtualHost().getTransactionLog();
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
{
@@ -938,34 +938,34 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
try
{
- store.beginTran(storeContext);
+ transactionLog.beginTran(storeContext);
- // Move the messages in on the message store.
+ // Move the messages in on the transaction log.
for (QueueEntry entry : entries)
{
AMQMessage message = entry.getMessage();
if (message.isReferenced() && message.isPersistent() && toQueue.isDurable())
{
- store.enqueueMessage(storeContext, toQueue, message.getMessageId());
+ transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId());
}
}
// Commit and flush the move transcations.
try
{
- store.commitTran(storeContext);
+ transactionLog.commitTran(storeContext);
}
catch (AMQException e)
{
- throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+ throw new RuntimeException("Failed to commit transaction whilst moving messages on transaction log.", e);
}
}
catch (AMQException e)
{
try
{
- store.abortTran(storeContext);
+ transactionLog.abortTran(storeContext);
}
catch (AMQException rollbackEx)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java
new file mode 100644
index 0000000000..6344127b24
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.routing;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.commons.configuration.Configuration;
+
+public interface RoutingTable
+{
+ /**
+ * Called after instantiation in order to configure the message store. A particular implementation can define
+ * whatever parameters it wants.
+ *
+ * @param virtualHost The virtual host using by this store
+ * @param base The base element identifier from which all configuration items are relative. For example, if
+ * the base element is "store", the all elements used by concrete classes will be "store.foo" etc.
+ * @param config The apache commons configuration object.
+ *
+ * @throws Exception If any error occurs that means the store is unable to configure itself.
+ */
+ void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception;
+
+ /**
+ * Called to close and cleanup any resources used by the message store.
+ *
+ * @throws Exception If the close fails.
+ */
+ void close() throws Exception;
+
+
+ /**
+ * Makes the specified exchange persistent.
+ *
+ * @param exchange The exchange to persist.
+ *
+ * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ */
+ void createExchange(Exchange exchange) throws AMQException;
+
+ /**
+ * Removes the specified persistent exchange.
+ *
+ * @param exchange The exchange to remove.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
+ void removeExchange(Exchange exchange) throws AMQException;
+
+ /**
+ * Binds the specified queue to an exchange with a routing key.
+ *
+ * @param exchange The exchange to bind to.
+ * @param routingKey The routing key to bind by.
+ * @param queue The queue to bind.
+ * @param args Additional parameters.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
+ void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
+
+ /**
+ * Unbinds the specified from an exchange under a particular routing key.
+ *
+ * @param exchange The exchange to unbind from.
+ * @param routingKey The routing key to unbind.
+ * @param queue The queue to unbind.
+ * @param args Additonal parameters.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
+ void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
+
+ /**
+ * Makes the specified queue persistent.
+ *
+ * @param queue The queue to store.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
+ void createQueue(AMQQueue queue) throws AMQException;
+
+ /**
+ * Makes the specified queue persistent.
+ *
+ * @param queue The queue to store.
+ * @param arguments The additional arguments to the binding
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
+ void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException;
+
+ /**
+ * Removes the specified queue from the persistent store.
+ *
+ * @param queue The queue to remove.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
+ void removeQueue(final AMQQueue queue) throws AMQException;
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index 425aed43d4..500fd4c7bf 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -32,6 +32,8 @@ import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.routing.RoutingTable;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -62,7 +64,7 @@ import java.util.HashMap;
import java.util.TreeMap;
-public class DerbyMessageStore implements MessageStore
+public class DerbyMessageStore implements TransactionLog, RoutingTable
{
private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
@@ -142,37 +144,40 @@ public class DerbyMessageStore implements MessageStore
public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
{
- stateTransition(State.INITIAL, State.CONFIGURING);
+ //Only initialise when loaded with the old 'store' confing ignore the new 'RoutingTable' config
+ if (base.equals("store"))
+ {
+ stateTransition(State.INITIAL, State.CONFIGURING);
- initialiseDriver();
+ initialiseDriver();
- _virtualHost = virtualHost;
+ _virtualHost = virtualHost;
- _logger.info("Configuring Derby message store for virtual host " + virtualHost.getName());
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ _logger.info("Configuring Derby message store for virtual host " + virtualHost.getName());
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- final String databasePath = config.getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "derbyDB");
+ final String databasePath = config.getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "derbyDB");
- File environmentPath = new File(databasePath);
- if (!environmentPath.exists())
- {
- if (!environmentPath.mkdirs())
+ File environmentPath = new File(databasePath);
+ if (!environmentPath.exists())
{
- throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
- + "Ensure the path is correct and that the permissions are correct.");
+ if (!environmentPath.mkdirs())
+ {
+ throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
+ + "Ensure the path is correct and that the permissions are correct.");
+ }
}
- }
-
- createOrOpenDatabase(databasePath);
- // this recovers durable queues and persistent messages
+ createOrOpenDatabase(databasePath);
- _messageFactory = MessageFactory.getInstance();
+ // this recovers durable queues and persistent messages
- recover();
+ _messageFactory = MessageFactory.getInstance();
- stateTransition(State.RECOVERING, State.STARTED);
+ recover();
+ stateTransition(State.RECOVERING, State.STARTED);
+ }
}
private static synchronized void initialiseDriver() throws ClassNotFoundException
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 587c85fc12..eee7be7ef6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -30,6 +30,8 @@ import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.routing.RoutingTable;
import java.util.ArrayList;
import java.util.Collections;
@@ -39,8 +41,14 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-/** A simple message store that stores the messages in a threadsafe structure in memory. */
-public class MemoryMessageStore implements MessageStore
+/** A simple message store that stores the messages in a threadsafe structure in memory.
+ *
+ * NOTE: Now that we have removed the MessageStore interface and are using a TransactionLog
+ *
+ * This class really should have no storage unless we want to do inMemory Recovery.
+ *
+ */
+public class MemoryMessageStore implements TransactionLog, RoutingTable
{
private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
@@ -64,10 +72,14 @@ public class MemoryMessageStore implements MessageStore
public void configure(String base, Configuration config)
{
- int hashtableCapacity = config.getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
- _log.info("Using capacity " + hashtableCapacity + " for hash tables");
- _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity);
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity);
+ //Only initialise when called with current 'store' configs i.e. don't reinit when used as a 'RoutingTable'
+ if (base.equals("store"))
+ {
+ int hashtableCapacity = config.getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
+ _log.info("Using capacity " + hashtableCapacity + " for hash tables");
+ _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity);
+ _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity);
+ }
}
public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java
index e65dded149..c927bb3272 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.store;
+package org.apache.qpid.server.transactionlog;
import org.apache.commons.configuration.Configuration;
@@ -30,9 +30,11 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.StoreContext;
/**
- * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages, queues
+ * TransactionLog defines the interface for performing transactions.
+ * This is used to preserve the state of messages, queues
* and exchanges in a transactional manner.
*
* <p/>All message store, remove, enqueue and dequeue operations are carried out against a {@link StoreContext} which
@@ -52,7 +54,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
* <tr><td> Generate message identifiers.
* </table>
*/
-public interface MessageStore
+public interface TransactionLog
{
/**
* Called after instantiation in order to configure the message store. A particular implementation can define
@@ -84,74 +86,7 @@ public interface MessageStore
*/
void removeMessage(StoreContext storeContext, Long messageId) throws AMQException;
- /**
- * Makes the specified exchange persistent.
- *
- * @param exchange The exchange to persist.
- *
- * @throws AMQException If the operation fails for any reason.
- */
- void createExchange(Exchange exchange) throws AMQException;
- /**
- * Removes the specified persistent exchange.
- *
- * @param exchange The exchange to remove.
- *
- * @throws AMQException If the operation fails for any reason.
- */
- void removeExchange(Exchange exchange) throws AMQException;
-
- /**
- * Binds the specified queue to an exchange with a routing key.
- *
- * @param exchange The exchange to bind to.
- * @param routingKey The routing key to bind by.
- * @param queue The queue to bind.
- * @param args Additional parameters.
- *
- * @throws AMQException If the operation fails for any reason.
- */
- void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
-
- /**
- * Unbinds the specified from an exchange under a particular routing key.
- *
- * @param exchange The exchange to unbind from.
- * @param routingKey The routing key to unbind.
- * @param queue The queue to unbind.
- * @param args Additonal parameters.
- *
- * @throws AMQException If the operation fails for any reason.
- */
- void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
-
- /**
- * Makes the specified queue persistent.
- *
- * @param queue The queue to store.
- *
- * @throws AMQException If the operation fails for any reason.
- */
- void createQueue(AMQQueue queue) throws AMQException;
-
- /**
- * Makes the specified queue persistent.
- *
- * @param queue The queue to store.
- *
- * @param arguments The additional arguments to the binding
- * @throws AMQException If the operation fails for any reason.
- */
- void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException;
-
- /**
- * Removes the specified queue from the persistent store.
- *
- * @param queue The queue to remove.
- * @throws AMQException If the operation fails for any reason.
- */
- void removeQueue(final AMQQueue queue) throws AMQException;
/**
* Places a message onto a specified queue, in a given transactional context.
@@ -209,6 +144,7 @@ public interface MessageStore
*/
boolean inTran(StoreContext context);
+
/**
* Stores a chunk of message data.
*
@@ -261,7 +197,7 @@ public interface MessageStore
/**
* Is this store capable of persisting the data
- *
+ *
* @return true if this store is capable of persisting data
*/
boolean isPersistent();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 3c71282c57..9bc2e98fe9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -25,11 +25,11 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.*;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import java.util.List;
@@ -56,7 +56,6 @@ public class LocalTransactionalContext implements TransactionalContext
private boolean _messageDelivered = false;
private final AMQChannel _channel;
-
private abstract class DeliveryAction
{
@@ -125,9 +124,9 @@ public class LocalTransactionalContext implements TransactionalContext
return _channel.getReturnMessages();
}
- public MessageStore getMessageStore()
+ public TransactionLog getTransactionLog()
{
- return _channel.getMessageStore();
+ return _channel.getTransactionLog();
}
@@ -135,9 +134,9 @@ public class LocalTransactionalContext implements TransactionalContext
{
_txnBuffer.rollback(getStoreContext());
// Hack to deal with uncommitted non-transactional writes
- if (getMessageStore().inTran(getStoreContext()))
+ if (getTransactionLog().inTran(getStoreContext()))
{
- getMessageStore().abortTran(getStoreContext());
+ getTransactionLog().abortTran(getStoreContext());
_inTran = false;
}
@@ -224,7 +223,7 @@ public class LocalTransactionalContext implements TransactionalContext
_log.debug("Starting transaction on message store: " + this);
}
- getMessageStore().beginTran(getStoreContext());
+ getTransactionLog().beginTran(getStoreContext());
_inTran = true;
}
}
@@ -247,7 +246,7 @@ public class LocalTransactionalContext implements TransactionalContext
if (_messageDelivered && _inTran)
{
- _txnBuffer.enlist(new StoreMessageOperation(getMessageStore()));
+ _txnBuffer.enlist(new StoreMessageOperation(getTransactionLog()));
}
// fixme fail commit here ... QPID-440
try
@@ -257,7 +256,7 @@ public class LocalTransactionalContext implements TransactionalContext
finally
{
_messageDelivered = false;
- _inTran = getMessageStore().inTran(getStoreContext());
+ _inTran = getTransactionLog().inTran(getStoreContext());
}
try
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index acb2e93818..145d7f8b13 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -26,10 +26,10 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.*;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
/** @author Apache Software Foundation */
@@ -45,20 +45,20 @@ public class NonTransactionalContext implements TransactionalContext
- private final MessageStore _messageStore;
+ private final TransactionLog _transactionLog;
private final StoreContext _storeContext;
/** Whether we are in a transaction */
private boolean _inTran;
- public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel,
+ public NonTransactionalContext(TransactionLog transactionLog, StoreContext storeContext, AMQChannel channel,
List<RequiredDeliveryException> returnMessages)
{
_channel = channel;
_storeContext = storeContext;
_returnMessages = returnMessages;
- _messageStore = messageStore;
+ _transactionLog = transactionLog;
}
@@ -72,7 +72,7 @@ public class NonTransactionalContext implements TransactionalContext
{
if (!_inTran)
{
- _messageStore.beginTran(_storeContext);
+ _transactionLog.beginTran(_storeContext);
_inTran = true;
}
}
@@ -191,7 +191,7 @@ public class NonTransactionalContext implements TransactionalContext
}
if(_inTran)
{
- _messageStore.commitTran(_storeContext);
+ _transactionLog.commitTran(_storeContext);
_inTran = false;
}
}
@@ -200,7 +200,7 @@ public class NonTransactionalContext implements TransactionalContext
{
if (persistent)
{
- _messageStore.commitTran(_storeContext);
+ _transactionLog.commitTran(_storeContext);
_inTran = false;
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java
index 0e4d6c2030..f1fbca2e28 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java
@@ -21,8 +21,8 @@
package org.apache.qpid.server.txn;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.transactionlog.TransactionLog;
/**
* A transactional operation to store messages in an underlying persistent store. When this operation
@@ -31,11 +31,11 @@ import org.apache.qpid.server.store.StoreContext;
*/
public class StoreMessageOperation implements TxnOp
{
- private final MessageStore _messsageStore;
+ private final TransactionLog _transactionLog;
- public StoreMessageOperation(MessageStore messageStore)
+ public StoreMessageOperation(TransactionLog transactionLog)
{
- _messsageStore = messageStore;
+ _transactionLog = transactionLog;
}
public void prepare(StoreContext context) throws AMQException
@@ -48,11 +48,11 @@ public class StoreMessageOperation implements TxnOp
public void commit(StoreContext context) throws AMQException
{
- _messsageStore.commitTran(context);
+ _transactionLog.commitTran(context);
}
public void rollback(StoreContext context) throws AMQException
{
- _messsageStore.abortTran(context);
+ _transactionLog.abortTran(context);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index de4c8ac1ff..1497b4adb8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -30,6 +30,8 @@ import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.AMQBrokerManagerMBean;
+import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.connection.ConnectionRegistry;
import org.apache.qpid.server.connection.IConnectionRegistry;
@@ -45,10 +47,8 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.access.Accessable;
-import org.apache.qpid.server.security.access.plugins.SimpleXML;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.store.MessageStore;
public class VirtualHost implements Accessable
{
@@ -65,7 +65,9 @@ public class VirtualHost implements Accessable
private ExchangeFactory _exchangeFactory;
- private MessageStore _messageStore;
+ private TransactionLog _transactionLog;
+
+ private RoutingTable _routingTable;
protected VirtualHostMBean _virtualHostMBean;
@@ -96,6 +98,11 @@ public class VirtualHost implements Accessable
return _connectionRegistry;
}
+ public RoutingTable getRoutingTable()
+ {
+ return _routingTable;
+ }
+
/**
* Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
* implementaion of an Exchange MBean should extend this class.
@@ -128,12 +135,12 @@ public class VirtualHost implements Accessable
/**
* Used for testing only
* @param name
- * @param store
+ * @param transactionLog
* @throws Exception
*/
- public VirtualHost(String name, MessageStore store) throws Exception
+ public VirtualHost(String name, TransactionLog transactionLog) throws Exception
{
- this(name, new PropertiesConfiguration(), store);
+ this(name, new PropertiesConfiguration(), transactionLog);
}
/**
@@ -147,7 +154,7 @@ public class VirtualHost implements Accessable
this(name, hostConfig, null);
}
- public VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception
+ public VirtualHost(String name, Configuration hostConfig, TransactionLog transactionLog) throws Exception
{
if (name == null || name.length() == 0)
{
@@ -166,19 +173,26 @@ public class VirtualHost implements Accessable
_exchangeFactory.initialise(hostConfig);
_exchangeRegistry = new DefaultExchangeRegistry(this);
- if (store != null)
+ if (transactionLog != null)
{
- _messageStore = store;
+ _transactionLog = transactionLog;
+ if (_transactionLog instanceof RoutingTable)
+ {
+ _routingTable = (RoutingTable) _transactionLog;
+ }
}
else
{
if (hostConfig == null)
{
- throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
+ throw new IllegalAccessException("HostConfig and TransactionLog cannot be null");
}
- initialiseMessageStore(hostConfig);
+ initialiseTransactionLog(hostConfig);
+ initialiseRoutingTable(hostConfig);
}
+
+
_exchangeRegistry.initialise();
_authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig);
@@ -224,23 +238,52 @@ public class VirtualHost implements Accessable
period);
}
}
-
- private void initialiseMessageStore(Configuration config) throws Exception
+
+ //todo we need to move from store.class to transactionlog.class
+ private void initialiseTransactionLog(Configuration config) throws Exception
{
- String messageStoreClass = config.getString("store.class");
+ String transactionLogClass = config.getString("store.class");
- Class clazz = Class.forName(messageStoreClass);
+ Class clazz = Class.forName(transactionLogClass);
Object o = clazz.newInstance();
- if (!(o instanceof MessageStore))
+ if (!(o instanceof TransactionLog))
{
- throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
+ throw new ClassCastException("TransactionLog class must implement " + TransactionLog.class + ". Class " + clazz +
" does not.");
}
- _messageStore = (MessageStore) o;
- _messageStore.configure(this, "store", config);
+ _transactionLog = (TransactionLog) o;
+ _transactionLog.configure(this, "store", config);
}
+ //todo we need to move from store.class to transactionlog.class
+ private void initialiseRoutingTable(Configuration config) throws Exception
+ {
+ String transactionLogClass = config.getString("routingtable.class");
+
+ if (transactionLogClass != null)
+ {
+ Class clazz = Class.forName(transactionLogClass);
+ Object o = clazz.newInstance();
+
+ if (!(o instanceof RoutingTable))
+ {
+ throw new ClassCastException("RoutingTable class must implement " + RoutingTable.class + ". Class " + clazz +
+ " does not.");
+ }
+ _routingTable = (RoutingTable) o;
+ _routingTable.configure(this, "routingtable", config);
+ }
+ else
+ {
+ if (_transactionLog instanceof RoutingTable)
+ {
+ _routingTable = (RoutingTable)_transactionLog;
+ }
+ }
+ }
+
+
public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
{
@@ -284,9 +327,9 @@ public class VirtualHost implements Accessable
throw new UnsupportedOperationException();
}
- public MessageStore getMessageStore()
+ public TransactionLog getTransactionLog()
{
- return _messageStore;
+ return _transactionLog;
}
public AuthenticationManager getAuthenticationManager()
@@ -320,10 +363,10 @@ public class VirtualHost implements Accessable
_houseKeepingTimer.cancel();
}
- //Close MessageStore
- if (_messageStore != null)
+ //Close TransactionLog
+ if (_transactionLog != null)
{
- _messageStore.close();
+ _transactionLog.close();
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java
index faa7b85d58..a6e060e793 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java
@@ -302,7 +302,7 @@ public class MessageStoreTool
boolean warning = false;
for (VirtualHost vhost : vhosts)
{
- if (vhost.getMessageStore() instanceof MemoryMessageStore)
+ if (vhost.getTransactionLog() instanceof MemoryMessageStore)
{
_console.println("WARNING: Virtualhost '" + vhost.getName() + "' is using a MemoryMessageStore. "
+ "Changes will not persist.");
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 5c54c0b57f..40b08a2e39 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -31,18 +31,17 @@ import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.FailedDequeueException;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.MessageCleanupException;
-import org.apache.qpid.server.queue.MessageFactory;
import org.apache.qpid.server.queue.MockProtocolSession;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
@@ -64,11 +63,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
protected final Set<TestQueue> queues = new HashSet<TestQueue>();
/** Not used in this test, just there to stub out the routing calls */
- private MessageStore _store = new MemoryMessageStore();
-
- private StoreContext _storeContext = new StoreContext();
-
- private MessageFactory _messageFactory = MessageFactory.getInstance();
+ private TransactionLog _transactionLog = new MemoryMessageStore();
private int count;
@@ -107,7 +102,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
protected void route(Message m) throws AMQException
{
exchange.route(m.getIncomingMessage());
- m.getIncomingMessage().routingComplete(_store);
+ m.getIncomingMessage().routingComplete(_transactionLog);
if (m.getIncomingMessage().allContentReceived())
{
m.getIncomingMessage().deliverToQueues();
@@ -382,9 +377,9 @@ public class AbstractHeadersExchangeTestBase extends TestCase
static class Message
{
- private static MessageStore _messageStore = new SkeletonMessageStore();
+ private static TransactionLog _transactionLog = new SkeletonMessageStore();
- private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, new StoreContext(),
+ private static TransactionalContext _txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(),
null,
new LinkedList<RequiredDeliveryException>()
);
@@ -395,7 +390,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
MessagePublishInfo mpi = getPublishRequest(id);
- IncomingMessage incomming = new IncomingMessage(mpi, _txnContext, new MockProtocolSession(_messageStore), _messageStore);
+ IncomingMessage incomming = new IncomingMessage(mpi, _txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
try
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
index 396b8c5128..f8544a33bd 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
@@ -27,10 +27,10 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
@@ -46,17 +46,16 @@ public class DestWildExchangeTest extends TestCase
TopicExchange _exchange;
VirtualHost _vhost;
- MessageStore _store;
+ TransactionLog _tranasctionLog;
StoreContext _context;
InternalTestProtocolSession _protocolSession;
-
public void setUp() throws AMQException
{
_exchange = new TopicExchange();
_vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
- _store = new MemoryMessageStore();
+ _tranasctionLog = new MemoryMessageStore();
_context = new StoreContext();
_protocolSession = new InternalTestProtocolSession();
}
@@ -75,7 +74,7 @@ public class DestWildExchangeTest extends TestCase
MessagePublishInfo info = new MessagePublishInfoImpl(null, false, false, new AMQShortString("a.b"));
- IncomingMessage message = new IncomingMessage(info, null, _protocolSession, _store);
+ IncomingMessage message = new IncomingMessage(info, null, _protocolSession, _tranasctionLog);
_exchange.route(message);
@@ -497,7 +496,7 @@ public class DestWildExchangeTest extends TestCase
throws AMQException
{
_exchange.route(message);
- message.routingComplete(_store);
+ message.routingComplete(_tranasctionLog);
message.deliverToQueues();
}
@@ -547,11 +546,11 @@ public class DestWildExchangeTest extends TestCase
{
MessagePublishInfo info = new MessagePublishInfoImpl(null, false, true, new AMQShortString(s));
- TransactionalContext trancontext = new NonTransactionalContext(_store, _context, null,
+ TransactionalContext trancontext = new NonTransactionalContext(_tranasctionLog, _context, null,
new LinkedList<RequiredDeliveryException>()
);
- IncomingMessage message = new IncomingMessage(info, trancontext,_protocolSession, _store);
+ IncomingMessage message = new IncomingMessage(info, trancontext,_protocolSession, _tranasctionLog);
message.setContentHeaderBody( new ContentHeaderBody());
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
index d5db87350b..f6c307757b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
@@ -28,11 +28,11 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
import javax.management.JMException;
@@ -45,7 +45,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase
/** Used for debugging. */
private static final Logger log = Logger.getLogger(AMQProtocolSessionMBeanTest.class);
- private MessageStore _messageStore = new SkeletonMessageStore();
+ private TransactionLog _transactionLog = new SkeletonMessageStore();
private AMQMinaProtocolSession _protocolSession;
private AMQChannel _channel;
private AMQProtocolSessionMBean _mbean;
@@ -60,7 +60,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase
new AMQShortString("test"),
true,
_protocolSession.getVirtualHost(), null);
- AMQChannel channel = new AMQChannel(_protocolSession,2, _messageStore);
+ AMQChannel channel = new AMQChannel(_protocolSession,2, _transactionLog);
channel.setDefaultQueue(queue);
_protocolSession.addChannel(channel);
channelCount = _mbean.channels().size();
@@ -71,7 +71,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase
assertTrue(_mbean.getMaximumNumberOfChannels() == 1000L);
// check APIs
- AMQChannel channel3 = new AMQChannel(_protocolSession, 3, _messageStore);
+ AMQChannel channel3 = new AMQChannel(_protocolSession, 3, _transactionLog);
channel3.setLocalTransactional();
_protocolSession.addChannel(channel3);
_mbean.rollbackTransactions(2);
@@ -91,14 +91,14 @@ public class AMQProtocolSessionMBeanTest extends TestCase
}
// check if closing of session works
- _protocolSession.addChannel(new AMQChannel(_protocolSession, 5, _messageStore));
+ _protocolSession.addChannel(new AMQChannel(_protocolSession, 5, _transactionLog));
_mbean.closeConnection();
try
{
channelCount = _mbean.channels().size();
assertTrue(channelCount == 0);
// session is now closed so adding another channel should throw an exception
- _protocolSession.addChannel(new AMQChannel(_protocolSession, 6, _messageStore));
+ _protocolSession.addChannel(new AMQChannel(_protocolSession, 6, _transactionLog));
fail();
}
catch (AMQException ex)
@@ -117,7 +117,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase
new AMQMinaProtocolSession(new TestIoSession(), appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true),
null);
_protocolSession.setVirtualHost(appRegistry.getVirtualHostRegistry().getVirtualHost("test"));
- _channel = new AMQChannel(_protocolSession, 1, _messageStore);
+ _channel = new AMQChannel(_protocolSession, 1, _transactionLog);
_protocolSession.addChannel(_channel);
_mbean = (AMQProtocolSessionMBean) _protocolSession.getManagedObject();
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index be40535df9..1bc50db1d5 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.queue;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -32,6 +31,7 @@ import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
@@ -61,9 +61,9 @@ public class AMQQueueAlertTest extends TestCase
private AMQQueueMBean _queueMBean;
private VirtualHost _virtualHost;
private AMQMinaProtocolSession _protocolSession;
- private MessageStore _messageStore = new MemoryMessageStore();
+ private TransactionLog _transactionLog = new MemoryMessageStore();
private StoreContext _storeContext = new StoreContext();
- private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
+ private TransactionalContext _transactionalContext = new NonTransactionalContext(_transactionLog, _storeContext,
null,
new LinkedList<RequiredDeliveryException>()
);
@@ -186,7 +186,7 @@ public class AMQQueueAlertTest extends TestCase
public void testQueueDepthAlertWithSubscribers() throws Exception
{
_protocolSession = new InternalTestProtocolSession();
- AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
+ AMQChannel channel = new AMQChannel(_protocolSession, 2, _transactionLog);
_protocolSession.addChannel(channel);
// Create queue
@@ -277,7 +277,7 @@ public class AMQQueueAlertTest extends TestCase
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.bodySize = size; // in bytes
- IncomingMessage message = new IncomingMessage(publish, _transactionalContext, _protocolSession, _messageStore);
+ IncomingMessage message = new IncomingMessage(publish, _transactionalContext, _protocolSession, _transactionLog);
message.setContentHeaderBody(contentHeaderBody);
return message;
@@ -308,7 +308,7 @@ public class AMQQueueAlertTest extends TestCase
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
messages[i].enqueue(qs);
- messages[i].routingComplete(_messageStore);
+ messages[i].routingComplete(_transactionLog);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 97c76c8e9d..daa8e4beb7 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -31,6 +31,7 @@ import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactory;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
@@ -41,7 +42,6 @@ import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
@@ -60,7 +60,7 @@ public class AMQQueueMBeanTest extends TestCase
private static long MESSAGE_SIZE = 1000;
private AMQQueue _queue;
private AMQQueueMBean _queueMBean;
- private MessageStore _messageStore;
+ private TransactionLog _transactionLog;
private StoreContext _storeContext = new StoreContext();
private TransactionalContext _transactionalContext;
private VirtualHost _virtualHost;
@@ -113,7 +113,7 @@ public class AMQQueueMBeanTest extends TestCase
private void verifyBrokerState()
{
- TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore());
+ TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getTransactionLog());
// Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());
@@ -130,7 +130,7 @@ public class AMQQueueMBeanTest extends TestCase
InternalTestProtocolSession protocolSession = new InternalTestProtocolSession();
- AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore);
+ AMQChannel channel = new AMQChannel(protocolSession, 1, _transactionLog);
protocolSession.addChannel(channel);
Subscription subscription =
@@ -221,7 +221,7 @@ public class AMQQueueMBeanTest extends TestCase
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_messageStore);
+ msg.routingComplete(_transactionLog);
long id = msg.getMessageId();
@@ -266,7 +266,7 @@ public class AMQQueueMBeanTest extends TestCase
contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
- IncomingMessage msg = new IncomingMessage(publish, _transactionalContext, _protocolSession, _messageStore);
+ IncomingMessage msg = new IncomingMessage(publish, _transactionalContext, _protocolSession, _transactionLog);
msg.setContentHeaderBody(contentHeaderBody);
return msg;
@@ -278,9 +278,9 @@ public class AMQQueueMBeanTest extends TestCase
super.setUp();
IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(1);
_virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
- _messageStore = _virtualHost.getMessageStore();
+ _transactionLog = _virtualHost.getTransactionLog();
- _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
+ _transactionalContext = new NonTransactionalContext(_transactionLog, _storeContext,
null,
new LinkedList<RequiredDeliveryException>()
);
@@ -307,7 +307,7 @@ public class AMQQueueMBeanTest extends TestCase
currentMessage.enqueue(qs);
// route header
- currentMessage.routingComplete(_messageStore);
+ currentMessage.routingComplete(_transactionLog);
// Add the body so we have somthing to test later
currentMessage.addContentBodyFrame(
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java
index 3633481012..2a51f42e4e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java
@@ -21,13 +21,13 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.transactionlog.TransactionLog;
public class MockPersistentAMQMessage extends PersistentAMQMessage
{
- public MockPersistentAMQMessage(long messageId, MessageStore store)
+ public MockPersistentAMQMessage(long messageId, TransactionLog transactionLog)
throws AMQException
{
- super(messageId, store);
+ super(messageId, transactionLog);
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
index 99c88fac3e..0cc33bf102 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -24,11 +24,11 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.framing.*;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.transport.Sender;
import javax.security.sasl.SaslServer;
@@ -41,13 +41,11 @@ import java.security.Principal;
*/
public class MockProtocolSession implements AMQProtocolSession
{
- private MessageStore _messageStore;
private Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
- public MockProtocolSession(MessageStore messageStore)
+ public MockProtocolSession(TransactionLog transactionLog)
{
- _messageStore = messageStore;
}
public void dataBlockReceived(AMQDataBlock message) throws Exception
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java
index 1e47f764df..bcbd83cde3 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java
@@ -73,6 +73,7 @@ public class PrincipalPermissionsTest extends TestCase
}
catch (Exception e)
{
+ e.printStackTrace();
fail(e.getMessage());
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index 36f640a325..7722eae116 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -30,7 +30,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageFactory;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.queue.SimpleAMQQueue;
@@ -39,6 +38,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -151,7 +151,7 @@ public class MessageStoreTest extends TestCase
//Load the Virtualhost with the required MessageStore
reload(configuration);
- MessageStore messageStore = _virtualHost.getMessageStore();
+ TransactionLog transactionLog = _virtualHost.getTransactionLog();
createAllQueues();
createAllTopicQueues();
@@ -190,9 +190,9 @@ public class MessageStoreTest extends TestCase
assertEquals("Not all queues correctly registered", 8, _virtualHost.getQueueRegistry().getQueues().size());
- if (!messageStore.isPersistent())
+ if (!transactionLog.isPersistent())
{
- _logger.warn("Unable to test Persistent capabilities of messages store(" + messageStore.getClass() + ") as it is not capable of peristence.");
+ _logger.warn("Unable to test Persistent capabilities of messages store(" + transactionLog.getClass() + ") as it is not capable of peristence.");
return;
}
@@ -348,10 +348,10 @@ public class MessageStoreTest extends TestCase
try
{
currentMessage = new IncomingMessage(messageInfo,
- new NonTransactionalContext(_virtualHost.getMessageStore(),
+ new NonTransactionalContext(_virtualHost.getTransactionLog(),
new StoreContext(), null, null),
new InternalTestProtocolSession(),
- _virtualHost.getMessageStore());
+ _virtualHost.getTransactionLog());
}
catch (AMQException e)
{
@@ -388,7 +388,7 @@ public class MessageStoreTest extends TestCase
try
{
- currentMessage.routingComplete(_virtualHost.getMessageStore());
+ currentMessage.routingComplete(_virtualHost.getTransactionLog());
}
catch (AMQException e)
{
@@ -486,7 +486,7 @@ public class MessageStoreTest extends TestCase
if (queue.isDurable() && !queue.isAutoDelete())
{
- _virtualHost.getMessageStore().createQueue(queue, queueArguments);
+ _virtualHost.getRoutingTable().createQueue(queue, queueArguments);
}
}
catch (AMQException e)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index f08a15a8a7..a5b65b527c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -29,6 +29,8 @@ import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.routing.RoutingTable;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -37,7 +39,7 @@ import java.util.concurrent.atomic.AtomicLong;
* A message store that does nothing. Designed to be used in tests that do not want to use any message store
* functionality.
*/
-public class SkeletonMessageStore implements MessageStore
+public class SkeletonMessageStore implements TransactionLog , RoutingTable
{
private final AtomicLong _messageId = new AtomicLong(1);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
index 84d3d313d1..ca6644d141 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
@@ -22,9 +22,9 @@ package org.apache.qpid.server.txn;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import java.util.LinkedList;
import java.util.NoSuchElementException;
@@ -283,13 +283,13 @@ public class TxnBufferTest extends TestCase
class TxnTester extends NullOp
{
- private final MessageStore store;
+ private final TransactionLog store;
private final StoreContext context = new StoreContext();
- TxnTester(MessageStore store)
+ TxnTester(TransactionLog transactionLog)
{
- this.store = store;
+ this.store = transactionLog;
}
public void prepare() throws AMQException
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
index 9e24f13127..d150faf94a 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -30,8 +30,8 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.framing.AMQShortString;
@@ -47,7 +47,7 @@ import org.apache.qpid.exchange.ExchangeDefaults;
public class InternalBrokerBaseCase extends TestCase
{
protected IApplicationRegistry _registry;
- protected MessageStore _messageStore;
+ protected TransactionLog _transactionLog;
protected MockChannel _channel;
protected InternalTestProtocolSession _session;
protected VirtualHost _virtualHost;
@@ -62,7 +62,7 @@ public class InternalBrokerBaseCase extends TestCase
ApplicationRegistry.initialise(_registry);
_virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test");
- _messageStore = _virtualHost.getMessageStore();
+ _transactionLog = _virtualHost.getTransactionLog();
QUEUE_NAME = new AMQShortString("test");
_queue = AMQQueueFactory.createAMQQueueImpl(QUEUE_NAME, false, new AMQShortString("testowner"),
@@ -78,7 +78,7 @@ public class InternalBrokerBaseCase extends TestCase
_session.setVirtualHost(_virtualHost);
- _channel = new MockChannel(_session, 1, _messageStore);
+ _channel = new MockChannel(_session, 1, _transactionLog);
_session.addChannel(_channel);
}
@@ -91,7 +91,7 @@ public class InternalBrokerBaseCase extends TestCase
protected void checkStoreContents(int messageCount)
{
- assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageMetaDataMap().size());
+ assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _transactionLog).getMessageMetaDataMap().size());
//The above publish message is sufficiently small not to fit in the header so no Body is required.
//assertEquals("Message body count incorrect in the ContentBodyMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getContentBodyMap().size());
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
index b6d42e6068..2605ed0d11 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
@@ -27,14 +27,13 @@ import org.apache.qpid.server.management.NoopManagedObjectRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.access.ACLManager;
-import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.security.access.plugins.AllowAll;
import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import java.util.Collection;
import java.util.HashMap;
@@ -49,11 +48,10 @@ public class TestApplicationRegistry extends ApplicationRegistry
private ExchangeFactory _exchangeFactory;
- private MessageStore _messageStore;
+ private TransactionLog _transactionLog;
private VirtualHost _vHost;
-
public TestApplicationRegistry()
{
super(new MapConfiguration(new HashMap()));
@@ -73,11 +71,11 @@ public class TestApplicationRegistry extends ApplicationRegistry
_managedObjectRegistry = new NoopManagedObjectRegistry();
- _messageStore = new TestableMemoryMessageStore();
+ _transactionLog = new TestableMemoryMessageStore();
_virtualHostRegistry = new VirtualHostRegistry();
- _vHost = new VirtualHost("test", _messageStore);
+ _vHost = new VirtualHost("test", _transactionLog);
_virtualHostRegistry.registerVirtualHost(_vHost);
@@ -114,9 +112,9 @@ public class TestApplicationRegistry extends ApplicationRegistry
_accessManager = newManager;
}
- public MessageStore getMessageStore()
+ public TransactionLog getTransactionLog()
{
- return _messageStore;
+ return _transactionLog;
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java
new file mode 100644
index 0000000000..ba19fd5d5e
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import junit.framework.TestCase;
+import org.apache.commons.configuration.PropertiesConfiguration;
+
+public class VirtualhostInitRoutingTableFromTransactionLogTest extends TestCase
+{
+ public void test()
+ {
+ PropertiesConfiguration env = new PropertiesConfiguration();
+
+
+ env.addProperty("store.class", "org.apache.qpid.server.store.MemoryMessageStore");
+
+ VirtualHost _virtualHost = null;
+ try
+ {
+ _virtualHost = new VirtualHost("test", env);
+
+ assertNotNull(_virtualHost.getTransactionLog());
+ assertNotNull(_virtualHost.getRoutingTable());
+ assertEquals(_virtualHost.getTransactionLog(),_virtualHost.getRoutingTable());
+ }
+ catch (Exception e)
+ {
+ fail(e.getMessage());
+ }
+
+
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java b/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java
index 447d09429d..77767463ea 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java
@@ -21,18 +21,18 @@
package org.apache.qpid.util;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
public class MockChannel extends AMQChannel
{
- public MockChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
+ public MockChannel(AMQProtocolSession session, int channelId, TransactionLog transactionLog)
throws AMQException
{
- super(session, channelId, messageStore);
+ super(session, channelId, transactionLog);
}
public Subscription getSubscription(AMQShortString subscription)
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index dd71b2d0da..256491194d 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -30,18 +30,21 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.routing.RoutingTable;
import java.util.HashMap;
import java.util.Iterator;
-public class SlowMessageStore implements MessageStore
+public class SlowMessageStore implements TransactionLog, RoutingTable
{
private static final Logger _logger = Logger.getLogger(SlowMessageStore.class);
private static final String DELAYS = "delays";
private HashMap<String, Long> _preDelays = new HashMap<String, Long>();
private HashMap<String, Long> _postDelays = new HashMap<String, Long>();
private long _defaultDelay = 0L;
- private MessageStore _realStore = new MemoryMessageStore();
+ private TransactionLog _realTransactionLog = new MemoryMessageStore();
+ private RoutingTable _realRoutingTable = (RoutingTable)_realTransactionLog;
private static final String PRE = "pre";
private static final String POST = "post";
private String DEFAULT_DELAY = "default";
@@ -53,30 +56,30 @@ public class SlowMessageStore implements MessageStore
configureDelays(delays);
- String messageStoreClass = config.getString(base + ".store.class");
+ String transactionLogClass = config.getString(base + ".store.class");
if (delays.containsKey(DEFAULT_DELAY))
{
_defaultDelay = delays.getLong(DEFAULT_DELAY);
}
- if (messageStoreClass != null)
+ if (transactionLogClass != null)
{
- Class clazz = Class.forName(messageStoreClass);
+ Class clazz = Class.forName(transactionLogClass);
Object o = clazz.newInstance();
- if (!(o instanceof MessageStore))
+ if (!(o instanceof TransactionLog))
{
- throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
+ throw new ClassCastException("TransactionLog class must implement " + TransactionLog.class + ". Class " + clazz +
" does not.");
}
- _realStore = (MessageStore) o;
- _realStore.configure(virtualHost, base + ".store", config);
+ _realTransactionLog = (TransactionLog) o;
+ _realTransactionLog.configure(virtualHost, base , config);
}
else
{
- _realStore.configure(virtualHost, base + ".store", config);
+ _realTransactionLog.configure(virtualHost, base , config);
}
}
@@ -150,42 +153,42 @@ public class SlowMessageStore implements MessageStore
public void close() throws Exception
{
doPreDelay("close");
- _realStore.close();
+ _realTransactionLog.close();
doPostDelay("close");
}
public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
{
doPreDelay("removeMessage");
- _realStore.removeMessage(storeContext, messageId);
+ _realTransactionLog.removeMessage(storeContext, messageId);
doPostDelay("removeMessage");
}
public void createExchange(Exchange exchange) throws AMQException
{
doPreDelay("createExchange");
- _realStore.createExchange(exchange);
+ _realRoutingTable.createExchange(exchange);
doPostDelay("createExchange");
}
public void removeExchange(Exchange exchange) throws AMQException
{
doPreDelay("removeExchange");
- _realStore.removeExchange(exchange);
+ _realRoutingTable.removeExchange(exchange);
doPostDelay("removeExchange");
}
public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
doPreDelay("bindQueue");
- _realStore.bindQueue(exchange, routingKey, queue, args);
+ _realRoutingTable.bindQueue(exchange, routingKey, queue, args);
doPostDelay("bindQueue");
}
public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
doPreDelay("unbindQueue");
- _realStore.unbindQueue(exchange, routingKey, queue, args);
+ _realRoutingTable.unbindQueue(exchange, routingKey, queue, args);
doPostDelay("unbindQueue");
}
@@ -197,56 +200,56 @@ public class SlowMessageStore implements MessageStore
public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
{
doPreDelay("createQueue");
- _realStore.createQueue(queue, arguments);
+ _realRoutingTable.createQueue(queue, arguments);
doPostDelay("createQueue");
}
public void removeQueue(AMQQueue queue) throws AMQException
{
doPreDelay("removeQueue");
- _realStore.removeQueue(queue);
+ _realRoutingTable.removeQueue(queue);
doPostDelay("removeQueue");
}
public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
{
doPreDelay("enqueueMessage");
- _realStore.enqueueMessage(context, queue, messageId);
+ _realTransactionLog.enqueueMessage(context, queue, messageId);
doPostDelay("enqueueMessage");
}
public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
{
doPreDelay("dequeueMessage");
- _realStore.dequeueMessage(context, queue, messageId);
+ _realTransactionLog.dequeueMessage(context, queue, messageId);
doPostDelay("dequeueMessage");
}
public void beginTran(StoreContext context) throws AMQException
{
doPreDelay("beginTran");
- _realStore.beginTran(context);
+ _realTransactionLog.beginTran(context);
doPostDelay("beginTran");
}
public void commitTran(StoreContext context) throws AMQException
{
doPreDelay("commitTran");
- _realStore.commitTran(context);
+ _realTransactionLog.commitTran(context);
doPostDelay("commitTran");
}
public void abortTran(StoreContext context) throws AMQException
{
doPreDelay("abortTran");
- _realStore.abortTran(context);
+ _realTransactionLog.abortTran(context);
doPostDelay("abortTran");
}
public boolean inTran(StoreContext context)
{
doPreDelay("inTran");
- boolean b = _realStore.inTran(context);
+ boolean b = _realTransactionLog.inTran(context);
doPostDelay("inTran");
return b;
}
@@ -254,21 +257,21 @@ public class SlowMessageStore implements MessageStore
public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
{
doPreDelay("storeContentBodyChunk");
- _realStore.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody);
+ _realTransactionLog.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody);
doPostDelay("storeContentBodyChunk");
}
public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
{
doPreDelay("storeMessageMetaData");
- _realStore.storeMessageMetaData(context, messageId, messageMetaData);
+ _realTransactionLog.storeMessageMetaData(context, messageId, messageMetaData);
doPostDelay("storeMessageMetaData");
}
public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
{
doPreDelay("getMessageMetaData");
- MessageMetaData mmd = _realStore.getMessageMetaData(context, messageId);
+ MessageMetaData mmd = _realTransactionLog.getMessageMetaData(context, messageId);
doPostDelay("getMessageMetaData");
return mmd;
}
@@ -276,14 +279,14 @@ public class SlowMessageStore implements MessageStore
public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
{
doPreDelay("getContentBodyChunk");
- ContentChunk c = _realStore.getContentBodyChunk(context, messageId, index);
+ ContentChunk c = _realTransactionLog.getContentBodyChunk(context, messageId, index);
doPostDelay("getContentBodyChunk");
return c;
}
public boolean isPersistent()
{
- return _realStore.isPersistent();
+ return _realTransactionLog.isPersistent();
}
}