diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java | 166 |
1 files changed, 65 insertions, 101 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 587c85fc12..e3e9432e6b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -20,16 +20,20 @@ */ package org.apache.qpid.server.store; -import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; +import org.apache.qpid.server.logging.messages.ConfigStoreMessages; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.commons.configuration.Configuration; import java.util.ArrayList; import java.util.Collections; @@ -38,6 +42,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.nio.ByteBuffer; /** A simple message store that stores the messages in a threadsafe structure in memory. */ public class MemoryMessageStore implements MessageStore @@ -48,59 +53,74 @@ public class MemoryMessageStore implements MessageStore private static final String HASHTABLE_CAPACITY_CONFIG = "hashtable-capacity"; - protected ConcurrentMap<Long, MessageMetaData> _metaDataMap; - - protected ConcurrentMap<Long, List<ContentChunk>> _contentBodyMap; private final AtomicLong _messageId = new AtomicLong(1); private AtomicBoolean _closed = new AtomicBoolean(false); + private LogSubject _logSubject; - public void configure() + private static final Transaction IN_MEMORY_TRANSACTION = new Transaction() { - _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash tables"); - _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(DEFAULT_HASHTABLE_CAPACITY); - _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(DEFAULT_HASHTABLE_CAPACITY); - } + public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException + { + } + + public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException + { + } + + public void commitTran() throws AMQException + { + } + + public StoreFuture commitTranAsync() throws AMQException + { + return IMMEDIATE_FUTURE; + } + + public void abortTran() throws AMQException + { + } - public void configure(String base, Configuration config) + }; + + public void configureConfigStore(String name, ConfigurationRecoveryHandler handler, Configuration configuration, LogSubject logSubject) throws Exception { - int hashtableCapacity = config.getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY); - _log.info("Using capacity " + hashtableCapacity + " for hash tables"); - _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity); - _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity); + _logSubject = logSubject; + CurrentActor.get().message(_logSubject, ConfigStoreMessages.CFG_1001(this.getClass().getName())); + + } - public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception + public void configureMessageStore(String name, + MessageStoreRecoveryHandler recoveryHandler, + Configuration config, + LogSubject logSubject) throws Exception { - configure(base, config); + if(_logSubject == null) + { + _logSubject = logSubject; + } + int hashtableCapacity = config.getInt(name + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY); + _log.info("Using capacity " + hashtableCapacity + " for hash tables"); + CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_CREATED(this.getClass().getName())); } public void close() throws Exception { _closed.getAndSet(true); - if (_metaDataMap != null) - { - _metaDataMap.clear(); - _metaDataMap = null; - } - if (_contentBodyMap != null) - { - _contentBodyMap.clear(); - _contentBodyMap = null; - } + CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_CLOSED()); + } - public void removeMessage(StoreContext context, Long messageId) throws AMQException + public StoredMessage addMessage(StorableMessageMetaData metaData) { - checkNotClosed(); - if (_log.isDebugEnabled()) - { - _log.debug("Removing message with id " + messageId); - } - _metaDataMap.remove(messageId); - _contentBodyMap.remove(messageId); + final long id = _messageId.getAndIncrement(); + StoredMemoryMessage message = new StoredMemoryMessage(id, metaData); + + return message; } + public void createExchange(Exchange exchange) throws AMQException { @@ -137,35 +157,19 @@ public class MemoryMessageStore implements MessageStore // Not required to do anything } - public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + public void configureTransactionLog(String name, + TransactionLogRecoveryHandler recoveryHandler, + Configuration storeConfiguration, + LogSubject logSubject) throws Exception { - // Not required to do anything + //To change body of implemented methods use File | Settings | File Templates. } - public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + public Transaction newTransaction() { - // Not required to do anything + return IN_MEMORY_TRANSACTION; } - public void beginTran(StoreContext context) throws AMQException - { - // Not required to do anything - } - - public void commitTran(StoreContext context) throws AMQException - { - // Not required to do anything - } - - public void abortTran(StoreContext context) throws AMQException - { - // Not required to do anything - } - - public boolean inTran(StoreContext context) - { - return false; - } public List<AMQQueue> createQueues() throws AMQException { @@ -177,48 +181,6 @@ public class MemoryMessageStore implements MessageStore return _messageId.getAndIncrement(); } - public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) - throws AMQException - { - checkNotClosed(); - List<ContentChunk> bodyList = _contentBodyMap.get(messageId); - - if (bodyList == null && lastContentBody) - { - _contentBodyMap.put(messageId, Collections.singletonList(contentBody)); - } - else - { - if (bodyList == null) - { - bodyList = new ArrayList<ContentChunk>(); - _contentBodyMap.put(messageId, bodyList); - } - - bodyList.add(index, contentBody); - } - } - - public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) - throws AMQException - { - checkNotClosed(); - _metaDataMap.put(messageId, messageMetaData); - } - - public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException - { - checkNotClosed(); - return _metaDataMap.get(messageId); - } - - public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException - { - checkNotClosed(); - List<ContentChunk> bodyList = _contentBodyMap.get(messageId); - return bodyList.get(index); - } - public boolean isPersistent() { return false; @@ -231,4 +193,6 @@ public class MemoryMessageStore implements MessageStore throw new MessageStoreClosedException(); } } + + } |