summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java166
1 files changed, 65 insertions, 101 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 587c85fc12..e3e9432e6b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -20,16 +20,20 @@
*/
package org.apache.qpid.server.store;
-import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.commons.configuration.Configuration;
import java.util.ArrayList;
import java.util.Collections;
@@ -38,6 +42,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.nio.ByteBuffer;
/** A simple message store that stores the messages in a threadsafe structure in memory. */
public class MemoryMessageStore implements MessageStore
@@ -48,59 +53,74 @@ public class MemoryMessageStore implements MessageStore
private static final String HASHTABLE_CAPACITY_CONFIG = "hashtable-capacity";
- protected ConcurrentMap<Long, MessageMetaData> _metaDataMap;
-
- protected ConcurrentMap<Long, List<ContentChunk>> _contentBodyMap;
private final AtomicLong _messageId = new AtomicLong(1);
private AtomicBoolean _closed = new AtomicBoolean(false);
+ private LogSubject _logSubject;
- public void configure()
+ private static final Transaction IN_MEMORY_TRANSACTION = new Transaction()
{
- _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash tables");
- _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(DEFAULT_HASHTABLE_CAPACITY);
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(DEFAULT_HASHTABLE_CAPACITY);
- }
+ public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ {
+ }
+
+ public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ {
+ }
+
+ public void commitTran() throws AMQException
+ {
+ }
+
+ public StoreFuture commitTranAsync() throws AMQException
+ {
+ return IMMEDIATE_FUTURE;
+ }
+
+ public void abortTran() throws AMQException
+ {
+ }
- public void configure(String base, Configuration config)
+ };
+
+ public void configureConfigStore(String name, ConfigurationRecoveryHandler handler, Configuration configuration, LogSubject logSubject) throws Exception
{
- int hashtableCapacity = config.getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
- _log.info("Using capacity " + hashtableCapacity + " for hash tables");
- _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity);
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity);
+ _logSubject = logSubject;
+ CurrentActor.get().message(_logSubject, ConfigStoreMessages.CFG_1001(this.getClass().getName()));
+
+
}
- public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
+ public void configureMessageStore(String name,
+ MessageStoreRecoveryHandler recoveryHandler,
+ Configuration config,
+ LogSubject logSubject) throws Exception
{
- configure(base, config);
+ if(_logSubject == null)
+ {
+ _logSubject = logSubject;
+ }
+ int hashtableCapacity = config.getInt(name + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
+ _log.info("Using capacity " + hashtableCapacity + " for hash tables");
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_CREATED(this.getClass().getName()));
}
public void close() throws Exception
{
_closed.getAndSet(true);
- if (_metaDataMap != null)
- {
- _metaDataMap.clear();
- _metaDataMap = null;
- }
- if (_contentBodyMap != null)
- {
- _contentBodyMap.clear();
- _contentBodyMap = null;
- }
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_CLOSED());
+
}
- public void removeMessage(StoreContext context, Long messageId) throws AMQException
+ public StoredMessage addMessage(StorableMessageMetaData metaData)
{
- checkNotClosed();
- if (_log.isDebugEnabled())
- {
- _log.debug("Removing message with id " + messageId);
- }
- _metaDataMap.remove(messageId);
- _contentBodyMap.remove(messageId);
+ final long id = _messageId.getAndIncrement();
+ StoredMemoryMessage message = new StoredMemoryMessage(id, metaData);
+
+ return message;
}
+
public void createExchange(Exchange exchange) throws AMQException
{
@@ -137,35 +157,19 @@ public class MemoryMessageStore implements MessageStore
// Not required to do anything
}
- public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public void configureTransactionLog(String name,
+ TransactionLogRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration,
+ LogSubject logSubject) throws Exception
{
- // Not required to do anything
+ //To change body of implemented methods use File | Settings | File Templates.
}
- public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public Transaction newTransaction()
{
- // Not required to do anything
+ return IN_MEMORY_TRANSACTION;
}
- public void beginTran(StoreContext context) throws AMQException
- {
- // Not required to do anything
- }
-
- public void commitTran(StoreContext context) throws AMQException
- {
- // Not required to do anything
- }
-
- public void abortTran(StoreContext context) throws AMQException
- {
- // Not required to do anything
- }
-
- public boolean inTran(StoreContext context)
- {
- return false;
- }
public List<AMQQueue> createQueues() throws AMQException
{
@@ -177,48 +181,6 @@ public class MemoryMessageStore implements MessageStore
return _messageId.getAndIncrement();
}
- public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody)
- throws AMQException
- {
- checkNotClosed();
- List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
-
- if (bodyList == null && lastContentBody)
- {
- _contentBodyMap.put(messageId, Collections.singletonList(contentBody));
- }
- else
- {
- if (bodyList == null)
- {
- bodyList = new ArrayList<ContentChunk>();
- _contentBodyMap.put(messageId, bodyList);
- }
-
- bodyList.add(index, contentBody);
- }
- }
-
- public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData)
- throws AMQException
- {
- checkNotClosed();
- _metaDataMap.put(messageId, messageMetaData);
- }
-
- public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
- {
- checkNotClosed();
- return _metaDataMap.get(messageId);
- }
-
- public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
- {
- checkNotClosed();
- List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
- return bodyList.get(index);
- }
-
public boolean isPersistent()
{
return false;
@@ -231,4 +193,6 @@ public class MemoryMessageStore implements MessageStore
throw new MessageStoreClosedException();
}
}
+
+
}