From 1e568c49bae7bfc03fa89e34dac8ac97ff2a54bb Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Tue, 14 Apr 2009 15:54:16 +0000 Subject: QPID-1807 : Add 0.5-fix broker and update SlowMessageStore to use MessageStores rather than TransactionLogs git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@764850 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/server/store/SlowMessageStore.java | 132 ++++++++++----------- 1 file changed, 66 insertions(+), 66 deletions(-) (limited to 'java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java') diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index 662f04b3c9..7d8c81f4d5 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -31,62 +31,54 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; -import org.apache.qpid.server.transactionlog.TransactionLog; -import org.apache.qpid.server.routing.RoutingTable; import java.util.HashMap; import java.util.Iterator; -import java.util.ArrayList; -public class SlowMessageStore implements TransactionLog, RoutingTable +public class SlowMessageStore implements MessageStore { private static final Logger _logger = Logger.getLogger(SlowMessageStore.class); private static final String DELAYS = "delays"; private HashMap _preDelays = new HashMap(); private HashMap _postDelays = new HashMap(); private long _defaultDelay = 0L; - private TransactionLog _realTransactionLog = new MemoryMessageStore(); - private RoutingTable _realRoutingTable = (RoutingTable)_realTransactionLog; + private MessageStore _realStore = new MemoryMessageStore(); private static final String PRE = "pre"; private static final String POST = "post"; private String DEFAULT_DELAY = "default"; - public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception + public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception { - _logger.warn("Starting SlowMessageStore on Virtualhost:" + virtualHost.getName()); + _logger.info("Starting SlowMessageStore on Virtualhost:" + virtualHost.getName()); Configuration delays = config.getStoreConfiguration().subset(DELAYS); configureDelays(delays); - String transactionLogClass = config.getTransactionLogClass(); + String messageStoreClass = config.getStoreConfiguration().getString("realStore"); if (delays.containsKey(DEFAULT_DELAY)) { _defaultDelay = delays.getLong(DEFAULT_DELAY); - _logger.warn("Delay is:" + _defaultDelay); } - if (transactionLogClass != null) + if (messageStoreClass != null) { - Class clazz = Class.forName(transactionLogClass); - if (clazz != this.getClass()) - { + Class clazz = Class.forName(messageStoreClass); - Object o = clazz.newInstance(); + Object o = clazz.newInstance(); - if (!(o instanceof TransactionLog)) - { - throw new ClassCastException("TransactionLog class must implement " + TransactionLog.class + ". Class " + clazz + - " does not."); - } - _realTransactionLog = (TransactionLog) o; + if (!(o instanceof MessageStore)) + { + throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz + + " does not."); } + _realStore = (MessageStore) o; + _realStore.configure(virtualHost, base + ".store", config); + } + else + { + _realStore.configure(virtualHost, base + ".store", config); } - - // The call to configure may return a new transaction log - _realTransactionLog = (TransactionLog) _realTransactionLog.configure(virtualHost, base , config); - - return this; } private void configureDelays(Configuration config) @@ -159,35 +151,42 @@ public class SlowMessageStore implements TransactionLog, RoutingTable public void close() throws Exception { doPreDelay("close"); - _realTransactionLog.close(); + _realStore.close(); doPostDelay("close"); } + public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException + { + doPreDelay("removeMessage"); + _realStore.removeMessage(storeContext, messageId); + doPostDelay("removeMessage"); + } + public void createExchange(Exchange exchange) throws AMQException { doPreDelay("createExchange"); - _realRoutingTable.createExchange(exchange); + _realStore.createExchange(exchange); doPostDelay("createExchange"); } public void removeExchange(Exchange exchange) throws AMQException { doPreDelay("removeExchange"); - _realRoutingTable.removeExchange(exchange); + _realStore.removeExchange(exchange); doPostDelay("removeExchange"); } public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException { doPreDelay("bindQueue"); - _realRoutingTable.bindQueue(exchange, routingKey, queue, args); + _realStore.bindQueue(exchange, routingKey, queue, args); doPostDelay("bindQueue"); } public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException { doPreDelay("unbindQueue"); - _realRoutingTable.unbindQueue(exchange, routingKey, queue, args); + _realStore.unbindQueue(exchange, routingKey, queue, args); doPostDelay("unbindQueue"); } @@ -199,100 +198,101 @@ public class SlowMessageStore implements TransactionLog, RoutingTable public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException { doPreDelay("createQueue"); - _realRoutingTable.createQueue(queue, arguments); + _realStore.createQueue(queue, arguments); doPostDelay("createQueue"); } public void removeQueue(AMQQueue queue) throws AMQException { doPreDelay("removeQueue"); - _realRoutingTable.removeQueue(queue); + _realStore.removeQueue(queue); doPostDelay("removeQueue"); } - public void enqueueMessage(StoreContext context, ArrayList queues, Long messageId) throws AMQException + public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException { doPreDelay("enqueueMessage"); - _realTransactionLog.enqueueMessage(context, queues, messageId); + _realStore.enqueueMessage(context, queue, messageId); doPostDelay("enqueueMessage"); } public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException { doPreDelay("dequeueMessage"); - _realTransactionLog.dequeueMessage(context, queue, messageId); - doPostDelay("dequeueMessage"); - } - - public void removeMessage(StoreContext context, Long messageId) throws AMQException - { - doPreDelay("dequeueMessage"); - _realTransactionLog.removeMessage(context, messageId); + _realStore.dequeueMessage(context, queue, messageId); doPostDelay("dequeueMessage"); } public void beginTran(StoreContext context) throws AMQException { doPreDelay("beginTran"); - _realTransactionLog.beginTran(context); + _realStore.beginTran(context); doPostDelay("beginTran"); } public void commitTran(StoreContext context) throws AMQException { doPreDelay("commitTran"); - _realTransactionLog.commitTran(context); + _realStore.commitTran(context); doPostDelay("commitTran"); } public void abortTran(StoreContext context) throws AMQException { doPreDelay("abortTran"); - _realTransactionLog.abortTran(context); + _realStore.abortTran(context); doPostDelay("abortTran"); } public boolean inTran(StoreContext context) { doPreDelay("inTran"); - boolean b = _realTransactionLog.inTran(context); + boolean b = _realStore.inTran(context); doPostDelay("inTran"); return b; } + public Long getNewMessageId() + { + doPreDelay("getNewMessageId"); + Long l = _realStore.getNewMessageId(); + doPostDelay("getNewMessageId"); + return l; + } + public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException { doPreDelay("storeContentBodyChunk"); - _realTransactionLog.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody); + _realStore.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody); doPostDelay("storeContentBodyChunk"); } public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException { doPreDelay("storeMessageMetaData"); - _realTransactionLog.storeMessageMetaData(context, messageId, messageMetaData); + _realStore.storeMessageMetaData(context, messageId, messageMetaData); doPostDelay("storeMessageMetaData"); } -// public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException -// { -// doPreDelay("getMessageMetaData"); -// MessageMetaData mmd = _realTransactionLog.getMessageMetaData(context, messageId); -// doPostDelay("getMessageMetaData"); -// return mmd; -// } -// -// public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException -// { -// doPreDelay("getContentBodyChunk"); -// ContentChunk c = _realTransactionLog.getContentBodyChunk(context, messageId, index); -// doPostDelay("getContentBodyChunk"); -// return c; -// } -// + public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException + { + doPreDelay("getMessageMetaData"); + MessageMetaData mmd = _realStore.getMessageMetaData(context, messageId); + doPostDelay("getMessageMetaData"); + return mmd; + } + + public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException + { + doPreDelay("getContentBodyChunk"); + ContentChunk c = _realStore.getContentBodyChunk(context, messageId, index); + doPostDelay("getContentBodyChunk"); + return c; + } + public boolean isPersistent() { - return _realTransactionLog.isPersistent(); + return _realStore.isPersistent(); } } -- cgit v1.2.1