summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java26
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java36
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java123
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java45
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java24
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java (renamed from java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java)78
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java91
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java17
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java15
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java14
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java12
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java18
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java6
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java6
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java1
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java16
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java4
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java8
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java10
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java14
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java52
-rw-r--r--java/broker/src/test/java/org/apache/qpid/util/MockChannel.java6
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java61
36 files changed, 492 insertions, 333 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
index fc6057afd2..b3c843ebaa 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
@@ -42,12 +42,8 @@ import javax.management.MBeanException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
-import org.apache.commons.configuration.Configuration;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -59,8 +55,9 @@ import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.routing.RoutingTable;
/**
* This MBean implements the broker management interface and exposes the
@@ -72,7 +69,8 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
private final QueueRegistry _queueRegistry;
private final ExchangeRegistry _exchangeRegistry;
private final ExchangeFactory _exchangeFactory;
- private final MessageStore _messageStore;
+ private final TransactionLog _tranasctionLog;
+ private final RoutingTable _routingTable;
private final VirtualHost.VirtualHostMBean _virtualHostMBean;
@@ -86,8 +84,9 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
_queueRegistry = virtualHost.getQueueRegistry();
_exchangeRegistry = virtualHost.getExchangeRegistry();
- _messageStore = virtualHost.getMessageStore();
+ _tranasctionLog = virtualHost.getTransactionLog();
_exchangeFactory = virtualHost.getExchangeFactory();
+ _routingTable = virtualHost.getRoutingTable();
}
public String getObjectInstanceName()
@@ -180,7 +179,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
null);
if (queue.isDurable() && !queue.isAutoDelete())
{
- _messageStore.createQueue(queue);
+ _routingTable.createQueue(queue);
}
_queueRegistry.registerQueue(queue);
@@ -215,8 +214,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
try
{
queue.delete();
- _messageStore.removeQueue(queue);
-
+ _routingTable.removeQueue(queue);
}
catch (AMQException ex)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 341aae2875..2d0589c223 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -44,11 +44,11 @@ import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import java.util.Collection;
import java.util.LinkedList;
@@ -91,7 +91,7 @@ public class AMQChannel
/** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
protected final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
- private final MessageStore _messageStore;
+ private final TransactionLog _transactionLog;
private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
@@ -109,9 +109,9 @@ public class AMQChannel
// Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
- private boolean _closing;
+ private boolean _closing;
- public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
+ public AMQChannel(AMQProtocolSession session, int channelId, TransactionLog transactionLog)
throws AMQException
{
//Set values from configuration
@@ -122,10 +122,10 @@ public class AMQChannel
_storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId);
- _messageStore = messageStore;
+ _transactionLog = transactionLog;
// by default the session is non-transactional
- _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
+ _txnContext = new NonTransactionalContext(_transactionLog, _storeContext, this, _returnMessages);
}
/** Sets this channel to be part of a local transaction */
@@ -150,7 +150,7 @@ public class AMQChannel
public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQException
{
- _currentMessage = new IncomingMessage(info, _txnContext, _session, _messageStore);
+ _currentMessage = new IncomingMessage(info, _txnContext, _session, _transactionLog);
_currentMessage.setExchange(e);
}
@@ -174,7 +174,7 @@ public class AMQChannel
routeCurrentMessage();
- _currentMessage.routingComplete(_messageStore);
+ _currentMessage.routingComplete(_transactionLog);
deliverCurrentMessageIfComplete();
@@ -474,7 +474,7 @@ public class AMQChannel
{
deliveryContext =
- new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
+ new NonTransactionalContext(_transactionLog, _storeContext, this, _returnMessages);
}
else
{
@@ -534,7 +534,7 @@ public class AMQChannel
{
deliveryContext =
- new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
+ new NonTransactionalContext(_transactionLog, _storeContext, this, _returnMessages);
}
else
@@ -669,7 +669,7 @@ public class AMQChannel
{
deliveryContext =
- new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
+ new NonTransactionalContext(_transactionLog, _storeContext, this, _returnMessages);
}
else
{
@@ -870,9 +870,9 @@ public class AMQChannel
return _returnMessages;
}
- public MessageStore getMessageStore()
+ public TransactionLog getTransactionLog()
{
- return _messageStore;
+ return _transactionLog;
}
private final ClientDeliveryMethod _clientDeliveryMethod = new ClientDeliveryMethod()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index 705e84752b..02fb57c730 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -38,8 +38,9 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.routing.RoutingTable;
public class VirtualHostConfiguration
{
@@ -101,8 +102,6 @@ public class VirtualHostConfiguration
exchangeConfiguration.addConfiguration(configuration.subset("exchanges.exchange."+ exchangeNameString));
exchangeConfiguration.addConfiguration(configuration.subset("exchanges"));
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- MessageStore messageStore = virtualHost.getMessageStore();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
@@ -159,7 +158,7 @@ public class VirtualHostConfiguration
queueConfiguration.addConfiguration(configuration.subset("queues"));
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- MessageStore messageStore = virtualHost.getMessageStore();
+ RoutingTable routingTable = virtualHost.getRoutingTable();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
@@ -205,7 +204,7 @@ public class VirtualHostConfiguration
if (queue.isDurable())
{
- messageStore.createQueue(queue);
+ routingTable.createQueue(queue);
}
queueRegistry.registerQueue(queue);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index f28ba0d0de..3e930364df 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -25,8 +25,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.protocol.ExchangeInitialiser;
import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.routing.RoutingTable;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
@@ -56,9 +56,9 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this);
}
- public MessageStore getMessageStore()
+ public RoutingTable getRoutingTable()
{
- return _host.getMessageStore();
+ return _host.getRoutingTable();
}
public void registerExchange(Exchange exchange) throws AMQException
@@ -66,7 +66,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
_exchangeMap.put(exchange.getName(), exchange);
if (exchange.isDurable())
{
- getMessageStore().createExchange(exchange);
+ getRoutingTable().createExchange(exchange);
}
}
@@ -93,7 +93,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
{
if (e.isDurable())
{
- getMessageStore().removeExchange(e);
+ getRoutingTable().removeExchange(e);
}
e.close();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index 054674aed4..0a7d5cfe6d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -54,7 +54,7 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore()
+ final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getTransactionLog()
);
session.addChannel(channel);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 71f38cb28a..b1e02aef7a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -30,20 +30,17 @@ import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.security.access.Permission;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.AMQChannel;
-import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.routing.RoutingTable;
public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
{
@@ -73,7 +70,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
VirtualHost virtualHost = session.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- MessageStore store = virtualHost.getMessageStore();
+ RoutingTable routingTable = virtualHost.getRoutingTable();
if (!body.getPassive())
@@ -120,7 +117,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
queue = createQueue(queueName, body, virtualHost, session);
if (queue.isDurable() && !queue.isAutoDelete())
{
- store.createQueue(queue, body.getArguments());
+ routingTable.createQueue(queue, body.getArguments());
}
queueRegistry.registerQueue(queue);
if (autoRegister)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index b397db9246..85a697f3b7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -30,10 +30,9 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.security.access.Permission;
+import org.apache.qpid.server.routing.RoutingTable;
public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody>
{
@@ -62,7 +61,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- MessageStore store = virtualHost.getMessageStore();
+ RoutingTable routingTable = virtualHost.getRoutingTable();
AMQQueue queue;
if (body.getQueue() == null)
@@ -115,7 +114,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
if (queue.isDurable())
{
- store.removeQueue(queue);
+ routingTable.removeQueue(queue);
}
MethodRegistry methodRegistry = session.getMethodRegistry();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index 20926b99d7..94580a00ac 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -27,10 +27,10 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
@@ -67,7 +67,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
private ArrayList<AMQQueue> _destinationQueues;
private AMQProtocolSession _publisher;
- private MessageStore _messageStore;
+ private TransactionLog _messageStore;
private long _expiration;
private Exchange _exchange;
@@ -76,7 +76,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
public IncomingMessage(final MessagePublishInfo info,
final TransactionalContext txnContext,
final AMQProtocolSession publisher,
- MessageStore messasgeStore)
+ TransactionLog messasgeStore)
{
_messagePublishInfo = info;
_txnContext = txnContext;
@@ -121,7 +121,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
}
- public void routingComplete(final MessageStore store) throws AMQException
+ public void routingComplete(final TransactionLog transactionLog) throws AMQException
{
if (isPersistent())
@@ -134,7 +134,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
{
for (int i = 0; i < _destinationQueues.size(); i++)
{
- store.enqueueMessage(_txnContext.getStoreContext(),
+ transactionLog.enqueueMessage(_txnContext.getStoreContext(),
_destinationQueues.get(i), getMessageId());
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
index 2f6e05963c..e5e0b6e312 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import java.util.concurrent.atomic.AtomicLong;
@@ -58,27 +58,27 @@ public class MessageFactory
/**
* Normal message creation path
- * @param store
+ * @param transactionLog
* @param persistent
* @return
*/
- public AMQMessage createMessage(MessageStore store, boolean persistent)
+ public AMQMessage createMessage(TransactionLog transactionLog, boolean persistent)
{
if (_state != State.OPEN)
{
_state = State.OPEN;
}
- return createNextMessage(_messageId.incrementAndGet(), store, persistent);
+ return createNextMessage(_messageId.incrementAndGet(), transactionLog, persistent);
}
/**
* Used for message recovery only and so only creates persistent messages.
* @param messageId the id that this message must have
- * @param store
+ * @param transactionLog
* @return
*/
- public AMQMessage createMessage(Long messageId, MessageStore store)
+ public AMQMessage createMessage(Long messageId, TransactionLog transactionLog)
{
if (_state != State.RECOVER)
{
@@ -96,14 +96,14 @@ public class MessageFactory
_messageId.set(messageId);
}
- return createNextMessage(messageId, store, true);
+ return createNextMessage(messageId, transactionLog, true);
}
- private AMQMessage createNextMessage(Long messageId, MessageStore store, boolean persistent)
+ private AMQMessage createNextMessage(Long messageId, TransactionLog transactionLog, boolean persistent)
{
if (persistent)
{
- return new PersistentAMQMessage(messageId, store);
+ return new PersistentAMQMessage(messageId, transactionLog);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
index 04e3635f92..804bb29ecd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
@@ -24,17 +24,17 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.transactionlog.TransactionLog;
public class PersistentAMQMessage extends TransientAMQMessage
{
- protected MessageStore _messageStore;
+ protected TransactionLog _transactionLog;
- public PersistentAMQMessage(Long messageId, MessageStore store)
+ public PersistentAMQMessage(Long messageId, TransactionLog transactionLog)
{
super(messageId);
- _messageStore = store;
+ _transactionLog = transactionLog;
}
@Override
@@ -42,7 +42,7 @@ public class PersistentAMQMessage extends TransientAMQMessage
throws AMQException
{
super.addContentBodyFrame(storeContext, contentChunk, isLastContentBody);
- _messageStore.storeContentBodyChunk(storeContext, _messageId, _contentBodies.size() - 1,
+ _transactionLog.storeContentBodyChunk(storeContext, _messageId, _contentBodies.size() - 1,
contentChunk, isLastContentBody);
}
@@ -54,13 +54,13 @@ public class PersistentAMQMessage extends TransientAMQMessage
super.setPublishAndContentHeaderBody(storeContext, messagePublishInfo, contentHeaderBody);
MessageMetaData mmd = new MessageMetaData(messagePublishInfo, contentHeaderBody, _contentBodies == null ? 0 : _contentBodies.size(), _arrivalTime);
- _messageStore.storeMessageMetaData(storeContext, _messageId, mmd);
+ _transactionLog.storeMessageMetaData(storeContext, _messageId, mmd);
}
@Override
public void removeMessage(StoreContext storeContext) throws AMQException
{
- _messageStore.removeMessage(storeContext, _messageId);
+ _transactionLog.removeMessage(storeContext, _messageId);
}
@Override
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 547df7856d..a08e4e2667 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -6,11 +6,11 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.AMQException;
import org.apache.qpid.pool.ReadWriteRunnable;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
@@ -210,7 +210,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
exchange.registerQueue(routingKey, this, arguments);
if (isDurable() && exchange.isDurable())
{
- _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
+ _virtualHost.getRoutingTable().bindQueue(exchange, routingKey, this, arguments);
}
_bindings.addBinding(routingKey, arguments, exchange);
@@ -221,7 +221,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
exchange.deregisterQueue(routingKey, this, arguments);
if (isDurable() && exchange.isDurable())
{
- _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
+ _virtualHost.getRoutingTable().unbindQueue(exchange, routingKey, this, arguments);
}
boolean removed = _bindings.remove(routingKey, arguments, exchange);
@@ -581,7 +581,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
AMQMessage msg = entry.getMessage();
if (msg.isPersistent())
{
- _virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageId());
+ _virtualHost.getTransactionLog().dequeueMessage(storeContext, this, msg.getMessageId());
}
//entry.dispose(storeContext);
@@ -826,7 +826,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
- MessageStore store = getVirtualHost().getMessageStore();
+ TransactionLog transactionLog = getVirtualHost().getTransactionLog();
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
{
@@ -847,16 +847,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
try
{
- store.beginTran(storeContext);
+ transactionLog.beginTran(storeContext);
- // Move the messages in on the message store.
+ // Move the messages in on the transaction log.
for (QueueEntry entry : entries)
{
AMQMessage message = entry.getMessage();
if (message.isPersistent() && toQueue.isDurable())
{
- store.enqueueMessage(storeContext, toQueue, message.getMessageId());
+ transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId());
}
// dequeue does not decrement the refence count
entry.dequeue(storeContext);
@@ -865,18 +865,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// Commit and flush the move transcations.
try
{
- store.commitTran(storeContext);
+ transactionLog.commitTran(storeContext);
}
catch (AMQException e)
{
- throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+ throw new RuntimeException("Failed to commit transaction whilst moving messages on transaction log.", e);
}
}
catch (AMQException e)
{
try
{
- store.abortTran(storeContext);
+ transactionLog.abortTran(storeContext);
}
catch (AMQException rollbackEx)
{
@@ -910,7 +910,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
final StoreContext storeContext)
{
AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
- MessageStore store = getVirtualHost().getMessageStore();
+ TransactionLog transactionLog = getVirtualHost().getTransactionLog();
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
{
@@ -938,34 +938,34 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
try
{
- store.beginTran(storeContext);
+ transactionLog.beginTran(storeContext);
- // Move the messages in on the message store.
+ // Move the messages in on the transaction log.
for (QueueEntry entry : entries)
{
AMQMessage message = entry.getMessage();
if (message.isReferenced() && message.isPersistent() && toQueue.isDurable())
{
- store.enqueueMessage(storeContext, toQueue, message.getMessageId());
+ transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId());
}
}
// Commit and flush the move transcations.
try
{
- store.commitTran(storeContext);
+ transactionLog.commitTran(storeContext);
}
catch (AMQException e)
{
- throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+ throw new RuntimeException("Failed to commit transaction whilst moving messages on transaction log.", e);
}
}
catch (AMQException e)
{
try
{
- store.abortTran(storeContext);
+ transactionLog.abortTran(storeContext);
}
catch (AMQException rollbackEx)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java b/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java
new file mode 100644
index 0000000000..6344127b24
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.routing;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.commons.configuration.Configuration;
+
+public interface RoutingTable
+{
+ /**
+ * Called after instantiation in order to configure the message store. A particular implementation can define
+ * whatever parameters it wants.
+ *
+ * @param virtualHost The virtual host using by this store
+ * @param base The base element identifier from which all configuration items are relative. For example, if
+ * the base element is "store", the all elements used by concrete classes will be "store.foo" etc.
+ * @param config The apache commons configuration object.
+ *
+ * @throws Exception If any error occurs that means the store is unable to configure itself.
+ */
+ void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception;
+
+ /**
+ * Called to close and cleanup any resources used by the message store.
+ *
+ * @throws Exception If the close fails.
+ */
+ void close() throws Exception;
+
+
+ /**
+ * Makes the specified exchange persistent.
+ *
+ * @param exchange The exchange to persist.
+ *
+ * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ */
+ void createExchange(Exchange exchange) throws AMQException;
+
+ /**
+ * Removes the specified persistent exchange.
+ *
+ * @param exchange The exchange to remove.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
+ void removeExchange(Exchange exchange) throws AMQException;
+
+ /**
+ * Binds the specified queue to an exchange with a routing key.
+ *
+ * @param exchange The exchange to bind to.
+ * @param routingKey The routing key to bind by.
+ * @param queue The queue to bind.
+ * @param args Additional parameters.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
+ void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
+
+ /**
+ * Unbinds the specified from an exchange under a particular routing key.
+ *
+ * @param exchange The exchange to unbind from.
+ * @param routingKey The routing key to unbind.
+ * @param queue The queue to unbind.
+ * @param args Additonal parameters.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
+ void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
+
+ /**
+ * Makes the specified queue persistent.
+ *
+ * @param queue The queue to store.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
+ void createQueue(AMQQueue queue) throws AMQException;
+
+ /**
+ * Makes the specified queue persistent.
+ *
+ * @param queue The queue to store.
+ * @param arguments The additional arguments to the binding
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
+ void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException;
+
+ /**
+ * Removes the specified queue from the persistent store.
+ *
+ * @param queue The queue to remove.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
+ void removeQueue(final AMQQueue queue) throws AMQException;
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index 425aed43d4..500fd4c7bf 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -32,6 +32,8 @@ import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.routing.RoutingTable;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -62,7 +64,7 @@ import java.util.HashMap;
import java.util.TreeMap;
-public class DerbyMessageStore implements MessageStore
+public class DerbyMessageStore implements TransactionLog, RoutingTable
{
private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
@@ -142,37 +144,40 @@ public class DerbyMessageStore implements MessageStore
public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
{
- stateTransition(State.INITIAL, State.CONFIGURING);
+ //Only initialise when loaded with the old 'store' confing ignore the new 'RoutingTable' config
+ if (base.equals("store"))
+ {
+ stateTransition(State.INITIAL, State.CONFIGURING);
- initialiseDriver();
+ initialiseDriver();
- _virtualHost = virtualHost;
+ _virtualHost = virtualHost;
- _logger.info("Configuring Derby message store for virtual host " + virtualHost.getName());
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ _logger.info("Configuring Derby message store for virtual host " + virtualHost.getName());
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- final String databasePath = config.getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "derbyDB");
+ final String databasePath = config.getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "derbyDB");
- File environmentPath = new File(databasePath);
- if (!environmentPath.exists())
- {
- if (!environmentPath.mkdirs())
+ File environmentPath = new File(databasePath);
+ if (!environmentPath.exists())
{
- throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
- + "Ensure the path is correct and that the permissions are correct.");
+ if (!environmentPath.mkdirs())
+ {
+ throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
+ + "Ensure the path is correct and that the permissions are correct.");
+ }
}
- }
-
- createOrOpenDatabase(databasePath);
- // this recovers durable queues and persistent messages
+ createOrOpenDatabase(databasePath);
- _messageFactory = MessageFactory.getInstance();
+ // this recovers durable queues and persistent messages
- recover();
+ _messageFactory = MessageFactory.getInstance();
- stateTransition(State.RECOVERING, State.STARTED);
+ recover();
+ stateTransition(State.RECOVERING, State.STARTED);
+ }
}
private static synchronized void initialiseDriver() throws ClassNotFoundException
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 587c85fc12..eee7be7ef6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -30,6 +30,8 @@ import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.routing.RoutingTable;
import java.util.ArrayList;
import java.util.Collections;
@@ -39,8 +41,14 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-/** A simple message store that stores the messages in a threadsafe structure in memory. */
-public class MemoryMessageStore implements MessageStore
+/** A simple message store that stores the messages in a threadsafe structure in memory.
+ *
+ * NOTE: Now that we have removed the MessageStore interface and are using a TransactionLog
+ *
+ * This class really should have no storage unless we want to do inMemory Recovery.
+ *
+ */
+public class MemoryMessageStore implements TransactionLog, RoutingTable
{
private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
@@ -64,10 +72,14 @@ public class MemoryMessageStore implements MessageStore
public void configure(String base, Configuration config)
{
- int hashtableCapacity = config.getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
- _log.info("Using capacity " + hashtableCapacity + " for hash tables");
- _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity);
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity);
+ //Only initialise when called with current 'store' configs i.e. don't reinit when used as a 'RoutingTable'
+ if (base.equals("store"))
+ {
+ int hashtableCapacity = config.getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
+ _log.info("Using capacity " + hashtableCapacity + " for hash tables");
+ _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity);
+ _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity);
+ }
}
public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java
index e65dded149..c927bb3272 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.store;
+package org.apache.qpid.server.transactionlog;
import org.apache.commons.configuration.Configuration;
@@ -30,9 +30,11 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.StoreContext;
/**
- * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages, queues
+ * TransactionLog defines the interface for performing transactions.
+ * This is used to preserve the state of messages, queues
* and exchanges in a transactional manner.
*
* <p/>All message store, remove, enqueue and dequeue operations are carried out against a {@link StoreContext} which
@@ -52,7 +54,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
* <tr><td> Generate message identifiers.
* </table>
*/
-public interface MessageStore
+public interface TransactionLog
{
/**
* Called after instantiation in order to configure the message store. A particular implementation can define
@@ -84,74 +86,7 @@ public interface MessageStore
*/
void removeMessage(StoreContext storeContext, Long messageId) throws AMQException;
- /**
- * Makes the specified exchange persistent.
- *
- * @param exchange The exchange to persist.
- *
- * @throws AMQException If the operation fails for any reason.
- */
- void createExchange(Exchange exchange) throws AMQException;
- /**
- * Removes the specified persistent exchange.
- *
- * @param exchange The exchange to remove.
- *
- * @throws AMQException If the operation fails for any reason.
- */
- void removeExchange(Exchange exchange) throws AMQException;
-
- /**
- * Binds the specified queue to an exchange with a routing key.
- *
- * @param exchange The exchange to bind to.
- * @param routingKey The routing key to bind by.
- * @param queue The queue to bind.
- * @param args Additional parameters.
- *
- * @throws AMQException If the operation fails for any reason.
- */
- void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
-
- /**
- * Unbinds the specified from an exchange under a particular routing key.
- *
- * @param exchange The exchange to unbind from.
- * @param routingKey The routing key to unbind.
- * @param queue The queue to unbind.
- * @param args Additonal parameters.
- *
- * @throws AMQException If the operation fails for any reason.
- */
- void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
-
- /**
- * Makes the specified queue persistent.
- *
- * @param queue The queue to store.
- *
- * @throws AMQException If the operation fails for any reason.
- */
- void createQueue(AMQQueue queue) throws AMQException;
-
- /**
- * Makes the specified queue persistent.
- *
- * @param queue The queue to store.
- *
- * @param arguments The additional arguments to the binding
- * @throws AMQException If the operation fails for any reason.
- */
- void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException;
-
- /**
- * Removes the specified queue from the persistent store.
- *
- * @param queue The queue to remove.
- * @throws AMQException If the operation fails for any reason.
- */
- void removeQueue(final AMQQueue queue) throws AMQException;
/**
* Places a message onto a specified queue, in a given transactional context.
@@ -209,6 +144,7 @@ public interface MessageStore
*/
boolean inTran(StoreContext context);
+
/**
* Stores a chunk of message data.
*
@@ -261,7 +197,7 @@ public interface MessageStore
/**
* Is this store capable of persisting the data
- *
+ *
* @return true if this store is capable of persisting data
*/
boolean isPersistent();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 3c71282c57..9bc2e98fe9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -25,11 +25,11 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.*;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import java.util.List;
@@ -56,7 +56,6 @@ public class LocalTransactionalContext implements TransactionalContext
private boolean _messageDelivered = false;
private final AMQChannel _channel;
-
private abstract class DeliveryAction
{
@@ -125,9 +124,9 @@ public class LocalTransactionalContext implements TransactionalContext
return _channel.getReturnMessages();
}
- public MessageStore getMessageStore()
+ public TransactionLog getTransactionLog()
{
- return _channel.getMessageStore();
+ return _channel.getTransactionLog();
}
@@ -135,9 +134,9 @@ public class LocalTransactionalContext implements TransactionalContext
{
_txnBuffer.rollback(getStoreContext());
// Hack to deal with uncommitted non-transactional writes
- if (getMessageStore().inTran(getStoreContext()))
+ if (getTransactionLog().inTran(getStoreContext()))
{
- getMessageStore().abortTran(getStoreContext());
+ getTransactionLog().abortTran(getStoreContext());
_inTran = false;
}
@@ -224,7 +223,7 @@ public class LocalTransactionalContext implements TransactionalContext
_log.debug("Starting transaction on message store: " + this);
}
- getMessageStore().beginTran(getStoreContext());
+ getTransactionLog().beginTran(getStoreContext());
_inTran = true;
}
}
@@ -247,7 +246,7 @@ public class LocalTransactionalContext implements TransactionalContext
if (_messageDelivered && _inTran)
{
- _txnBuffer.enlist(new StoreMessageOperation(getMessageStore()));
+ _txnBuffer.enlist(new StoreMessageOperation(getTransactionLog()));
}
// fixme fail commit here ... QPID-440
try
@@ -257,7 +256,7 @@ public class LocalTransactionalContext implements TransactionalContext
finally
{
_messageDelivered = false;
- _inTran = getMessageStore().inTran(getStoreContext());
+ _inTran = getTransactionLog().inTran(getStoreContext());
}
try
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index acb2e93818..145d7f8b13 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -26,10 +26,10 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.*;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
/** @author Apache Software Foundation */
@@ -45,20 +45,20 @@ public class NonTransactionalContext implements TransactionalContext
- private final MessageStore _messageStore;
+ private final TransactionLog _transactionLog;
private final StoreContext _storeContext;
/** Whether we are in a transaction */
private boolean _inTran;
- public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel,
+ public NonTransactionalContext(TransactionLog transactionLog, StoreContext storeContext, AMQChannel channel,
List<RequiredDeliveryException> returnMessages)
{
_channel = channel;
_storeContext = storeContext;
_returnMessages = returnMessages;
- _messageStore = messageStore;
+ _transactionLog = transactionLog;
}
@@ -72,7 +72,7 @@ public class NonTransactionalContext implements TransactionalContext
{
if (!_inTran)
{
- _messageStore.beginTran(_storeContext);
+ _transactionLog.beginTran(_storeContext);
_inTran = true;
}
}
@@ -191,7 +191,7 @@ public class NonTransactionalContext implements TransactionalContext
}
if(_inTran)
{
- _messageStore.commitTran(_storeContext);
+ _transactionLog.commitTran(_storeContext);
_inTran = false;
}
}
@@ -200,7 +200,7 @@ public class NonTransactionalContext implements TransactionalContext
{
if (persistent)
{
- _messageStore.commitTran(_storeContext);
+ _transactionLog.commitTran(_storeContext);
_inTran = false;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java b/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java
index 0e4d6c2030..f1fbca2e28 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java
@@ -21,8 +21,8 @@
package org.apache.qpid.server.txn;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.transactionlog.TransactionLog;
/**
* A transactional operation to store messages in an underlying persistent store. When this operation
@@ -31,11 +31,11 @@ import org.apache.qpid.server.store.StoreContext;
*/
public class StoreMessageOperation implements TxnOp
{
- private final MessageStore _messsageStore;
+ private final TransactionLog _transactionLog;
- public StoreMessageOperation(MessageStore messageStore)
+ public StoreMessageOperation(TransactionLog transactionLog)
{
- _messsageStore = messageStore;
+ _transactionLog = transactionLog;
}
public void prepare(StoreContext context) throws AMQException
@@ -48,11 +48,11 @@ public class StoreMessageOperation implements TxnOp
public void commit(StoreContext context) throws AMQException
{
- _messsageStore.commitTran(context);
+ _transactionLog.commitTran(context);
}
public void rollback(StoreContext context) throws AMQException
{
- _messsageStore.abortTran(context);
+ _transactionLog.abortTran(context);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index de4c8ac1ff..1497b4adb8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -30,6 +30,8 @@ import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.AMQBrokerManagerMBean;
+import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.connection.ConnectionRegistry;
import org.apache.qpid.server.connection.IConnectionRegistry;
@@ -45,10 +47,8 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.access.Accessable;
-import org.apache.qpid.server.security.access.plugins.SimpleXML;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.store.MessageStore;
public class VirtualHost implements Accessable
{
@@ -65,7 +65,9 @@ public class VirtualHost implements Accessable
private ExchangeFactory _exchangeFactory;
- private MessageStore _messageStore;
+ private TransactionLog _transactionLog;
+
+ private RoutingTable _routingTable;
protected VirtualHostMBean _virtualHostMBean;
@@ -96,6 +98,11 @@ public class VirtualHost implements Accessable
return _connectionRegistry;
}
+ public RoutingTable getRoutingTable()
+ {
+ return _routingTable;
+ }
+
/**
* Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
* implementaion of an Exchange MBean should extend this class.
@@ -128,12 +135,12 @@ public class VirtualHost implements Accessable
/**
* Used for testing only
* @param name
- * @param store
+ * @param transactionLog
* @throws Exception
*/
- public VirtualHost(String name, MessageStore store) throws Exception
+ public VirtualHost(String name, TransactionLog transactionLog) throws Exception
{
- this(name, new PropertiesConfiguration(), store);
+ this(name, new PropertiesConfiguration(), transactionLog);
}
/**
@@ -147,7 +154,7 @@ public class VirtualHost implements Accessable
this(name, hostConfig, null);
}
- public VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception
+ public VirtualHost(String name, Configuration hostConfig, TransactionLog transactionLog) throws Exception
{
if (name == null || name.length() == 0)
{
@@ -166,19 +173,26 @@ public class VirtualHost implements Accessable
_exchangeFactory.initialise(hostConfig);
_exchangeRegistry = new DefaultExchangeRegistry(this);
- if (store != null)
+ if (transactionLog != null)
{
- _messageStore = store;
+ _transactionLog = transactionLog;
+ if (_transactionLog instanceof RoutingTable)
+ {
+ _routingTable = (RoutingTable) _transactionLog;
+ }
}
else
{
if (hostConfig == null)
{
- throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
+ throw new IllegalAccessException("HostConfig and TransactionLog cannot be null");
}
- initialiseMessageStore(hostConfig);
+ initialiseTransactionLog(hostConfig);
+ initialiseRoutingTable(hostConfig);
}
+
+
_exchangeRegistry.initialise();
_authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig);
@@ -224,23 +238,52 @@ public class VirtualHost implements Accessable
period);
}
}
-
- private void initialiseMessageStore(Configuration config) throws Exception
+
+ //todo we need to move from store.class to transactionlog.class
+ private void initialiseTransactionLog(Configuration config) throws Exception
{
- String messageStoreClass = config.getString("store.class");
+ String transactionLogClass = config.getString("store.class");
- Class clazz = Class.forName(messageStoreClass);
+ Class clazz = Class.forName(transactionLogClass);
Object o = clazz.newInstance();
- if (!(o instanceof MessageStore))
+ if (!(o instanceof TransactionLog))
{
- throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
+ throw new ClassCastException("TransactionLog class must implement " + TransactionLog.class + ". Class " + clazz +
" does not.");
}
- _messageStore = (MessageStore) o;
- _messageStore.configure(this, "store", config);
+ _transactionLog = (TransactionLog) o;
+ _transactionLog.configure(this, "store", config);
}
+ //todo we need to move from store.class to transactionlog.class
+ private void initialiseRoutingTable(Configuration config) throws Exception
+ {
+ String transactionLogClass = config.getString("routingtable.class");
+
+ if (transactionLogClass != null)
+ {
+ Class clazz = Class.forName(transactionLogClass);
+ Object o = clazz.newInstance();
+
+ if (!(o instanceof RoutingTable))
+ {
+ throw new ClassCastException("RoutingTable class must implement " + RoutingTable.class + ". Class " + clazz +
+ " does not.");
+ }
+ _routingTable = (RoutingTable) o;
+ _routingTable.configure(this, "routingtable", config);
+ }
+ else
+ {
+ if (_transactionLog instanceof RoutingTable)
+ {
+ _routingTable = (RoutingTable)_transactionLog;
+ }
+ }
+ }
+
+
public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
{
@@ -284,9 +327,9 @@ public class VirtualHost implements Accessable
throw new UnsupportedOperationException();
}
- public MessageStore getMessageStore()
+ public TransactionLog getTransactionLog()
{
- return _messageStore;
+ return _transactionLog;
}
public AuthenticationManager getAuthenticationManager()
@@ -320,10 +363,10 @@ public class VirtualHost implements Accessable
_houseKeepingTimer.cancel();
}
- //Close MessageStore
- if (_messageStore != null)
+ //Close TransactionLog
+ if (_transactionLog != null)
{
- _messageStore.close();
+ _transactionLog.close();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java
index faa7b85d58..a6e060e793 100644
--- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java
@@ -302,7 +302,7 @@ public class MessageStoreTool
boolean warning = false;
for (VirtualHost vhost : vhosts)
{
- if (vhost.getMessageStore() instanceof MemoryMessageStore)
+ if (vhost.getTransactionLog() instanceof MemoryMessageStore)
{
_console.println("WARNING: Virtualhost '" + vhost.getName() + "' is using a MemoryMessageStore. "
+ "Changes will not persist.");
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 5c54c0b57f..40b08a2e39 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -31,18 +31,17 @@ import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.FailedDequeueException;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.MessageCleanupException;
-import org.apache.qpid.server.queue.MessageFactory;
import org.apache.qpid.server.queue.MockProtocolSession;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
@@ -64,11 +63,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
protected final Set<TestQueue> queues = new HashSet<TestQueue>();
/** Not used in this test, just there to stub out the routing calls */
- private MessageStore _store = new MemoryMessageStore();
-
- private StoreContext _storeContext = new StoreContext();
-
- private MessageFactory _messageFactory = MessageFactory.getInstance();
+ private TransactionLog _transactionLog = new MemoryMessageStore();
private int count;
@@ -107,7 +102,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
protected void route(Message m) throws AMQException
{
exchange.route(m.getIncomingMessage());
- m.getIncomingMessage().routingComplete(_store);
+ m.getIncomingMessage().routingComplete(_transactionLog);
if (m.getIncomingMessage().allContentReceived())
{
m.getIncomingMessage().deliverToQueues();
@@ -382,9 +377,9 @@ public class AbstractHeadersExchangeTestBase extends TestCase
static class Message
{
- private static MessageStore _messageStore = new SkeletonMessageStore();
+ private static TransactionLog _transactionLog = new SkeletonMessageStore();
- private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, new StoreContext(),
+ private static TransactionalContext _txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(),
null,
new LinkedList<RequiredDeliveryException>()
);
@@ -395,7 +390,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
MessagePublishInfo mpi = getPublishRequest(id);
- IncomingMessage incomming = new IncomingMessage(mpi, _txnContext, new MockProtocolSession(_messageStore), _messageStore);
+ IncomingMessage incomming = new IncomingMessage(mpi, _txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
try
{
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
index 396b8c5128..f8544a33bd 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
@@ -27,10 +27,10 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
@@ -46,17 +46,16 @@ public class DestWildExchangeTest extends TestCase
TopicExchange _exchange;
VirtualHost _vhost;
- MessageStore _store;
+ TransactionLog _tranasctionLog;
StoreContext _context;
InternalTestProtocolSession _protocolSession;
-
public void setUp() throws AMQException
{
_exchange = new TopicExchange();
_vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
- _store = new MemoryMessageStore();
+ _tranasctionLog = new MemoryMessageStore();
_context = new StoreContext();
_protocolSession = new InternalTestProtocolSession();
}
@@ -75,7 +74,7 @@ public class DestWildExchangeTest extends TestCase
MessagePublishInfo info = new MessagePublishInfoImpl(null, false, false, new AMQShortString("a.b"));
- IncomingMessage message = new IncomingMessage(info, null, _protocolSession, _store);
+ IncomingMessage message = new IncomingMessage(info, null, _protocolSession, _tranasctionLog);
_exchange.route(message);
@@ -497,7 +496,7 @@ public class DestWildExchangeTest extends TestCase
throws AMQException
{
_exchange.route(message);
- message.routingComplete(_store);
+ message.routingComplete(_tranasctionLog);
message.deliverToQueues();
}
@@ -547,11 +546,11 @@ public class DestWildExchangeTest extends TestCase
{
MessagePublishInfo info = new MessagePublishInfoImpl(null, false, true, new AMQShortString(s));
- TransactionalContext trancontext = new NonTransactionalContext(_store, _context, null,
+ TransactionalContext trancontext = new NonTransactionalContext(_tranasctionLog, _context, null,
new LinkedList<RequiredDeliveryException>()
);
- IncomingMessage message = new IncomingMessage(info, trancontext,_protocolSession, _store);
+ IncomingMessage message = new IncomingMessage(info, trancontext,_protocolSession, _tranasctionLog);
message.setContentHeaderBody( new ContentHeaderBody());
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
index d5db87350b..f6c307757b 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
@@ -28,11 +28,11 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
import javax.management.JMException;
@@ -45,7 +45,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase
/** Used for debugging. */
private static final Logger log = Logger.getLogger(AMQProtocolSessionMBeanTest.class);
- private MessageStore _messageStore = new SkeletonMessageStore();
+ private TransactionLog _transactionLog = new SkeletonMessageStore();
private AMQMinaProtocolSession _protocolSession;
private AMQChannel _channel;
private AMQProtocolSessionMBean _mbean;
@@ -60,7 +60,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase
new AMQShortString("test"),
true,
_protocolSession.getVirtualHost(), null);
- AMQChannel channel = new AMQChannel(_protocolSession,2, _messageStore);
+ AMQChannel channel = new AMQChannel(_protocolSession,2, _transactionLog);
channel.setDefaultQueue(queue);
_protocolSession.addChannel(channel);
channelCount = _mbean.channels().size();
@@ -71,7 +71,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase
assertTrue(_mbean.getMaximumNumberOfChannels() == 1000L);
// check APIs
- AMQChannel channel3 = new AMQChannel(_protocolSession, 3, _messageStore);
+ AMQChannel channel3 = new AMQChannel(_protocolSession, 3, _transactionLog);
channel3.setLocalTransactional();
_protocolSession.addChannel(channel3);
_mbean.rollbackTransactions(2);
@@ -91,14 +91,14 @@ public class AMQProtocolSessionMBeanTest extends TestCase
}
// check if closing of session works
- _protocolSession.addChannel(new AMQChannel(_protocolSession, 5, _messageStore));
+ _protocolSession.addChannel(new AMQChannel(_protocolSession, 5, _transactionLog));
_mbean.closeConnection();
try
{
channelCount = _mbean.channels().size();
assertTrue(channelCount == 0);
// session is now closed so adding another channel should throw an exception
- _protocolSession.addChannel(new AMQChannel(_protocolSession, 6, _messageStore));
+ _protocolSession.addChannel(new AMQChannel(_protocolSession, 6, _transactionLog));
fail();
}
catch (AMQException ex)
@@ -117,7 +117,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase
new AMQMinaProtocolSession(new TestIoSession(), appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true),
null);
_protocolSession.setVirtualHost(appRegistry.getVirtualHostRegistry().getVirtualHost("test"));
- _channel = new AMQChannel(_protocolSession, 1, _messageStore);
+ _channel = new AMQChannel(_protocolSession, 1, _transactionLog);
_protocolSession.addChannel(_channel);
_mbean = (AMQProtocolSessionMBean) _protocolSession.getManagedObject();
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index be40535df9..1bc50db1d5 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.queue;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -32,6 +31,7 @@ import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
@@ -61,9 +61,9 @@ public class AMQQueueAlertTest extends TestCase
private AMQQueueMBean _queueMBean;
private VirtualHost _virtualHost;
private AMQMinaProtocolSession _protocolSession;
- private MessageStore _messageStore = new MemoryMessageStore();
+ private TransactionLog _transactionLog = new MemoryMessageStore();
private StoreContext _storeContext = new StoreContext();
- private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
+ private TransactionalContext _transactionalContext = new NonTransactionalContext(_transactionLog, _storeContext,
null,
new LinkedList<RequiredDeliveryException>()
);
@@ -186,7 +186,7 @@ public class AMQQueueAlertTest extends TestCase
public void testQueueDepthAlertWithSubscribers() throws Exception
{
_protocolSession = new InternalTestProtocolSession();
- AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
+ AMQChannel channel = new AMQChannel(_protocolSession, 2, _transactionLog);
_protocolSession.addChannel(channel);
// Create queue
@@ -277,7 +277,7 @@ public class AMQQueueAlertTest extends TestCase
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.bodySize = size; // in bytes
- IncomingMessage message = new IncomingMessage(publish, _transactionalContext, _protocolSession, _messageStore);
+ IncomingMessage message = new IncomingMessage(publish, _transactionalContext, _protocolSession, _transactionLog);
message.setContentHeaderBody(contentHeaderBody);
return message;
@@ -308,7 +308,7 @@ public class AMQQueueAlertTest extends TestCase
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
messages[i].enqueue(qs);
- messages[i].routingComplete(_messageStore);
+ messages[i].routingComplete(_transactionLog);
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 97c76c8e9d..daa8e4beb7 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -31,6 +31,7 @@ import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactory;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
@@ -41,7 +42,6 @@ import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
@@ -60,7 +60,7 @@ public class AMQQueueMBeanTest extends TestCase
private static long MESSAGE_SIZE = 1000;
private AMQQueue _queue;
private AMQQueueMBean _queueMBean;
- private MessageStore _messageStore;
+ private TransactionLog _transactionLog;
private StoreContext _storeContext = new StoreContext();
private TransactionalContext _transactionalContext;
private VirtualHost _virtualHost;
@@ -113,7 +113,7 @@ public class AMQQueueMBeanTest extends TestCase
private void verifyBrokerState()
{
- TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore());
+ TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getTransactionLog());
// Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());
@@ -130,7 +130,7 @@ public class AMQQueueMBeanTest extends TestCase
InternalTestProtocolSession protocolSession = new InternalTestProtocolSession();
- AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore);
+ AMQChannel channel = new AMQChannel(protocolSession, 1, _transactionLog);
protocolSession.addChannel(channel);
Subscription subscription =
@@ -221,7 +221,7 @@ public class AMQQueueMBeanTest extends TestCase
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_messageStore);
+ msg.routingComplete(_transactionLog);
long id = msg.getMessageId();
@@ -266,7 +266,7 @@ public class AMQQueueMBeanTest extends TestCase
contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
- IncomingMessage msg = new IncomingMessage(publish, _transactionalContext, _protocolSession, _messageStore);
+ IncomingMessage msg = new IncomingMessage(publish, _transactionalContext, _protocolSession, _transactionLog);
msg.setContentHeaderBody(contentHeaderBody);
return msg;
@@ -278,9 +278,9 @@ public class AMQQueueMBeanTest extends TestCase
super.setUp();
IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(1);
_virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
- _messageStore = _virtualHost.getMessageStore();
+ _transactionLog = _virtualHost.getTransactionLog();
- _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
+ _transactionalContext = new NonTransactionalContext(_transactionLog, _storeContext,
null,
new LinkedList<RequiredDeliveryException>()
);
@@ -307,7 +307,7 @@ public class AMQQueueMBeanTest extends TestCase
currentMessage.enqueue(qs);
// route header
- currentMessage.routingComplete(_messageStore);
+ currentMessage.routingComplete(_transactionLog);
// Add the body so we have somthing to test later
currentMessage.addContentBodyFrame(
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java
index 3633481012..2a51f42e4e 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java
@@ -21,13 +21,13 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.transactionlog.TransactionLog;
public class MockPersistentAMQMessage extends PersistentAMQMessage
{
- public MockPersistentAMQMessage(long messageId, MessageStore store)
+ public MockPersistentAMQMessage(long messageId, TransactionLog transactionLog)
throws AMQException
{
- super(messageId, store);
+ super(messageId, transactionLog);
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
index 99c88fac3e..0cc33bf102 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -24,11 +24,11 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.framing.*;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.transport.Sender;
import javax.security.sasl.SaslServer;
@@ -41,13 +41,11 @@ import java.security.Principal;
*/
public class MockProtocolSession implements AMQProtocolSession
{
- private MessageStore _messageStore;
private Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
- public MockProtocolSession(MessageStore messageStore)
+ public MockProtocolSession(TransactionLog transactionLog)
{
- _messageStore = messageStore;
}
public void dataBlockReceived(AMQDataBlock message) throws Exception
diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java b/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java
index 1e47f764df..bcbd83cde3 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java
@@ -73,6 +73,7 @@ public class PrincipalPermissionsTest extends TestCase
}
catch (Exception e)
{
+ e.printStackTrace();
fail(e.getMessage());
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index 36f640a325..7722eae116 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -30,7 +30,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageFactory;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.queue.SimpleAMQQueue;
@@ -39,6 +38,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -151,7 +151,7 @@ public class MessageStoreTest extends TestCase
//Load the Virtualhost with the required MessageStore
reload(configuration);
- MessageStore messageStore = _virtualHost.getMessageStore();
+ TransactionLog transactionLog = _virtualHost.getTransactionLog();
createAllQueues();
createAllTopicQueues();
@@ -190,9 +190,9 @@ public class MessageStoreTest extends TestCase
assertEquals("Not all queues correctly registered", 8, _virtualHost.getQueueRegistry().getQueues().size());
- if (!messageStore.isPersistent())
+ if (!transactionLog.isPersistent())
{
- _logger.warn("Unable to test Persistent capabilities of messages store(" + messageStore.getClass() + ") as it is not capable of peristence.");
+ _logger.warn("Unable to test Persistent capabilities of messages store(" + transactionLog.getClass() + ") as it is not capable of peristence.");
return;
}
@@ -348,10 +348,10 @@ public class MessageStoreTest extends TestCase
try
{
currentMessage = new IncomingMessage(messageInfo,
- new NonTransactionalContext(_virtualHost.getMessageStore(),
+ new NonTransactionalContext(_virtualHost.getTransactionLog(),
new StoreContext(), null, null),
new InternalTestProtocolSession(),
- _virtualHost.getMessageStore());
+ _virtualHost.getTransactionLog());
}
catch (AMQException e)
{
@@ -388,7 +388,7 @@ public class MessageStoreTest extends TestCase
try
{
- currentMessage.routingComplete(_virtualHost.getMessageStore());
+ currentMessage.routingComplete(_virtualHost.getTransactionLog());
}
catch (AMQException e)
{
@@ -486,7 +486,7 @@ public class MessageStoreTest extends TestCase
if (queue.isDurable() && !queue.isAutoDelete())
{
- _virtualHost.getMessageStore().createQueue(queue, queueArguments);
+ _virtualHost.getRoutingTable().createQueue(queue, queueArguments);
}
}
catch (AMQException e)
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index f08a15a8a7..a5b65b527c 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -29,6 +29,8 @@ import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.routing.RoutingTable;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -37,7 +39,7 @@ import java.util.concurrent.atomic.AtomicLong;
* A message store that does nothing. Designed to be used in tests that do not want to use any message store
* functionality.
*/
-public class SkeletonMessageStore implements MessageStore
+public class SkeletonMessageStore implements TransactionLog , RoutingTable
{
private final AtomicLong _messageId = new AtomicLong(1);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
index 84d3d313d1..ca6644d141 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
@@ -22,9 +22,9 @@ package org.apache.qpid.server.txn;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import java.util.LinkedList;
import java.util.NoSuchElementException;
@@ -283,13 +283,13 @@ public class TxnBufferTest extends TestCase
class TxnTester extends NullOp
{
- private final MessageStore store;
+ private final TransactionLog store;
private final StoreContext context = new StoreContext();
- TxnTester(MessageStore store)
+ TxnTester(TransactionLog transactionLog)
{
- this.store = store;
+ this.store = transactionLog;
}
public void prepare() throws AMQException
diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
index 9e24f13127..d150faf94a 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -30,8 +30,8 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.framing.AMQShortString;
@@ -47,7 +47,7 @@ import org.apache.qpid.exchange.ExchangeDefaults;
public class InternalBrokerBaseCase extends TestCase
{
protected IApplicationRegistry _registry;
- protected MessageStore _messageStore;
+ protected TransactionLog _transactionLog;
protected MockChannel _channel;
protected InternalTestProtocolSession _session;
protected VirtualHost _virtualHost;
@@ -62,7 +62,7 @@ public class InternalBrokerBaseCase extends TestCase
ApplicationRegistry.initialise(_registry);
_virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test");
- _messageStore = _virtualHost.getMessageStore();
+ _transactionLog = _virtualHost.getTransactionLog();
QUEUE_NAME = new AMQShortString("test");
_queue = AMQQueueFactory.createAMQQueueImpl(QUEUE_NAME, false, new AMQShortString("testowner"),
@@ -78,7 +78,7 @@ public class InternalBrokerBaseCase extends TestCase
_session.setVirtualHost(_virtualHost);
- _channel = new MockChannel(_session, 1, _messageStore);
+ _channel = new MockChannel(_session, 1, _transactionLog);
_session.addChannel(_channel);
}
@@ -91,7 +91,7 @@ public class InternalBrokerBaseCase extends TestCase
protected void checkStoreContents(int messageCount)
{
- assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageMetaDataMap().size());
+ assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _transactionLog).getMessageMetaDataMap().size());
//The above publish message is sufficiently small not to fit in the header so no Body is required.
//assertEquals("Message body count incorrect in the ContentBodyMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getContentBodyMap().size());
diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
index b6d42e6068..2605ed0d11 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
@@ -27,14 +27,13 @@ import org.apache.qpid.server.management.NoopManagedObjectRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.access.ACLManager;
-import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.security.access.plugins.AllowAll;
import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import java.util.Collection;
import java.util.HashMap;
@@ -49,11 +48,10 @@ public class TestApplicationRegistry extends ApplicationRegistry
private ExchangeFactory _exchangeFactory;
- private MessageStore _messageStore;
+ private TransactionLog _transactionLog;
private VirtualHost _vHost;
-
public TestApplicationRegistry()
{
super(new MapConfiguration(new HashMap()));
@@ -73,11 +71,11 @@ public class TestApplicationRegistry extends ApplicationRegistry
_managedObjectRegistry = new NoopManagedObjectRegistry();
- _messageStore = new TestableMemoryMessageStore();
+ _transactionLog = new TestableMemoryMessageStore();
_virtualHostRegistry = new VirtualHostRegistry();
- _vHost = new VirtualHost("test", _messageStore);
+ _vHost = new VirtualHost("test", _transactionLog);
_virtualHostRegistry.registerVirtualHost(_vHost);
@@ -114,9 +112,9 @@ public class TestApplicationRegistry extends ApplicationRegistry
_accessManager = newManager;
}
- public MessageStore getMessageStore()
+ public TransactionLog getTransactionLog()
{
- return _messageStore;
+ return _transactionLog;
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java
new file mode 100644
index 0000000000..ba19fd5d5e
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import junit.framework.TestCase;
+import org.apache.commons.configuration.PropertiesConfiguration;
+
+public class VirtualhostInitRoutingTableFromTransactionLogTest extends TestCase
+{
+ public void test()
+ {
+ PropertiesConfiguration env = new PropertiesConfiguration();
+
+
+ env.addProperty("store.class", "org.apache.qpid.server.store.MemoryMessageStore");
+
+ VirtualHost _virtualHost = null;
+ try
+ {
+ _virtualHost = new VirtualHost("test", env);
+
+ assertNotNull(_virtualHost.getTransactionLog());
+ assertNotNull(_virtualHost.getRoutingTable());
+ assertEquals(_virtualHost.getTransactionLog(),_virtualHost.getRoutingTable());
+ }
+ catch (Exception e)
+ {
+ fail(e.getMessage());
+ }
+
+
+ }
+
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java b/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java
index 447d09429d..77767463ea 100644
--- a/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java
+++ b/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java
@@ -21,18 +21,18 @@
package org.apache.qpid.util;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
public class MockChannel extends AMQChannel
{
- public MockChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
+ public MockChannel(AMQProtocolSession session, int channelId, TransactionLog transactionLog)
throws AMQException
{
- super(session, channelId, messageStore);
+ super(session, channelId, transactionLog);
}
public Subscription getSubscription(AMQShortString subscription)
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index dd71b2d0da..256491194d 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -30,18 +30,21 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.routing.RoutingTable;
import java.util.HashMap;
import java.util.Iterator;
-public class SlowMessageStore implements MessageStore
+public class SlowMessageStore implements TransactionLog, RoutingTable
{
private static final Logger _logger = Logger.getLogger(SlowMessageStore.class);
private static final String DELAYS = "delays";
private HashMap<String, Long> _preDelays = new HashMap<String, Long>();
private HashMap<String, Long> _postDelays = new HashMap<String, Long>();
private long _defaultDelay = 0L;
- private MessageStore _realStore = new MemoryMessageStore();
+ private TransactionLog _realTransactionLog = new MemoryMessageStore();
+ private RoutingTable _realRoutingTable = (RoutingTable)_realTransactionLog;
private static final String PRE = "pre";
private static final String POST = "post";
private String DEFAULT_DELAY = "default";
@@ -53,30 +56,30 @@ public class SlowMessageStore implements MessageStore
configureDelays(delays);
- String messageStoreClass = config.getString(base + ".store.class");
+ String transactionLogClass = config.getString(base + ".store.class");
if (delays.containsKey(DEFAULT_DELAY))
{
_defaultDelay = delays.getLong(DEFAULT_DELAY);
}
- if (messageStoreClass != null)
+ if (transactionLogClass != null)
{
- Class clazz = Class.forName(messageStoreClass);
+ Class clazz = Class.forName(transactionLogClass);
Object o = clazz.newInstance();
- if (!(o instanceof MessageStore))
+ if (!(o instanceof TransactionLog))
{
- throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
+ throw new ClassCastException("TransactionLog class must implement " + TransactionLog.class + ". Class " + clazz +
" does not.");
}
- _realStore = (MessageStore) o;
- _realStore.configure(virtualHost, base + ".store", config);
+ _realTransactionLog = (TransactionLog) o;
+ _realTransactionLog.configure(virtualHost, base , config);
}
else
{
- _realStore.configure(virtualHost, base + ".store", config);
+ _realTransactionLog.configure(virtualHost, base , config);
}
}
@@ -150,42 +153,42 @@ public class SlowMessageStore implements MessageStore
public void close() throws Exception
{
doPreDelay("close");
- _realStore.close();
+ _realTransactionLog.close();
doPostDelay("close");
}
public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
{
doPreDelay("removeMessage");
- _realStore.removeMessage(storeContext, messageId);
+ _realTransactionLog.removeMessage(storeContext, messageId);
doPostDelay("removeMessage");
}
public void createExchange(Exchange exchange) throws AMQException
{
doPreDelay("createExchange");
- _realStore.createExchange(exchange);
+ _realRoutingTable.createExchange(exchange);
doPostDelay("createExchange");
}
public void removeExchange(Exchange exchange) throws AMQException
{
doPreDelay("removeExchange");
- _realStore.removeExchange(exchange);
+ _realRoutingTable.removeExchange(exchange);
doPostDelay("removeExchange");
}
public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
doPreDelay("bindQueue");
- _realStore.bindQueue(exchange, routingKey, queue, args);
+ _realRoutingTable.bindQueue(exchange, routingKey, queue, args);
doPostDelay("bindQueue");
}
public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
doPreDelay("unbindQueue");
- _realStore.unbindQueue(exchange, routingKey, queue, args);
+ _realRoutingTable.unbindQueue(exchange, routingKey, queue, args);
doPostDelay("unbindQueue");
}
@@ -197,56 +200,56 @@ public class SlowMessageStore implements MessageStore
public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
{
doPreDelay("createQueue");
- _realStore.createQueue(queue, arguments);
+ _realRoutingTable.createQueue(queue, arguments);
doPostDelay("createQueue");
}
public void removeQueue(AMQQueue queue) throws AMQException
{
doPreDelay("removeQueue");
- _realStore.removeQueue(queue);
+ _realRoutingTable.removeQueue(queue);
doPostDelay("removeQueue");
}
public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
{
doPreDelay("enqueueMessage");
- _realStore.enqueueMessage(context, queue, messageId);
+ _realTransactionLog.enqueueMessage(context, queue, messageId);
doPostDelay("enqueueMessage");
}
public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
{
doPreDelay("dequeueMessage");
- _realStore.dequeueMessage(context, queue, messageId);
+ _realTransactionLog.dequeueMessage(context, queue, messageId);
doPostDelay("dequeueMessage");
}
public void beginTran(StoreContext context) throws AMQException
{
doPreDelay("beginTran");
- _realStore.beginTran(context);
+ _realTransactionLog.beginTran(context);
doPostDelay("beginTran");
}
public void commitTran(StoreContext context) throws AMQException
{
doPreDelay("commitTran");
- _realStore.commitTran(context);
+ _realTransactionLog.commitTran(context);
doPostDelay("commitTran");
}
public void abortTran(StoreContext context) throws AMQException
{
doPreDelay("abortTran");
- _realStore.abortTran(context);
+ _realTransactionLog.abortTran(context);
doPostDelay("abortTran");
}
public boolean inTran(StoreContext context)
{
doPreDelay("inTran");
- boolean b = _realStore.inTran(context);
+ boolean b = _realTransactionLog.inTran(context);
doPostDelay("inTran");
return b;
}
@@ -254,21 +257,21 @@ public class SlowMessageStore implements MessageStore
public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
{
doPreDelay("storeContentBodyChunk");
- _realStore.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody);
+ _realTransactionLog.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody);
doPostDelay("storeContentBodyChunk");
}
public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
{
doPreDelay("storeMessageMetaData");
- _realStore.storeMessageMetaData(context, messageId, messageMetaData);
+ _realTransactionLog.storeMessageMetaData(context, messageId, messageMetaData);
doPostDelay("storeMessageMetaData");
}
public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
{
doPreDelay("getMessageMetaData");
- MessageMetaData mmd = _realStore.getMessageMetaData(context, messageId);
+ MessageMetaData mmd = _realTransactionLog.getMessageMetaData(context, messageId);
doPostDelay("getMessageMetaData");
return mmd;
}
@@ -276,14 +279,14 @@ public class SlowMessageStore implements MessageStore
public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
{
doPreDelay("getContentBodyChunk");
- ContentChunk c = _realStore.getContentBodyChunk(context, messageId, index);
+ ContentChunk c = _realTransactionLog.getContentBodyChunk(context, messageId, index);
doPostDelay("getContentBodyChunk");
return c;
}
public boolean isPersistent()
{
- return _realStore.isPersistent();
+ return _realTransactionLog.isPersistent();
}
}