diff options
36 files changed, 492 insertions, 333 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index fc6057afd2..b3c843ebaa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -42,12 +42,8 @@ import javax.management.MBeanException; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; -import org.apache.commons.configuration.Configuration; - import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.configuration.Configurator; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; @@ -59,8 +55,9 @@ import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.routing.RoutingTable; /** * This MBean implements the broker management interface and exposes the @@ -72,7 +69,8 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr private final QueueRegistry _queueRegistry; private final ExchangeRegistry _exchangeRegistry; private final ExchangeFactory _exchangeFactory; - private final MessageStore _messageStore; + private final TransactionLog _tranasctionLog; + private final RoutingTable _routingTable; private final VirtualHost.VirtualHostMBean _virtualHostMBean; @@ -86,8 +84,9 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr _queueRegistry = virtualHost.getQueueRegistry(); _exchangeRegistry = virtualHost.getExchangeRegistry(); - _messageStore = virtualHost.getMessageStore(); + _tranasctionLog = virtualHost.getTransactionLog(); _exchangeFactory = virtualHost.getExchangeFactory(); + _routingTable = virtualHost.getRoutingTable(); } public String getObjectInstanceName() @@ -180,7 +179,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr null); if (queue.isDurable() && !queue.isAutoDelete()) { - _messageStore.createQueue(queue); + _routingTable.createQueue(queue); } _queueRegistry.registerQueue(queue); @@ -215,8 +214,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr try { queue.delete(); - _messageStore.removeQueue(queue); - + _routingTable.removeQueue(queue); } catch (AMQException ex) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 341aae2875..2d0589c223 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -44,11 +44,11 @@ import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.RecordDeliveryMethod; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.LocalTransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.transactionlog.TransactionLog; import java.util.Collection; import java.util.LinkedList; @@ -91,7 +91,7 @@ public class AMQChannel /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */ protected final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>(); - private final MessageStore _messageStore; + private final TransactionLog _transactionLog; private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH); @@ -109,9 +109,9 @@ public class AMQChannel // Why do we need this reference ? - ritchiem private final AMQProtocolSession _session; - private boolean _closing; + private boolean _closing; - public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) + public AMQChannel(AMQProtocolSession session, int channelId, TransactionLog transactionLog) throws AMQException { //Set values from configuration @@ -122,10 +122,10 @@ public class AMQChannel _storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId); - _messageStore = messageStore; + _transactionLog = transactionLog; // by default the session is non-transactional - _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages); + _txnContext = new NonTransactionalContext(_transactionLog, _storeContext, this, _returnMessages); } /** Sets this channel to be part of a local transaction */ @@ -150,7 +150,7 @@ public class AMQChannel public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQException { - _currentMessage = new IncomingMessage(info, _txnContext, _session, _messageStore); + _currentMessage = new IncomingMessage(info, _txnContext, _session, _transactionLog); _currentMessage.setExchange(e); } @@ -174,7 +174,7 @@ public class AMQChannel routeCurrentMessage(); - _currentMessage.routingComplete(_messageStore); + _currentMessage.routingComplete(_transactionLog); deliverCurrentMessageIfComplete(); @@ -474,7 +474,7 @@ public class AMQChannel { deliveryContext = - new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages); + new NonTransactionalContext(_transactionLog, _storeContext, this, _returnMessages); } else { @@ -534,7 +534,7 @@ public class AMQChannel { deliveryContext = - new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages); + new NonTransactionalContext(_transactionLog, _storeContext, this, _returnMessages); } else @@ -669,7 +669,7 @@ public class AMQChannel { deliveryContext = - new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages); + new NonTransactionalContext(_transactionLog, _storeContext, this, _returnMessages); } else { @@ -870,9 +870,9 @@ public class AMQChannel return _returnMessages; } - public MessageStore getMessageStore() + public TransactionLog getTransactionLog() { - return _messageStore; + return _transactionLog; } private final ClientDeliveryMethod _clientDeliveryMethod = new ClientDeliveryMethod() diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index 705e84752b..02fb57c730 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -38,8 +38,9 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.routing.RoutingTable; public class VirtualHostConfiguration { @@ -101,8 +102,6 @@ public class VirtualHostConfiguration exchangeConfiguration.addConfiguration(configuration.subset("exchanges.exchange."+ exchangeNameString)); exchangeConfiguration.addConfiguration(configuration.subset("exchanges")); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - MessageStore messageStore = virtualHost.getMessageStore(); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); @@ -159,7 +158,7 @@ public class VirtualHostConfiguration queueConfiguration.addConfiguration(configuration.subset("queues")); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - MessageStore messageStore = virtualHost.getMessageStore(); + RoutingTable routingTable = virtualHost.getRoutingTable(); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); @@ -205,7 +204,7 @@ public class VirtualHostConfiguration if (queue.isDurable()) { - messageStore.createQueue(queue); + routingTable.createQueue(queue); } queueRegistry.registerQueue(queue); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index f28ba0d0de..3e930364df 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -25,8 +25,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.protocol.ExchangeInitialiser; import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.routing.RoutingTable; import java.util.Collection; import java.util.concurrent.ConcurrentHashMap; @@ -56,9 +56,9 @@ public class DefaultExchangeRegistry implements ExchangeRegistry new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this); } - public MessageStore getMessageStore() + public RoutingTable getRoutingTable() { - return _host.getMessageStore(); + return _host.getRoutingTable(); } public void registerExchange(Exchange exchange) throws AMQException @@ -66,7 +66,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry _exchangeMap.put(exchange.getName(), exchange); if (exchange.isDurable()) { - getMessageStore().createExchange(exchange); + getRoutingTable().createExchange(exchange); } } @@ -93,7 +93,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry { if (e.isDurable()) { - getMessageStore().removeExchange(e); + getRoutingTable().removeExchange(e); } e.close(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java index 054674aed4..0a7d5cfe6d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java @@ -54,7 +54,7 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB AMQProtocolSession session = stateManager.getProtocolSession(); VirtualHost virtualHost = session.getVirtualHost(); - final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore() + final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getTransactionLog() ); session.addChannel(channel); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 71f38cb28a..b1e02aef7a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -30,20 +30,17 @@ import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.configuration.Configurator; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.security.access.Permission; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.AMQChannel; -import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.routing.RoutingTable; public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody> { @@ -73,7 +70,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar VirtualHost virtualHost = session.getVirtualHost(); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - MessageStore store = virtualHost.getMessageStore(); + RoutingTable routingTable = virtualHost.getRoutingTable(); if (!body.getPassive()) @@ -120,7 +117,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar queue = createQueue(queueName, body, virtualHost, session); if (queue.isDurable() && !queue.isAutoDelete()) { - store.createQueue(queue, body.getArguments()); + routingTable.createQueue(queue, body.getArguments()); } queueRegistry.registerQueue(queue); if (autoRegister) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index b397db9246..85a697f3b7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -30,10 +30,9 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.security.access.Permission; +import org.apache.qpid.server.routing.RoutingTable; public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody> { @@ -62,7 +61,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB AMQProtocolSession session = stateManager.getProtocolSession(); VirtualHost virtualHost = session.getVirtualHost(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - MessageStore store = virtualHost.getMessageStore(); + RoutingTable routingTable = virtualHost.getRoutingTable(); AMQQueue queue; if (body.getQueue() == null) @@ -115,7 +114,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB if (queue.isDurable()) { - store.removeQueue(queue); + routingTable.removeQueue(queue); } MethodRegistry methodRegistry = session.getMethodRegistry(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 20926b99d7..94580a00ac 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -27,10 +27,10 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.transactionlog.TransactionLog; import org.apache.qpid.AMQException; import org.apache.log4j.Logger; @@ -67,7 +67,7 @@ public class IncomingMessage implements Filterable<RuntimeException> private ArrayList<AMQQueue> _destinationQueues; private AMQProtocolSession _publisher; - private MessageStore _messageStore; + private TransactionLog _messageStore; private long _expiration; private Exchange _exchange; @@ -76,7 +76,7 @@ public class IncomingMessage implements Filterable<RuntimeException> public IncomingMessage(final MessagePublishInfo info, final TransactionalContext txnContext, final AMQProtocolSession publisher, - MessageStore messasgeStore) + TransactionLog messasgeStore) { _messagePublishInfo = info; _txnContext = txnContext; @@ -121,7 +121,7 @@ public class IncomingMessage implements Filterable<RuntimeException> } - public void routingComplete(final MessageStore store) throws AMQException + public void routingComplete(final TransactionLog transactionLog) throws AMQException { if (isPersistent()) @@ -134,7 +134,7 @@ public class IncomingMessage implements Filterable<RuntimeException> { for (int i = 0; i < _destinationQueues.size(); i++) { - store.enqueueMessage(_txnContext.getStoreContext(), + transactionLog.enqueueMessage(_txnContext.getStoreContext(), _destinationQueues.get(i), getMessageId()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java index 2f6e05963c..e5e0b6e312 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.transactionlog.TransactionLog; import java.util.concurrent.atomic.AtomicLong; @@ -58,27 +58,27 @@ public class MessageFactory /** * Normal message creation path - * @param store + * @param transactionLog * @param persistent * @return */ - public AMQMessage createMessage(MessageStore store, boolean persistent) + public AMQMessage createMessage(TransactionLog transactionLog, boolean persistent) { if (_state != State.OPEN) { _state = State.OPEN; } - return createNextMessage(_messageId.incrementAndGet(), store, persistent); + return createNextMessage(_messageId.incrementAndGet(), transactionLog, persistent); } /** * Used for message recovery only and so only creates persistent messages. * @param messageId the id that this message must have - * @param store + * @param transactionLog * @return */ - public AMQMessage createMessage(Long messageId, MessageStore store) + public AMQMessage createMessage(Long messageId, TransactionLog transactionLog) { if (_state != State.RECOVER) { @@ -96,14 +96,14 @@ public class MessageFactory _messageId.set(messageId); } - return createNextMessage(messageId, store, true); + return createNextMessage(messageId, transactionLog, true); } - private AMQMessage createNextMessage(Long messageId, MessageStore store, boolean persistent) + private AMQMessage createNextMessage(Long messageId, TransactionLog transactionLog, boolean persistent) { if (persistent) { - return new PersistentAMQMessage(messageId, store); + return new PersistentAMQMessage(messageId, transactionLog); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java index 04e3635f92..804bb29ecd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java @@ -24,17 +24,17 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.transactionlog.TransactionLog; public class PersistentAMQMessage extends TransientAMQMessage { - protected MessageStore _messageStore; + protected TransactionLog _transactionLog; - public PersistentAMQMessage(Long messageId, MessageStore store) + public PersistentAMQMessage(Long messageId, TransactionLog transactionLog) { super(messageId); - _messageStore = store; + _transactionLog = transactionLog; } @Override @@ -42,7 +42,7 @@ public class PersistentAMQMessage extends TransientAMQMessage throws AMQException { super.addContentBodyFrame(storeContext, contentChunk, isLastContentBody); - _messageStore.storeContentBodyChunk(storeContext, _messageId, _contentBodies.size() - 1, + _transactionLog.storeContentBodyChunk(storeContext, _messageId, _contentBodies.size() - 1, contentChunk, isLastContentBody); } @@ -54,13 +54,13 @@ public class PersistentAMQMessage extends TransientAMQMessage super.setPublishAndContentHeaderBody(storeContext, messagePublishInfo, contentHeaderBody); MessageMetaData mmd = new MessageMetaData(messagePublishInfo, contentHeaderBody, _contentBodies == null ? 0 : _contentBodies.size(), _arrivalTime); - _messageStore.storeMessageMetaData(storeContext, _messageId, mmd); + _transactionLog.storeMessageMetaData(storeContext, _messageId, mmd); } @Override public void removeMessage(StoreContext storeContext) throws AMQException { - _messageStore.removeMessage(storeContext, _messageId); + _transactionLog.removeMessage(storeContext, _messageId); } @Override diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 547df7856d..a08e4e2667 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -6,11 +6,11 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionList; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.transactionlog.TransactionLog; import org.apache.qpid.AMQException; import org.apache.qpid.pool.ReadWriteRunnable; import org.apache.qpid.pool.ReferenceCountingExecutorService; @@ -210,7 +210,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener exchange.registerQueue(routingKey, this, arguments); if (isDurable() && exchange.isDurable()) { - _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments); + _virtualHost.getRoutingTable().bindQueue(exchange, routingKey, this, arguments); } _bindings.addBinding(routingKey, arguments, exchange); @@ -221,7 +221,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener exchange.deregisterQueue(routingKey, this, arguments); if (isDurable() && exchange.isDurable()) { - _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments); + _virtualHost.getRoutingTable().unbindQueue(exchange, routingKey, this, arguments); } boolean removed = _bindings.remove(routingKey, arguments, exchange); @@ -581,7 +581,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener AMQMessage msg = entry.getMessage(); if (msg.isPersistent()) { - _virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageId()); + _virtualHost.getTransactionLog().dequeueMessage(storeContext, this, msg.getMessageId()); } //entry.dispose(storeContext); @@ -826,7 +826,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); - MessageStore store = getVirtualHost().getMessageStore(); + TransactionLog transactionLog = getVirtualHost().getTransactionLog(); List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() { @@ -847,16 +847,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener try { - store.beginTran(storeContext); + transactionLog.beginTran(storeContext); - // Move the messages in on the message store. + // Move the messages in on the transaction log. for (QueueEntry entry : entries) { AMQMessage message = entry.getMessage(); if (message.isPersistent() && toQueue.isDurable()) { - store.enqueueMessage(storeContext, toQueue, message.getMessageId()); + transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId()); } // dequeue does not decrement the refence count entry.dequeue(storeContext); @@ -865,18 +865,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // Commit and flush the move transcations. try { - store.commitTran(storeContext); + transactionLog.commitTran(storeContext); } catch (AMQException e) { - throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e); + throw new RuntimeException("Failed to commit transaction whilst moving messages on transaction log.", e); } } catch (AMQException e) { try { - store.abortTran(storeContext); + transactionLog.abortTran(storeContext); } catch (AMQException rollbackEx) { @@ -910,7 +910,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener final StoreContext storeContext) { AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); - MessageStore store = getVirtualHost().getMessageStore(); + TransactionLog transactionLog = getVirtualHost().getTransactionLog(); List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() { @@ -938,34 +938,34 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener try { - store.beginTran(storeContext); + transactionLog.beginTran(storeContext); - // Move the messages in on the message store. + // Move the messages in on the transaction log. for (QueueEntry entry : entries) { AMQMessage message = entry.getMessage(); if (message.isReferenced() && message.isPersistent() && toQueue.isDurable()) { - store.enqueueMessage(storeContext, toQueue, message.getMessageId()); + transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId()); } } // Commit and flush the move transcations. try { - store.commitTran(storeContext); + transactionLog.commitTran(storeContext); } catch (AMQException e) { - throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e); + throw new RuntimeException("Failed to commit transaction whilst moving messages on transaction log.", e); } } catch (AMQException e) { try { - store.abortTran(storeContext); + transactionLog.abortTran(storeContext); } catch (AMQException rollbackEx) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java b/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java new file mode 100644 index 0000000000..6344127b24 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java @@ -0,0 +1,123 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.routing; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.commons.configuration.Configuration; + +public interface RoutingTable +{ + /** + * Called after instantiation in order to configure the message store. A particular implementation can define + * whatever parameters it wants. + * + * @param virtualHost The virtual host using by this store + * @param base The base element identifier from which all configuration items are relative. For example, if + * the base element is "store", the all elements used by concrete classes will be "store.foo" etc. + * @param config The apache commons configuration object. + * + * @throws Exception If any error occurs that means the store is unable to configure itself. + */ + void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception; + + /** + * Called to close and cleanup any resources used by the message store. + * + * @throws Exception If the close fails. + */ + void close() throws Exception; + + + /** + * Makes the specified exchange persistent. + * + * @param exchange The exchange to persist. + * + * @throws org.apache.qpid.AMQException If the operation fails for any reason. + */ + void createExchange(Exchange exchange) throws AMQException; + + /** + * Removes the specified persistent exchange. + * + * @param exchange The exchange to remove. + * + * @throws AMQException If the operation fails for any reason. + */ + void removeExchange(Exchange exchange) throws AMQException; + + /** + * Binds the specified queue to an exchange with a routing key. + * + * @param exchange The exchange to bind to. + * @param routingKey The routing key to bind by. + * @param queue The queue to bind. + * @param args Additional parameters. + * + * @throws AMQException If the operation fails for any reason. + */ + void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException; + + /** + * Unbinds the specified from an exchange under a particular routing key. + * + * @param exchange The exchange to unbind from. + * @param routingKey The routing key to unbind. + * @param queue The queue to unbind. + * @param args Additonal parameters. + * + * @throws AMQException If the operation fails for any reason. + */ + void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException; + + /** + * Makes the specified queue persistent. + * + * @param queue The queue to store. + * + * @throws AMQException If the operation fails for any reason. + */ + void createQueue(AMQQueue queue) throws AMQException; + + /** + * Makes the specified queue persistent. + * + * @param queue The queue to store. + * @param arguments The additional arguments to the binding + * + * @throws AMQException If the operation fails for any reason. + */ + void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException; + + /** + * Removes the specified queue from the persistent store. + * + * @param queue The queue to remove. + * + * @throws AMQException If the operation fails for any reason. + */ + void removeQueue(final AMQQueue queue) throws AMQException; +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 425aed43d4..500fd4c7bf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -32,6 +32,8 @@ import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.routing.RoutingTable; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -62,7 +64,7 @@ import java.util.HashMap; import java.util.TreeMap; -public class DerbyMessageStore implements MessageStore +public class DerbyMessageStore implements TransactionLog, RoutingTable { private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class); @@ -142,37 +144,40 @@ public class DerbyMessageStore implements MessageStore public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception { - stateTransition(State.INITIAL, State.CONFIGURING); + //Only initialise when loaded with the old 'store' confing ignore the new 'RoutingTable' config + if (base.equals("store")) + { + stateTransition(State.INITIAL, State.CONFIGURING); - initialiseDriver(); + initialiseDriver(); - _virtualHost = virtualHost; + _virtualHost = virtualHost; - _logger.info("Configuring Derby message store for virtual host " + virtualHost.getName()); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + _logger.info("Configuring Derby message store for virtual host " + virtualHost.getName()); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - final String databasePath = config.getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "derbyDB"); + final String databasePath = config.getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "derbyDB"); - File environmentPath = new File(databasePath); - if (!environmentPath.exists()) - { - if (!environmentPath.mkdirs()) + File environmentPath = new File(databasePath); + if (!environmentPath.exists()) { - throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " - + "Ensure the path is correct and that the permissions are correct."); + if (!environmentPath.mkdirs()) + { + throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " + + "Ensure the path is correct and that the permissions are correct."); + } } - } - - createOrOpenDatabase(databasePath); - // this recovers durable queues and persistent messages + createOrOpenDatabase(databasePath); - _messageFactory = MessageFactory.getInstance(); + // this recovers durable queues and persistent messages - recover(); + _messageFactory = MessageFactory.getInstance(); - stateTransition(State.RECOVERING, State.STARTED); + recover(); + stateTransition(State.RECOVERING, State.STARTED); + } } private static synchronized void initialiseDriver() throws ClassNotFoundException diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 587c85fc12..eee7be7ef6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -30,6 +30,8 @@ import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.routing.RoutingTable; import java.util.ArrayList; import java.util.Collections; @@ -39,8 +41,14 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -/** A simple message store that stores the messages in a threadsafe structure in memory. */ -public class MemoryMessageStore implements MessageStore +/** A simple message store that stores the messages in a threadsafe structure in memory. + * + * NOTE: Now that we have removed the MessageStore interface and are using a TransactionLog + * + * This class really should have no storage unless we want to do inMemory Recovery. + * + */ +public class MemoryMessageStore implements TransactionLog, RoutingTable { private static final Logger _log = Logger.getLogger(MemoryMessageStore.class); @@ -64,10 +72,14 @@ public class MemoryMessageStore implements MessageStore public void configure(String base, Configuration config) { - int hashtableCapacity = config.getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY); - _log.info("Using capacity " + hashtableCapacity + " for hash tables"); - _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity); - _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity); + //Only initialise when called with current 'store' configs i.e. don't reinit when used as a 'RoutingTable' + if (base.equals("store")) + { + int hashtableCapacity = config.getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY); + _log.info("Using capacity " + hashtableCapacity + " for hash tables"); + _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity); + _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity); + } } public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java index e65dded149..c927bb3272 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.server.store; +package org.apache.qpid.server.transactionlog; import org.apache.commons.configuration.Configuration; @@ -30,9 +30,11 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.store.StoreContext; /** - * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages, queues + * TransactionLog defines the interface for performing transactions. + * This is used to preserve the state of messages, queues * and exchanges in a transactional manner. * * <p/>All message store, remove, enqueue and dequeue operations are carried out against a {@link StoreContext} which @@ -52,7 +54,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; * <tr><td> Generate message identifiers. * </table> */ -public interface MessageStore +public interface TransactionLog { /** * Called after instantiation in order to configure the message store. A particular implementation can define @@ -84,74 +86,7 @@ public interface MessageStore */ void removeMessage(StoreContext storeContext, Long messageId) throws AMQException; - /** - * Makes the specified exchange persistent. - * - * @param exchange The exchange to persist. - * - * @throws AMQException If the operation fails for any reason. - */ - void createExchange(Exchange exchange) throws AMQException; - /** - * Removes the specified persistent exchange. - * - * @param exchange The exchange to remove. - * - * @throws AMQException If the operation fails for any reason. - */ - void removeExchange(Exchange exchange) throws AMQException; - - /** - * Binds the specified queue to an exchange with a routing key. - * - * @param exchange The exchange to bind to. - * @param routingKey The routing key to bind by. - * @param queue The queue to bind. - * @param args Additional parameters. - * - * @throws AMQException If the operation fails for any reason. - */ - void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException; - - /** - * Unbinds the specified from an exchange under a particular routing key. - * - * @param exchange The exchange to unbind from. - * @param routingKey The routing key to unbind. - * @param queue The queue to unbind. - * @param args Additonal parameters. - * - * @throws AMQException If the operation fails for any reason. - */ - void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException; - - /** - * Makes the specified queue persistent. - * - * @param queue The queue to store. - * - * @throws AMQException If the operation fails for any reason. - */ - void createQueue(AMQQueue queue) throws AMQException; - - /** - * Makes the specified queue persistent. - * - * @param queue The queue to store. - * - * @param arguments The additional arguments to the binding - * @throws AMQException If the operation fails for any reason. - */ - void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException; - - /** - * Removes the specified queue from the persistent store. - * - * @param queue The queue to remove. - * @throws AMQException If the operation fails for any reason. - */ - void removeQueue(final AMQQueue queue) throws AMQException; /** * Places a message onto a specified queue, in a given transactional context. @@ -209,6 +144,7 @@ public interface MessageStore */ boolean inTran(StoreContext context); + /** * Stores a chunk of message data. * @@ -261,7 +197,7 @@ public interface MessageStore /** * Is this store capable of persisting the data - * + * * @return true if this store is capable of persisting data */ boolean isPersistent(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 3c71282c57..9bc2e98fe9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -25,11 +25,11 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.transactionlog.TransactionLog; import org.apache.qpid.server.ack.TxAck; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.*; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import java.util.List; @@ -56,7 +56,6 @@ public class LocalTransactionalContext implements TransactionalContext private boolean _messageDelivered = false; private final AMQChannel _channel; - private abstract class DeliveryAction { @@ -125,9 +124,9 @@ public class LocalTransactionalContext implements TransactionalContext return _channel.getReturnMessages(); } - public MessageStore getMessageStore() + public TransactionLog getTransactionLog() { - return _channel.getMessageStore(); + return _channel.getTransactionLog(); } @@ -135,9 +134,9 @@ public class LocalTransactionalContext implements TransactionalContext { _txnBuffer.rollback(getStoreContext()); // Hack to deal with uncommitted non-transactional writes - if (getMessageStore().inTran(getStoreContext())) + if (getTransactionLog().inTran(getStoreContext())) { - getMessageStore().abortTran(getStoreContext()); + getTransactionLog().abortTran(getStoreContext()); _inTran = false; } @@ -224,7 +223,7 @@ public class LocalTransactionalContext implements TransactionalContext _log.debug("Starting transaction on message store: " + this); } - getMessageStore().beginTran(getStoreContext()); + getTransactionLog().beginTran(getStoreContext()); _inTran = true; } } @@ -247,7 +246,7 @@ public class LocalTransactionalContext implements TransactionalContext if (_messageDelivered && _inTran) { - _txnBuffer.enlist(new StoreMessageOperation(getMessageStore())); + _txnBuffer.enlist(new StoreMessageOperation(getTransactionLog())); } // fixme fail commit here ... QPID-440 try @@ -257,7 +256,7 @@ public class LocalTransactionalContext implements TransactionalContext finally { _messageDelivered = false; - _inTran = getMessageStore().inTran(getStoreContext()); + _inTran = getTransactionLog().inTran(getStoreContext()); } try diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index acb2e93818..145d7f8b13 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -26,10 +26,10 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.transactionlog.TransactionLog; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.*; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; /** @author Apache Software Foundation */ @@ -45,20 +45,20 @@ public class NonTransactionalContext implements TransactionalContext - private final MessageStore _messageStore; + private final TransactionLog _transactionLog; private final StoreContext _storeContext; /** Whether we are in a transaction */ private boolean _inTran; - public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel, + public NonTransactionalContext(TransactionLog transactionLog, StoreContext storeContext, AMQChannel channel, List<RequiredDeliveryException> returnMessages) { _channel = channel; _storeContext = storeContext; _returnMessages = returnMessages; - _messageStore = messageStore; + _transactionLog = transactionLog; } @@ -72,7 +72,7 @@ public class NonTransactionalContext implements TransactionalContext { if (!_inTran) { - _messageStore.beginTran(_storeContext); + _transactionLog.beginTran(_storeContext); _inTran = true; } } @@ -191,7 +191,7 @@ public class NonTransactionalContext implements TransactionalContext } if(_inTran) { - _messageStore.commitTran(_storeContext); + _transactionLog.commitTran(_storeContext); _inTran = false; } } @@ -200,7 +200,7 @@ public class NonTransactionalContext implements TransactionalContext { if (persistent) { - _messageStore.commitTran(_storeContext); + _transactionLog.commitTran(_storeContext); _inTran = false; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java b/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java index 0e4d6c2030..f1fbca2e28 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java @@ -21,8 +21,8 @@ package org.apache.qpid.server.txn; import org.apache.qpid.AMQException; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.transactionlog.TransactionLog; /** * A transactional operation to store messages in an underlying persistent store. When this operation @@ -31,11 +31,11 @@ import org.apache.qpid.server.store.StoreContext; */ public class StoreMessageOperation implements TxnOp { - private final MessageStore _messsageStore; + private final TransactionLog _transactionLog; - public StoreMessageOperation(MessageStore messageStore) + public StoreMessageOperation(TransactionLog transactionLog) { - _messsageStore = messageStore; + _transactionLog = transactionLog; } public void prepare(StoreContext context) throws AMQException @@ -48,11 +48,11 @@ public class StoreMessageOperation implements TxnOp public void commit(StoreContext context) throws AMQException { - _messsageStore.commitTran(context); + _transactionLog.commitTran(context); } public void rollback(StoreContext context) throws AMQException { - _messsageStore.abortTran(context); + _transactionLog.abortTran(context); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index de4c8ac1ff..1497b4adb8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -30,6 +30,8 @@ import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.server.AMQBrokerManagerMBean; +import org.apache.qpid.server.routing.RoutingTable; +import org.apache.qpid.server.transactionlog.TransactionLog; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.connection.ConnectionRegistry; import org.apache.qpid.server.connection.IConnectionRegistry; @@ -45,10 +47,8 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.security.access.Accessable; -import org.apache.qpid.server.security.access.plugins.SimpleXML; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; -import org.apache.qpid.server.store.MessageStore; public class VirtualHost implements Accessable { @@ -65,7 +65,9 @@ public class VirtualHost implements Accessable private ExchangeFactory _exchangeFactory; - private MessageStore _messageStore; + private TransactionLog _transactionLog; + + private RoutingTable _routingTable; protected VirtualHostMBean _virtualHostMBean; @@ -96,6 +98,11 @@ public class VirtualHost implements Accessable return _connectionRegistry; } + public RoutingTable getRoutingTable() + { + return _routingTable; + } + /** * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any * implementaion of an Exchange MBean should extend this class. @@ -128,12 +135,12 @@ public class VirtualHost implements Accessable /** * Used for testing only * @param name - * @param store + * @param transactionLog * @throws Exception */ - public VirtualHost(String name, MessageStore store) throws Exception + public VirtualHost(String name, TransactionLog transactionLog) throws Exception { - this(name, new PropertiesConfiguration(), store); + this(name, new PropertiesConfiguration(), transactionLog); } /** @@ -147,7 +154,7 @@ public class VirtualHost implements Accessable this(name, hostConfig, null); } - public VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception + public VirtualHost(String name, Configuration hostConfig, TransactionLog transactionLog) throws Exception { if (name == null || name.length() == 0) { @@ -166,19 +173,26 @@ public class VirtualHost implements Accessable _exchangeFactory.initialise(hostConfig); _exchangeRegistry = new DefaultExchangeRegistry(this); - if (store != null) + if (transactionLog != null) { - _messageStore = store; + _transactionLog = transactionLog; + if (_transactionLog instanceof RoutingTable) + { + _routingTable = (RoutingTable) _transactionLog; + } } else { if (hostConfig == null) { - throw new IllegalAccessException("HostConfig and MessageStore cannot be null"); + throw new IllegalAccessException("HostConfig and TransactionLog cannot be null"); } - initialiseMessageStore(hostConfig); + initialiseTransactionLog(hostConfig); + initialiseRoutingTable(hostConfig); } + + _exchangeRegistry.initialise(); _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig); @@ -224,23 +238,52 @@ public class VirtualHost implements Accessable period); } } - - private void initialiseMessageStore(Configuration config) throws Exception + + //todo we need to move from store.class to transactionlog.class + private void initialiseTransactionLog(Configuration config) throws Exception { - String messageStoreClass = config.getString("store.class"); + String transactionLogClass = config.getString("store.class"); - Class clazz = Class.forName(messageStoreClass); + Class clazz = Class.forName(transactionLogClass); Object o = clazz.newInstance(); - if (!(o instanceof MessageStore)) + if (!(o instanceof TransactionLog)) { - throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz + + throw new ClassCastException("TransactionLog class must implement " + TransactionLog.class + ". Class " + clazz + " does not."); } - _messageStore = (MessageStore) o; - _messageStore.configure(this, "store", config); + _transactionLog = (TransactionLog) o; + _transactionLog.configure(this, "store", config); } + //todo we need to move from store.class to transactionlog.class + private void initialiseRoutingTable(Configuration config) throws Exception + { + String transactionLogClass = config.getString("routingtable.class"); + + if (transactionLogClass != null) + { + Class clazz = Class.forName(transactionLogClass); + Object o = clazz.newInstance(); + + if (!(o instanceof RoutingTable)) + { + throw new ClassCastException("RoutingTable class must implement " + RoutingTable.class + ". Class " + clazz + + " does not."); + } + _routingTable = (RoutingTable) o; + _routingTable.configure(this, "routingtable", config); + } + else + { + if (_transactionLog instanceof RoutingTable) + { + _routingTable = (RoutingTable)_transactionLog; + } + } + } + + public <T> T getConfiguredObject(Class<T> instanceType, Configuration config) { @@ -284,9 +327,9 @@ public class VirtualHost implements Accessable throw new UnsupportedOperationException(); } - public MessageStore getMessageStore() + public TransactionLog getTransactionLog() { - return _messageStore; + return _transactionLog; } public AuthenticationManager getAuthenticationManager() @@ -320,10 +363,10 @@ public class VirtualHost implements Accessable _houseKeepingTimer.cancel(); } - //Close MessageStore - if (_messageStore != null) + //Close TransactionLog + if (_transactionLog != null) { - _messageStore.close(); + _transactionLog.close(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java index faa7b85d58..a6e060e793 100644 --- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java @@ -302,7 +302,7 @@ public class MessageStoreTool boolean warning = false; for (VirtualHost vhost : vhosts) { - if (vhost.getMessageStore() instanceof MemoryMessageStore) + if (vhost.getTransactionLog() instanceof MemoryMessageStore) { _console.println("WARNING: Virtualhost '" + vhost.getName() + "' is using a MemoryMessageStore. " + "Changes will not persist."); diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 5c54c0b57f..40b08a2e39 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -31,18 +31,17 @@ import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.transactionlog.TransactionLog; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.FailedDequeueException; import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.MessageCleanupException; -import org.apache.qpid.server.queue.MessageFactory; import org.apache.qpid.server.queue.MockProtocolSession; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MemoryMessageStore; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; @@ -64,11 +63,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase protected final Set<TestQueue> queues = new HashSet<TestQueue>(); /** Not used in this test, just there to stub out the routing calls */ - private MessageStore _store = new MemoryMessageStore(); - - private StoreContext _storeContext = new StoreContext(); - - private MessageFactory _messageFactory = MessageFactory.getInstance(); + private TransactionLog _transactionLog = new MemoryMessageStore(); private int count; @@ -107,7 +102,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase protected void route(Message m) throws AMQException { exchange.route(m.getIncomingMessage()); - m.getIncomingMessage().routingComplete(_store); + m.getIncomingMessage().routingComplete(_transactionLog); if (m.getIncomingMessage().allContentReceived()) { m.getIncomingMessage().deliverToQueues(); @@ -382,9 +377,9 @@ public class AbstractHeadersExchangeTestBase extends TestCase static class Message { - private static MessageStore _messageStore = new SkeletonMessageStore(); + private static TransactionLog _transactionLog = new SkeletonMessageStore(); - private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, new StoreContext(), + private static TransactionalContext _txnContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, new LinkedList<RequiredDeliveryException>() ); @@ -395,7 +390,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase MessagePublishInfo mpi = getPublishRequest(id); - IncomingMessage incomming = new IncomingMessage(mpi, _txnContext, new MockProtocolSession(_messageStore), _messageStore); + IncomingMessage incomming = new IncomingMessage(mpi, _txnContext, new MockProtocolSession(_transactionLog), _transactionLog); try { diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java index 396b8c5128..f8544a33bd 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java @@ -27,10 +27,10 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.transactionlog.TransactionLog; import org.apache.qpid.server.protocol.InternalTestProtocolSession; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; @@ -46,17 +46,16 @@ public class DestWildExchangeTest extends TestCase TopicExchange _exchange; VirtualHost _vhost; - MessageStore _store; + TransactionLog _tranasctionLog; StoreContext _context; InternalTestProtocolSession _protocolSession; - public void setUp() throws AMQException { _exchange = new TopicExchange(); _vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next(); - _store = new MemoryMessageStore(); + _tranasctionLog = new MemoryMessageStore(); _context = new StoreContext(); _protocolSession = new InternalTestProtocolSession(); } @@ -75,7 +74,7 @@ public class DestWildExchangeTest extends TestCase MessagePublishInfo info = new MessagePublishInfoImpl(null, false, false, new AMQShortString("a.b")); - IncomingMessage message = new IncomingMessage(info, null, _protocolSession, _store); + IncomingMessage message = new IncomingMessage(info, null, _protocolSession, _tranasctionLog); _exchange.route(message); @@ -497,7 +496,7 @@ public class DestWildExchangeTest extends TestCase throws AMQException { _exchange.route(message); - message.routingComplete(_store); + message.routingComplete(_tranasctionLog); message.deliverToQueues(); } @@ -547,11 +546,11 @@ public class DestWildExchangeTest extends TestCase { MessagePublishInfo info = new MessagePublishInfoImpl(null, false, true, new AMQShortString(s)); - TransactionalContext trancontext = new NonTransactionalContext(_store, _context, null, + TransactionalContext trancontext = new NonTransactionalContext(_tranasctionLog, _context, null, new LinkedList<RequiredDeliveryException>() ); - IncomingMessage message = new IncomingMessage(info, trancontext,_protocolSession, _store); + IncomingMessage message = new IncomingMessage(info, trancontext,_protocolSession, _tranasctionLog); message.setContentHeaderBody( new ContentHeaderBody()); diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java index d5db87350b..f6c307757b 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java @@ -28,11 +28,11 @@ import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.transactionlog.TransactionLog; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; import javax.management.JMException; @@ -45,7 +45,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase /** Used for debugging. */ private static final Logger log = Logger.getLogger(AMQProtocolSessionMBeanTest.class); - private MessageStore _messageStore = new SkeletonMessageStore(); + private TransactionLog _transactionLog = new SkeletonMessageStore(); private AMQMinaProtocolSession _protocolSession; private AMQChannel _channel; private AMQProtocolSessionMBean _mbean; @@ -60,7 +60,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase new AMQShortString("test"), true, _protocolSession.getVirtualHost(), null); - AMQChannel channel = new AMQChannel(_protocolSession,2, _messageStore); + AMQChannel channel = new AMQChannel(_protocolSession,2, _transactionLog); channel.setDefaultQueue(queue); _protocolSession.addChannel(channel); channelCount = _mbean.channels().size(); @@ -71,7 +71,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase assertTrue(_mbean.getMaximumNumberOfChannels() == 1000L); // check APIs - AMQChannel channel3 = new AMQChannel(_protocolSession, 3, _messageStore); + AMQChannel channel3 = new AMQChannel(_protocolSession, 3, _transactionLog); channel3.setLocalTransactional(); _protocolSession.addChannel(channel3); _mbean.rollbackTransactions(2); @@ -91,14 +91,14 @@ public class AMQProtocolSessionMBeanTest extends TestCase } // check if closing of session works - _protocolSession.addChannel(new AMQChannel(_protocolSession, 5, _messageStore)); + _protocolSession.addChannel(new AMQChannel(_protocolSession, 5, _transactionLog)); _mbean.closeConnection(); try { channelCount = _mbean.channels().size(); assertTrue(channelCount == 0); // session is now closed so adding another channel should throw an exception - _protocolSession.addChannel(new AMQChannel(_protocolSession, 6, _messageStore)); + _protocolSession.addChannel(new AMQChannel(_protocolSession, 6, _transactionLog)); fail(); } catch (AMQException ex) @@ -117,7 +117,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase new AMQMinaProtocolSession(new TestIoSession(), appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true), null); _protocolSession.setVirtualHost(appRegistry.getVirtualHostRegistry().getVirtualHost("test")); - _channel = new AMQChannel(_protocolSession, 1, _messageStore); + _channel = new AMQChannel(_protocolSession, 1, _transactionLog); _protocolSession.addChannel(_channel); _mbean = (AMQProtocolSessionMBean) _protocolSession.getManagedObject(); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index be40535df9..1bc50db1d5 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -22,7 +22,6 @@ package org.apache.qpid.server.queue; import junit.framework.TestCase; import org.apache.qpid.AMQException; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -32,6 +31,7 @@ import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.transactionlog.TransactionLog; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.protocol.AMQMinaProtocolSession; @@ -61,9 +61,9 @@ public class AMQQueueAlertTest extends TestCase private AMQQueueMBean _queueMBean; private VirtualHost _virtualHost; private AMQMinaProtocolSession _protocolSession; - private MessageStore _messageStore = new MemoryMessageStore(); + private TransactionLog _transactionLog = new MemoryMessageStore(); private StoreContext _storeContext = new StoreContext(); - private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext, + private TransactionalContext _transactionalContext = new NonTransactionalContext(_transactionLog, _storeContext, null, new LinkedList<RequiredDeliveryException>() ); @@ -186,7 +186,7 @@ public class AMQQueueAlertTest extends TestCase public void testQueueDepthAlertWithSubscribers() throws Exception { _protocolSession = new InternalTestProtocolSession(); - AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore); + AMQChannel channel = new AMQChannel(_protocolSession, 2, _transactionLog); _protocolSession.addChannel(channel); // Create queue @@ -277,7 +277,7 @@ public class AMQQueueAlertTest extends TestCase ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.bodySize = size; // in bytes - IncomingMessage message = new IncomingMessage(publish, _transactionalContext, _protocolSession, _messageStore); + IncomingMessage message = new IncomingMessage(publish, _transactionalContext, _protocolSession, _transactionLog); message.setContentHeaderBody(contentHeaderBody); return message; @@ -308,7 +308,7 @@ public class AMQQueueAlertTest extends TestCase ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(_queue); messages[i].enqueue(qs); - messages[i].routingComplete(_messageStore); + messages[i].routingComplete(_transactionLog); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 97c76c8e9d..daa8e4beb7 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -31,6 +31,7 @@ import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.transactionlog.TransactionLog; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactory; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; @@ -41,7 +42,6 @@ import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; @@ -60,7 +60,7 @@ public class AMQQueueMBeanTest extends TestCase private static long MESSAGE_SIZE = 1000; private AMQQueue _queue; private AMQQueueMBean _queueMBean; - private MessageStore _messageStore; + private TransactionLog _transactionLog; private StoreContext _storeContext = new StoreContext(); private TransactionalContext _transactionalContext; private VirtualHost _virtualHost; @@ -113,7 +113,7 @@ public class AMQQueueMBeanTest extends TestCase private void verifyBrokerState() { - TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore()); + TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getTransactionLog()); // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up. assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap()); @@ -130,7 +130,7 @@ public class AMQQueueMBeanTest extends TestCase InternalTestProtocolSession protocolSession = new InternalTestProtocolSession(); - AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore); + AMQChannel channel = new AMQChannel(protocolSession, 1, _transactionLog); protocolSession.addChannel(channel); Subscription subscription = @@ -221,7 +221,7 @@ public class AMQQueueMBeanTest extends TestCase ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(_queue); msg.enqueue(qs); - msg.routingComplete(_messageStore); + msg.routingComplete(_transactionLog); long id = msg.getMessageId(); @@ -266,7 +266,7 @@ public class AMQQueueMBeanTest extends TestCase contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes contentHeaderBody.properties = new BasicContentHeaderProperties(); ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1)); - IncomingMessage msg = new IncomingMessage(publish, _transactionalContext, _protocolSession, _messageStore); + IncomingMessage msg = new IncomingMessage(publish, _transactionalContext, _protocolSession, _transactionLog); msg.setContentHeaderBody(contentHeaderBody); return msg; @@ -278,9 +278,9 @@ public class AMQQueueMBeanTest extends TestCase super.setUp(); IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(1); _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); - _messageStore = _virtualHost.getMessageStore(); + _transactionLog = _virtualHost.getTransactionLog(); - _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext, + _transactionalContext = new NonTransactionalContext(_transactionLog, _storeContext, null, new LinkedList<RequiredDeliveryException>() ); @@ -307,7 +307,7 @@ public class AMQQueueMBeanTest extends TestCase currentMessage.enqueue(qs); // route header - currentMessage.routingComplete(_messageStore); + currentMessage.routingComplete(_transactionLog); // Add the body so we have somthing to test later currentMessage.addContentBodyFrame( diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java index 3633481012..2a51f42e4e 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockPersistentAMQMessage.java @@ -21,13 +21,13 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; -import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.transactionlog.TransactionLog; public class MockPersistentAMQMessage extends PersistentAMQMessage { - public MockPersistentAMQMessage(long messageId, MessageStore store) + public MockPersistentAMQMessage(long messageId, TransactionLog transactionLog) throws AMQException { - super(messageId, store); + super(messageId, transactionLog); } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java index 99c88fac3e..0cc33bf102 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -24,11 +24,11 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.framing.*; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.transactionlog.TransactionLog; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.transport.Sender; import javax.security.sasl.SaslServer; @@ -41,13 +41,11 @@ import java.security.Principal; */ public class MockProtocolSession implements AMQProtocolSession { - private MessageStore _messageStore; private Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>(); - public MockProtocolSession(MessageStore messageStore) + public MockProtocolSession(TransactionLog transactionLog) { - _messageStore = messageStore; } public void dataBlockReceived(AMQDataBlock message) throws Exception diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java b/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java index 1e47f764df..bcbd83cde3 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java @@ -73,6 +73,7 @@ public class PrincipalPermissionsTest extends TestCase } catch (Exception e) { + e.printStackTrace(); fail(e.getMessage()); } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 36f640a325..7722eae116 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -30,7 +30,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.queue.MessageFactory; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQPriorityQueue; import org.apache.qpid.server.queue.SimpleAMQQueue; @@ -39,6 +38,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.protocol.InternalTestProtocolSession; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.transactionlog.TransactionLog; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.ContentHeaderBody; @@ -151,7 +151,7 @@ public class MessageStoreTest extends TestCase //Load the Virtualhost with the required MessageStore reload(configuration); - MessageStore messageStore = _virtualHost.getMessageStore(); + TransactionLog transactionLog = _virtualHost.getTransactionLog(); createAllQueues(); createAllTopicQueues(); @@ -190,9 +190,9 @@ public class MessageStoreTest extends TestCase assertEquals("Not all queues correctly registered", 8, _virtualHost.getQueueRegistry().getQueues().size()); - if (!messageStore.isPersistent()) + if (!transactionLog.isPersistent()) { - _logger.warn("Unable to test Persistent capabilities of messages store(" + messageStore.getClass() + ") as it is not capable of peristence."); + _logger.warn("Unable to test Persistent capabilities of messages store(" + transactionLog.getClass() + ") as it is not capable of peristence."); return; } @@ -348,10 +348,10 @@ public class MessageStoreTest extends TestCase try { currentMessage = new IncomingMessage(messageInfo, - new NonTransactionalContext(_virtualHost.getMessageStore(), + new NonTransactionalContext(_virtualHost.getTransactionLog(), new StoreContext(), null, null), new InternalTestProtocolSession(), - _virtualHost.getMessageStore()); + _virtualHost.getTransactionLog()); } catch (AMQException e) { @@ -388,7 +388,7 @@ public class MessageStoreTest extends TestCase try { - currentMessage.routingComplete(_virtualHost.getMessageStore()); + currentMessage.routingComplete(_virtualHost.getTransactionLog()); } catch (AMQException e) { @@ -486,7 +486,7 @@ public class MessageStoreTest extends TestCase if (queue.isDurable() && !queue.isAutoDelete()) { - _virtualHost.getMessageStore().createQueue(queue, queueArguments); + _virtualHost.getRoutingTable().createQueue(queue, queueArguments); } } catch (AMQException e) diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java index f08a15a8a7..a5b65b527c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -29,6 +29,8 @@ import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.routing.RoutingTable; import java.util.List; import java.util.concurrent.atomic.AtomicLong; @@ -37,7 +39,7 @@ import java.util.concurrent.atomic.AtomicLong; * A message store that does nothing. Designed to be used in tests that do not want to use any message store * functionality. */ -public class SkeletonMessageStore implements MessageStore +public class SkeletonMessageStore implements TransactionLog , RoutingTable { private final AtomicLong _messageId = new AtomicLong(1); diff --git a/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java index 84d3d313d1..ca6644d141 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java @@ -22,9 +22,9 @@ package org.apache.qpid.server.txn; import junit.framework.TestCase; import org.apache.qpid.AMQException; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.transactionlog.TransactionLog; import java.util.LinkedList; import java.util.NoSuchElementException; @@ -283,13 +283,13 @@ public class TxnBufferTest extends TestCase class TxnTester extends NullOp { - private final MessageStore store; + private final TransactionLog store; private final StoreContext context = new StoreContext(); - TxnTester(MessageStore store) + TxnTester(TransactionLog transactionLog) { - this.store = store; + this.store = transactionLog; } public void prepare() throws AMQException diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java index 9e24f13127..d150faf94a 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -30,8 +30,8 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.InternalTestProtocolSession; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.ConsumerTagNotUniqueException; +import org.apache.qpid.server.transactionlog.TransactionLog; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.framing.AMQShortString; @@ -47,7 +47,7 @@ import org.apache.qpid.exchange.ExchangeDefaults; public class InternalBrokerBaseCase extends TestCase { protected IApplicationRegistry _registry; - protected MessageStore _messageStore; + protected TransactionLog _transactionLog; protected MockChannel _channel; protected InternalTestProtocolSession _session; protected VirtualHost _virtualHost; @@ -62,7 +62,7 @@ public class InternalBrokerBaseCase extends TestCase ApplicationRegistry.initialise(_registry); _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test"); - _messageStore = _virtualHost.getMessageStore(); + _transactionLog = _virtualHost.getTransactionLog(); QUEUE_NAME = new AMQShortString("test"); _queue = AMQQueueFactory.createAMQQueueImpl(QUEUE_NAME, false, new AMQShortString("testowner"), @@ -78,7 +78,7 @@ public class InternalBrokerBaseCase extends TestCase _session.setVirtualHost(_virtualHost); - _channel = new MockChannel(_session, 1, _messageStore); + _channel = new MockChannel(_session, 1, _transactionLog); _session.addChannel(_channel); } @@ -91,7 +91,7 @@ public class InternalBrokerBaseCase extends TestCase protected void checkStoreContents(int messageCount) { - assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageMetaDataMap().size()); + assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _transactionLog).getMessageMetaDataMap().size()); //The above publish message is sufficiently small not to fit in the header so no Body is required. //assertEquals("Message body count incorrect in the ContentBodyMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getContentBodyMap().size()); diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java index b6d42e6068..2605ed0d11 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java +++ b/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java @@ -27,14 +27,13 @@ import org.apache.qpid.server.management.NoopManagedObjectRegistry; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.access.ACLManager; -import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.security.access.plugins.AllowAll; import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.transactionlog.TransactionLog; import java.util.Collection; import java.util.HashMap; @@ -49,11 +48,10 @@ public class TestApplicationRegistry extends ApplicationRegistry private ExchangeFactory _exchangeFactory; - private MessageStore _messageStore; + private TransactionLog _transactionLog; private VirtualHost _vHost; - public TestApplicationRegistry() { super(new MapConfiguration(new HashMap())); @@ -73,11 +71,11 @@ public class TestApplicationRegistry extends ApplicationRegistry _managedObjectRegistry = new NoopManagedObjectRegistry(); - _messageStore = new TestableMemoryMessageStore(); + _transactionLog = new TestableMemoryMessageStore(); _virtualHostRegistry = new VirtualHostRegistry(); - _vHost = new VirtualHost("test", _messageStore); + _vHost = new VirtualHost("test", _transactionLog); _virtualHostRegistry.registerVirtualHost(_vHost); @@ -114,9 +112,9 @@ public class TestApplicationRegistry extends ApplicationRegistry _accessManager = newManager; } - public MessageStore getMessageStore() + public TransactionLog getTransactionLog() { - return _messageStore; + return _transactionLog; } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java new file mode 100644 index 0000000000..ba19fd5d5e --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualhostInitRoutingTableFromTransactionLogTest.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import junit.framework.TestCase; +import org.apache.commons.configuration.PropertiesConfiguration; + +public class VirtualhostInitRoutingTableFromTransactionLogTest extends TestCase +{ + public void test() + { + PropertiesConfiguration env = new PropertiesConfiguration(); + + + env.addProperty("store.class", "org.apache.qpid.server.store.MemoryMessageStore"); + + VirtualHost _virtualHost = null; + try + { + _virtualHost = new VirtualHost("test", env); + + assertNotNull(_virtualHost.getTransactionLog()); + assertNotNull(_virtualHost.getRoutingTable()); + assertEquals(_virtualHost.getTransactionLog(),_virtualHost.getRoutingTable()); + } + catch (Exception e) + { + fail(e.getMessage()); + } + + + } + +} diff --git a/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java b/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java index 447d09429d..77767463ea 100644 --- a/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java +++ b/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java @@ -21,18 +21,18 @@ package org.apache.qpid.util; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.transactionlog.TransactionLog; import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; public class MockChannel extends AMQChannel { - public MockChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) + public MockChannel(AMQProtocolSession session, int channelId, TransactionLog transactionLog) throws AMQException { - super(session, channelId, messageStore); + super(session, channelId, transactionLog); } public Subscription getSubscription(AMQShortString subscription) diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index dd71b2d0da..256491194d 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -30,18 +30,21 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.routing.RoutingTable; import java.util.HashMap; import java.util.Iterator; -public class SlowMessageStore implements MessageStore +public class SlowMessageStore implements TransactionLog, RoutingTable { private static final Logger _logger = Logger.getLogger(SlowMessageStore.class); private static final String DELAYS = "delays"; private HashMap<String, Long> _preDelays = new HashMap<String, Long>(); private HashMap<String, Long> _postDelays = new HashMap<String, Long>(); private long _defaultDelay = 0L; - private MessageStore _realStore = new MemoryMessageStore(); + private TransactionLog _realTransactionLog = new MemoryMessageStore(); + private RoutingTable _realRoutingTable = (RoutingTable)_realTransactionLog; private static final String PRE = "pre"; private static final String POST = "post"; private String DEFAULT_DELAY = "default"; @@ -53,30 +56,30 @@ public class SlowMessageStore implements MessageStore configureDelays(delays); - String messageStoreClass = config.getString(base + ".store.class"); + String transactionLogClass = config.getString(base + ".store.class"); if (delays.containsKey(DEFAULT_DELAY)) { _defaultDelay = delays.getLong(DEFAULT_DELAY); } - if (messageStoreClass != null) + if (transactionLogClass != null) { - Class clazz = Class.forName(messageStoreClass); + Class clazz = Class.forName(transactionLogClass); Object o = clazz.newInstance(); - if (!(o instanceof MessageStore)) + if (!(o instanceof TransactionLog)) { - throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz + + throw new ClassCastException("TransactionLog class must implement " + TransactionLog.class + ". Class " + clazz + " does not."); } - _realStore = (MessageStore) o; - _realStore.configure(virtualHost, base + ".store", config); + _realTransactionLog = (TransactionLog) o; + _realTransactionLog.configure(virtualHost, base , config); } else { - _realStore.configure(virtualHost, base + ".store", config); + _realTransactionLog.configure(virtualHost, base , config); } } @@ -150,42 +153,42 @@ public class SlowMessageStore implements MessageStore public void close() throws Exception { doPreDelay("close"); - _realStore.close(); + _realTransactionLog.close(); doPostDelay("close"); } public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException { doPreDelay("removeMessage"); - _realStore.removeMessage(storeContext, messageId); + _realTransactionLog.removeMessage(storeContext, messageId); doPostDelay("removeMessage"); } public void createExchange(Exchange exchange) throws AMQException { doPreDelay("createExchange"); - _realStore.createExchange(exchange); + _realRoutingTable.createExchange(exchange); doPostDelay("createExchange"); } public void removeExchange(Exchange exchange) throws AMQException { doPreDelay("removeExchange"); - _realStore.removeExchange(exchange); + _realRoutingTable.removeExchange(exchange); doPostDelay("removeExchange"); } public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException { doPreDelay("bindQueue"); - _realStore.bindQueue(exchange, routingKey, queue, args); + _realRoutingTable.bindQueue(exchange, routingKey, queue, args); doPostDelay("bindQueue"); } public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException { doPreDelay("unbindQueue"); - _realStore.unbindQueue(exchange, routingKey, queue, args); + _realRoutingTable.unbindQueue(exchange, routingKey, queue, args); doPostDelay("unbindQueue"); } @@ -197,56 +200,56 @@ public class SlowMessageStore implements MessageStore public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException { doPreDelay("createQueue"); - _realStore.createQueue(queue, arguments); + _realRoutingTable.createQueue(queue, arguments); doPostDelay("createQueue"); } public void removeQueue(AMQQueue queue) throws AMQException { doPreDelay("removeQueue"); - _realStore.removeQueue(queue); + _realRoutingTable.removeQueue(queue); doPostDelay("removeQueue"); } public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException { doPreDelay("enqueueMessage"); - _realStore.enqueueMessage(context, queue, messageId); + _realTransactionLog.enqueueMessage(context, queue, messageId); doPostDelay("enqueueMessage"); } public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException { doPreDelay("dequeueMessage"); - _realStore.dequeueMessage(context, queue, messageId); + _realTransactionLog.dequeueMessage(context, queue, messageId); doPostDelay("dequeueMessage"); } public void beginTran(StoreContext context) throws AMQException { doPreDelay("beginTran"); - _realStore.beginTran(context); + _realTransactionLog.beginTran(context); doPostDelay("beginTran"); } public void commitTran(StoreContext context) throws AMQException { doPreDelay("commitTran"); - _realStore.commitTran(context); + _realTransactionLog.commitTran(context); doPostDelay("commitTran"); } public void abortTran(StoreContext context) throws AMQException { doPreDelay("abortTran"); - _realStore.abortTran(context); + _realTransactionLog.abortTran(context); doPostDelay("abortTran"); } public boolean inTran(StoreContext context) { doPreDelay("inTran"); - boolean b = _realStore.inTran(context); + boolean b = _realTransactionLog.inTran(context); doPostDelay("inTran"); return b; } @@ -254,21 +257,21 @@ public class SlowMessageStore implements MessageStore public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException { doPreDelay("storeContentBodyChunk"); - _realStore.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody); + _realTransactionLog.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody); doPostDelay("storeContentBodyChunk"); } public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException { doPreDelay("storeMessageMetaData"); - _realStore.storeMessageMetaData(context, messageId, messageMetaData); + _realTransactionLog.storeMessageMetaData(context, messageId, messageMetaData); doPostDelay("storeMessageMetaData"); } public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException { doPreDelay("getMessageMetaData"); - MessageMetaData mmd = _realStore.getMessageMetaData(context, messageId); + MessageMetaData mmd = _realTransactionLog.getMessageMetaData(context, messageId); doPostDelay("getMessageMetaData"); return mmd; } @@ -276,14 +279,14 @@ public class SlowMessageStore implements MessageStore public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException { doPreDelay("getContentBodyChunk"); - ContentChunk c = _realStore.getContentBodyChunk(context, messageId, index); + ContentChunk c = _realTransactionLog.getContentBodyChunk(context, messageId, index); doPostDelay("getContentBodyChunk"); return c; } public boolean isPersistent() { - return _realStore.isPersistent(); + return _realTransactionLog.isPersistent(); } } |