summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java297
1 files changed, 284 insertions, 13 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
index a7e9ef2ab6..99785c48a9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
@@ -20,17 +20,36 @@
*/
package org.apache.qpid.server.store;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
/** A simple message store that stores the messages in a thread-safe structure in memory. */
-abstract public class AbstractMemoryMessageStore extends NullMessageStore
+abstract class AbstractMemoryMessageStore implements MessageStore, DurableConfigurationStore
{
- private final AtomicLong _messageId = new AtomicLong(1);
-
- private static final Transaction IN_MEMORY_TRANSACTION = new Transaction()
+ private final class MemoryMessageStoreTransaction implements Transaction
{
+ private Map<UUID, Set<Long>> _localEnqueueMap = new HashMap<UUID, Set<Long>>();
+ private Map<UUID, Set<Long>> _localDequeueMap = new HashMap<UUID, Set<Long>>();
+
+ private Map<Xid, DistributedTransactionRecords> _localDistributedTransactionsRecords = new HashMap<Xid, DistributedTransactionRecords>();
+ private Set<Xid> _localDistributedTransactionsRemoves = new HashSet<Xid>();
+
@Override
public StoreFuture commitTranAsync()
{
@@ -40,50 +59,145 @@ abstract public class AbstractMemoryMessageStore extends NullMessageStore
@Override
public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
+ Set<Long> messageIds = _localEnqueueMap.get(queue.getId());
+ if (messageIds == null)
+ {
+ messageIds = new HashSet<Long>();
+ _localEnqueueMap.put(queue.getId(), messageIds);
+ }
+ messageIds.add(message.getMessageNumber());
}
@Override
- public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
+ public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
+ Set<Long> messageIds = _localDequeueMap.get(queue.getId());
+ if (messageIds == null)
+ {
+ messageIds = new HashSet<Long>();
+ _localDequeueMap.put(queue.getId(), messageIds);
+ }
+ messageIds.add(message.getMessageNumber());
}
@Override
public void commitTran()
{
+ commitTransactionInternal(this);
+ _localEnqueueMap.clear();
+ _localDequeueMap.clear();
}
@Override
public void abortTran()
{
+ _localEnqueueMap.clear();
+ _localDequeueMap.clear();
}
@Override
public void removeXid(long format, byte[] globalId, byte[] branchId)
{
+ _localDistributedTransactionsRemoves.add(new Xid(format, globalId, branchId));
}
@Override
public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
{
+ _localDistributedTransactionsRecords.put(new Xid(format, globalId, branchId), new DistributedTransactionRecords(enqueues, dequeues));
}
- };
+ }
+
+ private final AtomicLong _messageId = new AtomicLong(1);
- private final EventManager _eventManager = new EventManager();
+ private final ConcurrentHashMap<UUID, ConfiguredObjectRecord> _configuredObjectRecords = new ConcurrentHashMap<UUID, ConfiguredObjectRecord>();
+ protected ConcurrentHashMap<Long, StoredMemoryMessage> _messages = new ConcurrentHashMap<Long, StoredMemoryMessage>();
+ private Object _transactionLock = new Object();
+ private Map<UUID, Set<Long>> _messageInstances = new HashMap<UUID, Set<Long>>();
+ private Map<Xid, DistributedTransactionRecords> _distributedTransactions = new HashMap<Xid, DistributedTransactionRecords>();
+
+ @SuppressWarnings("unchecked")
@Override
- public StoredMessage addMessage(StorableMessageMetaData metaData)
+ public StoredMessage<StorableMessageMetaData> addMessage(final StorableMessageMetaData metaData)
{
- final long id = _messageId.getAndIncrement();
- StoredMemoryMessage message = new StoredMemoryMessage(id, metaData);
+ long id = _messageId.getAndIncrement();
+
+ if(metaData.isPersistent())
+ {
+ return new StoredMemoryMessage(id, metaData)
+ {
+
+ @Override
+ public StoreFuture flushToStore()
+ {
+ _messages.putIfAbsent(getMessageNumber(), this) ;
+ return super.flushToStore();
+ }
+
+ @Override
+ public void remove()
+ {
+ _messages.remove(getMessageNumber());
+ super.remove();
+ }
+
+ };
+ }
+ else
+ {
+ return new StoredMemoryMessage(id, metaData);
+ }
+ }
+
+ private void commitTransactionInternal(MemoryMessageStoreTransaction transaction)
+ {
+ synchronized (_transactionLock )
+ {
+ for (Map.Entry<UUID, Set<Long>> loacalEnqueuedEntry : transaction._localEnqueueMap.entrySet())
+ {
+ Set<Long> messageIds = _messageInstances.get(loacalEnqueuedEntry.getKey());
+ if (messageIds == null)
+ {
+ messageIds = new HashSet<Long>();
+ _messageInstances.put(loacalEnqueuedEntry.getKey(), messageIds);
+ }
+ messageIds.addAll(loacalEnqueuedEntry.getValue());
+ }
+
+ for (Map.Entry<UUID, Set<Long>> loacalDequeueEntry : transaction._localDequeueMap.entrySet())
+ {
+ Set<Long> messageIds = _messageInstances.get(loacalDequeueEntry.getKey());
+ if (messageIds != null)
+ {
+ messageIds.removeAll(loacalDequeueEntry.getValue());
+ if (messageIds.isEmpty())
+ {
+ _messageInstances.remove(loacalDequeueEntry.getKey());
+ }
+ }
+ }
+
+ for (Map.Entry<Xid, DistributedTransactionRecords> entry : transaction._localDistributedTransactionsRecords.entrySet())
+ {
+ _distributedTransactions.put(entry.getKey(), entry.getValue());
+ }
+
+ for (Xid removed : transaction._localDistributedTransactionsRemoves)
+ {
+ _distributedTransactions.remove(removed);
+ }
+
+ }
+
- return message;
}
@Override
public Transaction newTransaction()
{
- return IN_MEMORY_TRANSACTION;
+ return new MemoryMessageStoreTransaction();
}
@Override
@@ -95,7 +209,164 @@ abstract public class AbstractMemoryMessageStore extends NullMessageStore
@Override
public void addEventListener(EventListener eventListener, Event... events)
{
- _eventManager.addEventListener(eventListener, events);
}
+ @Override
+ public void create(ConfiguredObjectRecord record)
+ {
+ if (_configuredObjectRecords.putIfAbsent(record.getId(), record) != null)
+ {
+ throw new StoreException("Record with id " + record.getId() + " is already present");
+ }
+ }
+
+ @Override
+ public void update(boolean createIfNecessary, ConfiguredObjectRecord... records)
+ {
+ for (ConfiguredObjectRecord record : records)
+ {
+ ConfiguredObjectRecord previousValue = _configuredObjectRecords.replace(record.getId(), record);
+ if (previousValue == null && !createIfNecessary)
+ {
+ throw new StoreException("Record with id " + record.getId() + " does not exist");
+ }
+ }
+ }
+
+ @Override
+ public UUID[] remove(final ConfiguredObjectRecord... objects)
+ {
+ List<UUID> removed = new ArrayList<UUID>();
+ for (ConfiguredObjectRecord record : objects)
+ {
+ if (_configuredObjectRecords.remove(record.getId()) != null)
+ {
+ removed.add(record.getId());
+ }
+ }
+ return removed.toArray(new UUID[removed.size()]);
+ }
+
+ @Override
+ public void closeConfigurationStore()
+ {
+ _configuredObjectRecords.clear();
+ }
+
+ @Override
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
+ {
+ }
+
+ @Override
+ public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException
+ {
+ handler.begin(VirtualHost.CURRENT_CONFIG_VERSION);
+ for (ConfiguredObjectRecord record : _configuredObjectRecords.values())
+ {
+ if (!handler.handle(record))
+ {
+ break;
+ }
+ }
+ handler.end();
+ }
+
+ @Override
+ public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings)
+ {
+ }
+
+ @Override
+ public void closeMessageStore()
+ {
+ _messages.clear();
+ synchronized (_transactionLock)
+ {
+ _messageInstances.clear();
+ _distributedTransactions.clear();
+ }
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return null;
+ }
+
+ @Override
+ public void onDelete()
+ {
+ }
+
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
+ {
+ for (StoredMemoryMessage message : _messages.values())
+ {
+ if(!handler.handle(message))
+ {
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ synchronized (_transactionLock)
+ {
+ for (Map.Entry<UUID, Set<Long>> enqueuedEntry : _messageInstances.entrySet())
+ {
+ UUID resourceId = enqueuedEntry.getKey();
+ for (Long messageId : enqueuedEntry.getValue())
+ {
+ if (!handler.handle(resourceId, messageId))
+ {
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+ {
+ synchronized (_transactionLock)
+ {
+ for (Map.Entry<Xid, DistributedTransactionRecords> entry : _distributedTransactions.entrySet())
+ {
+ Xid xid = entry.getKey();
+ DistributedTransactionRecords records = entry.getValue();
+ if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), records.getEnqueues(), records.getDequeues()))
+ {
+ break;
+ }
+ }
+ }
+ }
+
+ private static final class DistributedTransactionRecords
+ {
+ private Record[] _enqueues;
+ private Record[] _dequeues;
+
+ public DistributedTransactionRecords(Record[] enqueues, Record[] dequeues)
+ {
+ super();
+ _enqueues = enqueues;
+ _dequeues = dequeues;
+ }
+
+ public Record[] getEnqueues()
+ {
+ return _enqueues;
+ }
+
+ public Record[] getDequeues()
+ {
+ return _dequeues;
+ }
+ }
}