summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-02-20 14:50:01 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-02-20 14:50:01 +0000
commit67d5fb5c48b224efea5134c455719670398dd5eb (patch)
tree9f8e8e77d326bf37069eeae5b15399e92e3f4545
parent0db29114e114d6073494de791785df237cfda475 (diff)
downloadqpid-python-67d5fb5c48b224efea5134c455719670398dd5eb.tar.gz
QPID-1632 - Removal of reference counting and update to tests, TxAckTest was reduced in size as reference counting is now not modified until the transaction completes.
Replaced MessageReferenceCoutingTest with PersistentMessageTest and further tests wil be created when DerbyDBMessageStore is also updated. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@746260 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java21
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java22
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java26
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java27
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java44
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java80
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java48
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java28
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java65
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java17
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java77
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java120
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java35
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java20
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java (renamed from qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java)28
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java143
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java4
30 files changed, 430 insertions, 464 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 125518358b..5a01888ccf 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -499,7 +499,7 @@ public class AMQChannel
}
else
{
- unacked.discard(_storeContext);
+ unacked.dequeueAndDelete(_storeContext);
}
}
@@ -555,7 +555,7 @@ public class AMQChannel
_log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity()
+ "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
- unacked.discard(_storeContext);
+ unacked.dequeueAndDelete(_storeContext);
}
}
else
@@ -712,7 +712,7 @@ public class AMQChannel
{
try
{
- message.discard(_storeContext);
+ message.dequeueAndDelete(_storeContext);
message.setQueueDeleted(true);
}
@@ -831,9 +831,7 @@ public class AMQChannel
{
AMQMessage message = bouncedMessage.getAMQMessage();
_session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
- new AMQShortString(bouncedMessage.getMessage()));
-
- message.decrementReference(_storeContext);
+ new AMQShortString(bouncedMessage.getMessage()));
}
_returnMessages.clear();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
index 1723d46ef0..8d41cc58d2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
@@ -82,13 +82,13 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor
}
else
{
- queueEntry.discard(_storeContext);
+ queueEntry.dequeueAndDelete(_storeContext);
_log.info("No DeadLetter Queue and requeue not requested so dropping message:" + queueEntry);
}
}
else
{
- queueEntry.discard(_storeContext);
+ queueEntry.dequeueAndDelete(_storeContext);
_log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + queueEntry);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
index 415f1fe8be..a81b2cc2db 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
@@ -56,14 +56,10 @@ public abstract class RequiredDeliveryException extends AMQException
public void setMessage(final AMQMessage payload)
{
-
- // Increment the reference as this message is in the routing phase
- // and so will have the ref decremented as routing fails.
// we need to keep this message around so we can return it in the
- // handler. So increment here.
- payload.incrementReference(1);
+ // handler.
+ // Messages are all kept in memory now. Only queues can push messages out of memory.
_amqMessage = payload;
-
}
public AMQMessage getAMQMessage()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index 0f40e00624..918fcd8407 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.ack;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
@@ -116,22 +115,20 @@ public class TxAck implements TxnOp
//make persistent changes, i.e. dequeue and decrementReference
for (QueueEntry msg : _unacked.values())
{
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- msg.discard(storeContext);
-
+ //Message has been ack so dequeueAndDelete it.
+ // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
+ // from the transaciton log
+ msg.dequeueAndDelete(storeContext);
}
}
public void undoPrepare()
{
- //decrementReference is annoyingly untransactional (due to
- //in memory counter) so if we failed in prepare for full
- //txn, this op will have to compensate by fixing the count
- //in memory (persistent changes will be rolled back by store)
- for (QueueEntry msg : _unacked.values())
- {
- msg.getMessage().incrementReference(1);
- }
+ //As this is transaction the above dequeueAndDelete will only request the message be dequeue from the
+ // transactionLog. Only when the transaction succesfully completes will it perform any
+ // update of the internal transactionLog reference counting and any resulting message data deletion.
+ // The success or failure of the data deletion is not important to this transaction only that the ack has been
+ // successfully recorded.
}
public void commit(StoreContext storeContext)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index efdadd4922..ac3b0b5e49 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -174,8 +174,10 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
" When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
}
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- unacked.getValue().discard(storeContext);
+ //Message has been ack so dequeueAndDelete it.
+ // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
+ // from the transaciton log
+ unacked.getValue().dequeueAndDelete(storeContext);
it.remove();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index f3cab10ed7..bd70cd7776 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -65,38 +65,38 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
long deliveryTag = body.getDeliveryTag();
- QueueEntry message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
+ QueueEntry queueEntry = channel.getUnacknowledgedMessageMap().get(deliveryTag);
- if (message == null)
+ if (queueEntry == null)
{
_logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known");
}
else
{
- if (message.isQueueDeleted())
+ if (queueEntry.isQueueDeleted())
{
_logger.warn("Message's Queue as already been purged, unable to Reject. " +
"Dropping message should use Dead Letter Queue");
- message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
- if(message != null)
+ queueEntry = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
+ if(queueEntry != null)
{
- message.discard(channel.getStoreContext());
+ queueEntry.dequeueAndDelete(channel.getStoreContext());
}
//sendtoDeadLetterQueue(msg)
return;
}
- if (!message.getMessage().isReferenced())
+ if (queueEntry.isDeleted())
{
- _logger.warn("Message as already been purged, unable to Reject.");
+ _logger.warn("QueueEntry as already been deleted, unable to Reject.");
return;
}
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() +
+ _logger.debug("Rejecting: DT:" + deliveryTag + "-" + queueEntry.getMessage().debugIdentity() +
": Requeue:" + body.getRequeue() +
//": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
@@ -105,7 +105,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
// If we haven't requested message to be resent to this consumer then reject it from ever getting it.
//if (!evt.getMethod().resend)
{
- message.reject();
+ queueEntry.reject();
}
if (body.getRequeue())
@@ -115,7 +115,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
else
{
_logger.warn("Dropping message as requeue not required and there is no dead letter queue");
- message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
+ queueEntry = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
//sendtoDeadLetterQueue(AMQMessage message)
// message.queue = channel.getDefaultDeadLetterQueue();
// channel.requeue(deliveryTag);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 1f56b2ccd2..e96d2ba874 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -114,17 +114,8 @@ public interface AMQMessage
throws AMQException;
- void removeMessage(StoreContext storeContext) throws AMQException;
String toString();
String debugIdentity();
-
- // Reference counting methods
-
- void decrementReference(StoreContext storeContext) throws MessageCleanupException;
-
- boolean incrementReference(int queueCount);
-
- boolean isReferenced();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 0838b71c54..9fadbb0cdc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -81,25 +81,18 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
boolean isDeleted();
-
int delete() throws AMQException;
-
QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException;
-
-
boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
-
-
void addQueueDeleteTask(final Task task);
-
List<QueueEntry> getMessagesOnTheQueue();
List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index 5d4322c4fc..5eafd281c0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -156,8 +156,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
_logger.debug("Delivering message " + getMessageId() + " to " + _destinationQueues);
}
- try
- {
+
// first we allow the handle to know that the message has been fully received. This is useful if it is
// maintaining any calculated values based on content chunks
_message.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), _messagePublishInfo, getContentHeaderBody());
@@ -196,7 +195,6 @@ public class IncomingMessage implements Filterable<RuntimeException>
{
int offset;
final int queueCount = _destinationQueues.size();
- _message.incrementReference(queueCount);
if(queueCount == 1)
{
offset = 0;
@@ -222,12 +220,8 @@ public class IncomingMessage implements Filterable<RuntimeException>
}
return _message;
- }
- finally
- {
- // Remove refence for routing process . Reference count should now == delivered queue count
- if(_message != null) _message.decrementReference(_txnContext.getStoreContext());
- }
+
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
index ec48a2afb0..92c10b0347 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
@@ -58,13 +58,6 @@ public class PersistentAMQMessage extends TransientAMQMessage
}
@Override
- public void removeMessage(StoreContext storeContext) throws AMQException
- {
- _log.info("PAMQM : removing message:" + _messageId);
- _transactionLog.removeMessage(storeContext, _messageId);
- }
-
- @Override
public boolean isPersistent()
{
return true;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 0df976a620..09600b9d28 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -49,7 +49,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
public abstract State getState();
}
-
public final class AvailableState extends EntryState
{
@@ -59,7 +58,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
}
}
-
public final class DequeuedState extends EntryState
{
@@ -69,7 +67,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
}
}
-
public final class DeletedState extends EntryState
{
@@ -88,7 +85,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
}
}
-
public final class NonSubscriptionAcquiredState extends EntryState
{
public State getState()
@@ -106,7 +102,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
_subscription = subscription;
}
-
public State getState()
{
return State.ACQUIRED;
@@ -118,16 +113,12 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
}
}
-
final static EntryState AVAILABLE_STATE = new AvailableState();
final static EntryState DELETED_STATE = new DeletedState();
final static EntryState DEQUEUED_STATE = new DequeuedState();
final static EntryState EXPIRED_STATE = new ExpiredState();
final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState();
-
-
-
AMQQueue getQueue();
AMQMessage getMessage();
@@ -141,9 +132,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
boolean isAcquired();
boolean acquire();
+
boolean acquire(Subscription sub);
boolean delete();
+
boolean isDeleted();
boolean acquiredBySubscription();
@@ -170,12 +163,21 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
void dequeue(final StoreContext storeContext) throws FailedDequeueException;
- void dispose(final StoreContext storeContext) throws MessageCleanupException;
-
- void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException;
+ /**
+ * Message has been ack so dequeueAndDelete it.
+ * If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
+ * from the transaciton log
+ *
+ * @param storeContext the transactional Context in which to perform the deletion
+ *
+ * @throws FailedDequeueException
+ * @throws MessageCleanupException
+ */
+ void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException;
boolean isQueueDeleted();
void addStateChangeListener(StateChangeListener listener);
+
boolean removeStateChangeListener(StateChangeListener listener);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 3eb1636884..911ed8321b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -282,13 +282,12 @@ public class QueueEntryImpl implements QueueEntry
}
getQueue().dequeue(storeContext, this);
- if(_stateChangeListeners != null)
+
+ if (_stateChangeListeners != null)
{
- notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
+ notifyStateChange(state.getState(), QueueEntry.State.DEQUEUED);
}
-
}
-
}
private void notifyStateChange(final State oldState, final State newState)
@@ -299,29 +298,15 @@ public class QueueEntryImpl implements QueueEntry
}
}
- public void dispose(final StoreContext storeContext) throws MessageCleanupException
- {
- _log.info("QEI Disposing of message:" + getMessage().getMessageId() + ": state=" + _state);
- if(delete())
- {
- _log.info("QEI delete message:" + getMessage().getMessageId());
- getMessage().decrementReference(storeContext);
- }
- else
- {
- _log.info("QEI delete state wrong:" + getMessage().getMessageId());
- }
- }
-
- public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+ public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException
{
- //if the queue is null then the message is waiting to be acked, but has been removed.
+ //if the queue is null (i.e. queue.delete()'d) then the message is waiting to be acked, but has already be delete()'d;
if (getQueue() != null)
{
dequeue(storeContext);
}
- dispose(storeContext);
+ delete();
}
public boolean isQueueDeleted()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index a0f21033c7..501e90b4d7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -408,8 +408,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if (entry.immediateAndNotDelivered())
{
- dequeue(storeContext, entry);
- entry.dispose(storeContext);
+ //We acquire the message here to ensure that the dequeueAndDelete will correctly remove the content
+ // from the transactionLog. This saves us from having to have a custom dequeueAndDelete that checks
+ // for the AVAILABLE state of an entry rather than the ACQUIRED state that it currently uses.
+ entry.acquire();
+ entry.dequeueAndDelete(storeContext);
}
else if (!(entry.isAcquired() || entry.isDeleted()))
{
@@ -562,6 +565,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+ /**
+ * Only call from queue Entry
+ * @param storeContext
+ * @param entry
+ * @throws FailedDequeueException
+ */
public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
{
decrementQueueCount();
@@ -578,7 +587,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
_virtualHost.getTransactionLog().dequeueMessage(storeContext, this, msg.getMessageId());
}
- //entry.dispose(storeContext);
}
catch (MessageCleanupException e)
@@ -814,11 +822,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+
public void moveMessagesToAnotherQueue(final long fromMessageId,
final long toMessageId,
String queueName,
StoreContext storeContext)
{
+ // The move is a two step process. First the messages are moved in the _transactionLog.
+ // That is persistent messages are moved queues on disk for recovery and the QueueEntries removed from the
+ // existing queue.
+ // This is done as Queue.enqueue() does not write the data to the transactionLog. In normal message delivery
+ // this is done as the message is recieved.
+ // So The final step is to enqueue the messages on the new queue.
AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
TransactionLog transactionLog = getVirtualHost().getTransactionLog();
@@ -844,7 +859,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
transactionLog.beginTran(storeContext);
- // Move the messages in on the transaction log.
+ // Move the messages in the transaction log.
for (QueueEntry entry : entries)
{
AMQMessage message = entry.getMessage();
@@ -853,7 +868,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId());
}
- // dequeue does not decrement the refence count
+ // dequeue will remove the messages from the queue
entry.dequeue(storeContext);
}
@@ -882,10 +897,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
try
{
+ // Add messages to new queue
for (QueueEntry entry : entries)
{
toQueue.enqueue(storeContext, entry.getMessage());
-
}
}
catch (MessageCleanupException e)
@@ -918,7 +933,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if (!entry.isDeleted())
{
- return entry.getMessage().incrementReference(1);
+ return true;
}
}
@@ -940,7 +955,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
AMQMessage message = entry.getMessage();
- if (message.isReferenced() && message.isPersistent() && toQueue.isDurable())
+ if (!entry.isDeleted() && message.isPersistent() && toQueue.isDurable())
{
transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId());
}
@@ -973,7 +988,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
for (QueueEntry entry : entries)
{
- if (entry.getMessage().isReferenced())
+ if (!entry.isDeleted())
{
toQueue.enqueue(storeContext, entry.getMessage());
}
@@ -1008,7 +1023,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
&& !node.isDeleted()
&& node.acquire())
{
- node.discard(storeContext);
+ node.dequeueAndDelete(storeContext);
}
}
@@ -1032,7 +1047,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
QueueEntry node = queueListIterator.getNode();
if (!node.isDeleted() && node.acquire())
{
- node.discard(storeContext);
+ node.dequeueAndDelete(storeContext);
noDeletes = false;
}
@@ -1050,7 +1065,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
QueueEntry node = queueListIterator.getNode();
if (!node.isDeleted() && node.acquire())
{
- node.discard(storeContext);
+ node.dequeueAndDelete(storeContext);
count++;
}
@@ -1315,8 +1330,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if (node.acquire())
{
+ // creating a new final store context per message seems wasteful.
final StoreContext reapingStoreContext = new StoreContext();
- node.discard(reapingStoreContext);
+ node.dequeueAndDelete(reapingStoreContext);
}
}
QueueEntry newNode = _entries.next(node);
@@ -1431,7 +1447,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
QueueEntry node = queueListIterator.getNode();
if (!node.isDeleted() && node.expired() && node.acquire())
{
- node.discard(storeContext);
+ node.dequeueAndDelete(storeContext);
}
else
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
index f3d74fb01c..fa4e85a043 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
@@ -172,11 +172,6 @@ public class TransientAMQMessage implements AMQMessage
_expiration = expiration;
}
- public boolean isReferenced()
- {
- return _referenceCount.get() > 0;
- }
-
public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
{
return new BodyFrameIterator(protocolSession, channel);
@@ -197,76 +192,6 @@ public class TransientAMQMessage implements AMQMessage
return _messageId;
}
- /* Threadsafe. Increment the reference count on the message. */
- public boolean incrementReference(int count)
- {
- if (_referenceCount.addAndGet(count) <= 1)
- {
- int newcount = _referenceCount.addAndGet(-count);
- _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") to :" + newcount);
- return false;
- }
- else
- {
- _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") but count was <=1("
- + _referenceCount.get() + ")");
- return true;
- }
-
- }
-
- /**
- * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
- * message store.
- *
- * @param storeContext
- *
- * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
- * failed
- */
- public void decrementReference(StoreContext storeContext) throws MessageCleanupException
- {
-
- int count = _referenceCount.decrementAndGet();
-
- _log.debug("Message(" + _messageId + ") Decremented Ref count to :" + count);
-
- // note that the operation of decrementing the reference count and then removing the message does not
- // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
- // the message has been passed to all queues. i.e. we are
- // not relying on the all the increments having taken place before the delivery manager decrements.
- if (count == 0)
- {
- // set the reference count way below 0 so that we can detect that the message has been deleted
- // this is to guard against the message being spontaneously recreated (from the mgmt console)
- // by copying from other queues at the same time as it is being removed.
- _referenceCount.set(Integer.MIN_VALUE / 2);
-
- try
- {
- _log.debug("Reference Count hit 0, removing message");
- // must check if the handle is null since there may be cases where we decide to throw away a message
- // and the handle has not yet been constructed
- // no need to perform persistent check anymore as TransientAMQM.removeMessage() is a no-op
- removeMessage(storeContext);
- }
- catch (AMQException e)
- {
- // to maintain consistency, we revert the count
- incrementReference(1);
- throw new MessageCleanupException(getMessageId(), e);
- }
- }
- else
- {
- if (count < 0)
- {
- throw new MessageCleanupException("Reference count for message id " + debugIdentity()
- + " has gone below 0.");
- }
- }
- }
-
/**
* Called selectors to determin if the message has already been sent
*
@@ -435,11 +360,6 @@ public class TransientAMQMessage implements AMQMessage
return _arrivalTime;
}
- public void removeMessage(StoreContext storeContext) throws AMQException
- {
- //no-op
- }
-
public String toString()
{
// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index fe81346c8c..33b3d8608e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -1357,7 +1357,8 @@ public class DerbyMessageStore implements TransactionLog, RoutingTable
if(message != null)
{
- message.incrementReference(1);
+ //todo must enqueue message to build reference table
+// message.incrementReference(1);
}
else
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index aa7b6e2542..cd0f0c1769 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -30,24 +30,28 @@ import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-/** A simple message store that stores the messages in a threadsafe structure in memory.
+/**
+ * A simple message store that stores the messages in a threadsafe structure in memory.
*
* NOTE: Now that we have removed the MessageStore interface and are using a TransactionLog
*
* This class really should have no storage unless we want to do inMemory Recovery.
- *
*/
public class MemoryMessageStore implements TransactionLog, RoutingTable
{
@@ -63,6 +67,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
private final AtomicLong _messageId = new AtomicLong(1);
private AtomicBoolean _closed = new AtomicBoolean(false);
+ protected final Map<Long, List<AMQQueue>> _messageEnqueueMap = new HashMap<Long, List<AMQQueue>>();
public void configure()
{
@@ -112,6 +117,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
}
_metaDataMap.remove(messageId);
_contentBodyMap.remove(messageId);
+ _messageEnqueueMap.remove(messageId);
}
public void createExchange(Exchange exchange) throws AMQException
@@ -134,7 +140,6 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
}
-
public void createQueue(AMQQueue queue) throws AMQException
{
// Not requred to do anything
@@ -152,12 +157,39 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
- // Not required to do anything
+ synchronized (_messageEnqueueMap)
+ {
+ List<AMQQueue> queues = _messageEnqueueMap.get(messageId);
+ if (queues == null)
+ {
+ queues = new LinkedList<AMQQueue>();
+ _messageEnqueueMap.put(messageId, queues);
+ }
+
+ queues.add(queue);
+ }
}
public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
- // Not required to do anything
+ synchronized (_messageEnqueueMap)
+ {
+ List<AMQQueue> queues = _messageEnqueueMap.get(messageId);
+ if (queues == null || !queues.contains(queue))
+ {
+ throw new RuntimeException("Attempt to dequeue messageID:" + messageId + " from queue:" + queue.getName()
+ + " but it is not enqueued on that queue.");
+ }
+ else
+ {
+ queues.remove(queue);
+ if (queues.isEmpty())
+ {
+ removeMessage(context,messageId);
+ }
+ }
+ }
+
}
public void beginTran(StoreContext context) throws AMQException
@@ -238,7 +270,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
}
private void checkNotClosed() throws MessageStoreClosedException
- {
+ {
if (_closed.get())
{
throw new MessageStoreClosedException();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 1c58e644e9..119a4b1692 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -144,7 +144,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
StoreContext storeContext = getChannel().getStoreContext();
try
- { // if we do not need to wait for client acknowledgements
+ {
+ // if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
// By doing this _before_ the send we ensure that it
@@ -153,7 +154,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
// The send may of course still fail, in which case, as
// the message is unacked, it will be lost.
- entry.dequeue(storeContext);
+ entry.dequeueAndDelete(storeContext);
synchronized (getChannel())
@@ -163,7 +164,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
sendToClient(entry, deliveryTag);
}
- entry.dispose(storeContext);
}
finally
{
@@ -316,7 +316,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
_autoClose = false;
}
-
+ _logger.info(debugIdentity()+" Created subscription:");
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 8e63b95f0d..abfb60c5bf 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -92,20 +92,12 @@ public class LocalTransactionalContext implements TransactionalContext
public void process() throws AMQException
{
- _message.incrementReference(1);
- try
- {
QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
if(entry.immediateAndNotDelivered())
{
getReturnMessages().add(new NoConsumersException(_message));
}
- }
- finally
- {
- _message.decrementReference(getStoreContext());
- }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 145d7f8b13..561f998b98 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -123,18 +123,18 @@ public class NonTransactionalContext implements TransactionalContext
unacknowledgedMessageMap.size());
unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
- public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
+ public boolean callback(final long deliveryTag, QueueEntry queueEntry) throws AMQException
{
if (debug)
{
- _log.debug("Discarding message: " + message.getMessage().getMessageId());
+ _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId());
}
- if(message.getMessage().isPersistent())
+ if(queueEntry.getMessage().isPersistent())
{
beginTranIfNecessary();
}
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- message.discard(_storeContext);
+ //Message has been ack so dequeueAndDelete it.
+ queueEntry.dequeueAndDelete(_storeContext);
return false;
}
@@ -157,10 +157,10 @@ public class NonTransactionalContext implements TransactionalContext
}
else
{
- QueueEntry msg;
- msg = unacknowledgedMessageMap.get(deliveryTag);
+ QueueEntry queueEntry;
+ queueEntry = unacknowledgedMessageMap.get(deliveryTag);
- if (msg == null)
+ if (queueEntry == null)
{
_log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
_channel.getChannelId());
@@ -170,15 +170,17 @@ public class NonTransactionalContext implements TransactionalContext
if (debug)
{
- _log.debug("Discarding message: " + msg.getMessage().getMessageId());
+ _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId());
}
- if(msg.getMessage().isPersistent())
+ if(queueEntry.getMessage().isPersistent())
{
beginTranIfNecessary();
}
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- msg.discard(_storeContext);
+ //Message has been ack so dequeueAndDelete it.
+ // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
+ // from the transaciton log
+ queueEntry.dequeueAndDelete(_storeContext);
unacknowledgedMessageMap.remove(deliveryTag);
@@ -186,7 +188,7 @@ public class NonTransactionalContext implements TransactionalContext
if (debug)
{
_log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
- msg.getMessage().getMessageId());
+ queueEntry.getMessage().getMessageId());
}
}
if(_inTran)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
index 1d729a82a5..52b8b0ad19 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.queue.AMQMessage;
@@ -37,9 +38,10 @@ import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.TransientAMQMessage;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.TestTransactionLog;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
@@ -81,20 +83,6 @@ public class TxAckTest extends TestCase
combined.stop();
}
- public void testPrepare() throws AMQException
- {
- individual.prepare();
- multiple.prepare();
- combined.prepare();
- }
-
- public void testUndoPrepare() throws AMQException
- {
- individual.undoPrepare();
- multiple.undoPrepare();
- combined.undoPrepare();
- }
-
public void testCommit() throws AMQException
{
individual.commit();
@@ -115,12 +103,13 @@ public class TxAckTest extends TestCase
private final List<Long> _unacked;
private StoreContext _storeContext = new StoreContext();
private AMQQueue _queue;
+ private TransactionLog _transactionLog = new TestableMemoryMessageStore();
private static final int MESSAGE_SIZE=100;
Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception
{
- TransactionalContext txnContext = new NonTransactionalContext(new TestMemoryMessageStore(),
+ TransactionalContext txnContext = new NonTransactionalContext(_transactionLog,
_storeContext, null,
new LinkedList<RequiredDeliveryException>()
);
@@ -138,12 +127,15 @@ public class TxAckTest extends TestCase
MessagePublishInfo info = new MessagePublishInfoImpl();
- AMQMessage message = new TestMessage(deliveryTag, info);
+ AMQMessage message = new TestMessage(deliveryTag, info, (TestTransactionLog) _transactionLog);
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
message.setPublishAndContentHeaderBody(_storeContext, info, header);
+
+
+
_map.add(deliveryTag, _queue.enqueue(new StoreContext(), message));
}
_acked = acked;
@@ -165,25 +157,6 @@ public class TxAckTest extends TestCase
}
}
- void prepare() throws AMQException
- {
- _op.consolidate();
- _op.prepare(_storeContext);
-
- assertCount(_acked, -1);
- assertCount(_unacked, 0);
-
- }
-
- void undoPrepare()
- {
- _op.consolidate();
- _op.undoPrepare();
-
- assertCount(_acked, 1);
- assertCount(_unacked, 0);
- }
-
void commit()
{
_op.consolidate();
@@ -232,30 +205,22 @@ public class TxAckTest extends TestCase
private class TestMessage extends TransientAMQMessage
{
private final long _tag;
- private int _count;
+ private TestTransactionLog _transactionLog;
- TestMessage(long tag, MessagePublishInfo publishBody)
+ public TestMessage(long tag, MessagePublishInfo publishBody, TestTransactionLog transactionLog)
throws AMQException
{
super(createMessage( publishBody));
_tag = tag;
+ _transactionLog = transactionLog;
}
- public boolean incrementReference(int count)
- {
- _count+=count;
- return true;
- }
-
- public void decrementReference(StoreContext context)
- {
- _count--;
- }
-
void assertCountEquals(int expected)
{
- assertEquals("Wrong count for message with tag " + _tag, expected, _count);
+ List<AMQQueue> list = _transactionLog.getMessageReferenceMap(_messageId);
+ int actual = (list == null ? 0 : list.size());
+ assertEquals("Wrong count for message with tag " + _tag, expected, actual);
}
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 40b08a2e39..78cf610f28 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -324,7 +324,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
//To change body of implemented methods use File | Settings | File Templates.
}
- public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+ public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException
{
//To change body of implemented methods use File | Settings | File Templates.
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
index 98465eda20..9f8d5f9a99 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -30,14 +30,16 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.flow.LimitlessCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.util.NullApplicationRegistry;
@@ -57,7 +59,7 @@ public class AckTest extends TestCase
private MockProtocolSession _protocolSession;
- private TestMemoryMessageStore _messageStore;
+ private TestableMemoryMessageStore _messageStore;
private StoreContext _storeContext = new StoreContext();
@@ -72,14 +74,15 @@ public class AckTest extends TestCase
super.setUp();
ApplicationRegistry.initialise(new NullApplicationRegistry(), 1);
- _messageStore = new TestMemoryMessageStore();
+ VirtualHost vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
+ _messageStore = new TestableMemoryMessageStore((MemoryMessageStore)vhost.getTransactionLog());
_protocolSession = new MockProtocolSession(_messageStore);
_channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/);
_protocolSession.addChannel(_channel);
- _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"),
- null);
+ _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"),
+ true, vhost, null);
}
protected void tearDown()
@@ -185,7 +188,7 @@ public class AckTest extends TestCase
/**
* Tests that in no-ack mode no messages are retained
*/
- public void testPersistentNoAckMode() throws AMQException
+ public void testPersistentNoAckMode() throws AMQException, InterruptedException
{
// false arg means no acks expected
_subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager());
@@ -194,7 +197,7 @@ public class AckTest extends TestCase
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
+ assertTrue("Size:" + _messageStore.getMessageMetaDataMap().size(), _messageStore.getMessageMetaDataMap().size() == 0);
assertTrue(_messageStore.getContentBodyMap().size() == 0);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java
deleted file mode 100644
index 44e9851db7..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import junit.framework.TestCase;
-
-public class MessageReferenceCountingTest extends TestCase
-{
- AMQMessage _message;
-
- public void setUp()
- {
- _message = MessageFactory.getInstance().createMessage(null, false);
- }
-
- public void testInitialState()
- {
-
- assertTrue("New messages should have a reference", _message.isReferenced());
- }
-
- public void testIncrementReference()
- {
- assertTrue("Message should maintain Referenced state", _message.isReferenced());
- assertTrue("Incrementing should be allowed ",_message.incrementReference(1));
- assertTrue("Message should maintain Referenced state", _message.isReferenced());
- assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(1));
- assertTrue("Message should maintain Referenced state", _message.isReferenced());
- assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(2));
- assertTrue("Message should maintain Referenced state", _message.isReferenced());
- }
-
- public void testDecrementReference()
- {
- assertTrue("Message should maintain Referenced state", _message.isReferenced());
- try
- {
- _message.decrementReference(null);
- }
- catch (MessageCleanupException e)
- {
- fail("Decrement should be allowed:"+e.getMessage());
- }
-
- assertFalse("Message should not be Referenced state", _message.isReferenced());
-
- try
- {
- _message.decrementReference(null);
- fail("Decrement should not be allowed as we should have a ref count of 0");
- }
- catch (MessageCleanupException e)
- {
- assertTrue("Incorrect exception thrown.",e.getMessage().contains("has gone below 0"));
- }
-
- }
-
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index 12ff91cdad..92235648ec 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -38,4 +38,9 @@ public class MockQueueEntry extends QueueEntryImpl
{
super(_defaultList, message);
}
+
+ public MockQueueEntry(AMQMessage message, SimpleAMQQueue queue)
+ {
+ super(new SimpleQueueEntryList(queue) ,message);
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
index fdaf2c309f..4551ae5af8 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
@@ -20,18 +20,46 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
+import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
public class PersistentMessageTest extends TransientMessageTest
{
- private MemoryMessageStore _messageStore;
+ private TestableMemoryMessageStore _messageStore;
+
+ protected SimpleAMQQueue _queue;
+ protected AMQShortString _q1name = new AMQShortString("q1name");
+ protected AMQShortString _owner = new AMQShortString("owner");
+ protected AMQShortString _routingKey = new AMQShortString("routing key");
+ private TransactionalContext _messageDeliveryContext;
+ private static final long MESSAGE_SIZE = 0L;
+ private List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
- public void setUp()
+ public void setUp() throws Exception
{
- _messageStore = new MemoryMessageStore();
- _messageStore.configure();
+ _messageStore = new TestableMemoryMessageStore();
+
_storeContext = new StoreContext();
+ VirtualHost vhost = new VirtualHost(PersistentMessageTest.class.getName(), _messageStore);
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_q1name, false, _owner, false, vhost, null);
+ // Create IncomingMessage and nondurable queue
+ _messageDeliveryContext = new NonTransactionalContext(_messageStore, new StoreContext(), null, _returnMessages);
+
}
@Override
@@ -47,4 +75,86 @@ public class PersistentMessageTest extends TransientMessageTest
assertTrue(_message.isPersistent());
}
+ /**
+ * Tests the returning of a single persistent message to a queue. An immediate message is sent to the queue and
+ * checked that it bounced. The transactionlog and returnMessasges are then checked to ensure they have the right
+ * contents. TransactionLog = Empty, returnMessages 1 item.
+ *
+ * @throws Exception
+ */
+ public void testImmediateReturnNotInLog() throws Exception
+ {
+ MessagePublishInfo info = new MessagePublishInfoImpl(null, true, false, null);
+ IncomingMessage msg = createMessage(info);
+
+ // Send persistent message
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue);
+
+ // equivalent to amqChannel.routeMessage()
+ msg.enqueue(qs);
+
+ msg.routingComplete(_messageStore);
+
+ // equivalent to amqChannel.deliverCurrentMessageIfComplete
+ msg.deliverToQueues();
+
+ // Check that data has been stored to disk
+ long messageId = msg.getMessageId();
+ checkMessageMetaDataExists(messageId);
+
+ // Check that it was not enqueued
+ List<AMQQueue> queueList = _messageStore.getMessageReferenceMap(messageId);
+ assertNull("TransactionLog contains a queue reference for this messageID:" + messageId, queueList);
+ checkMessageMetaDataRemoved(messageId);
+
+ assertEquals("Return message count not correct", 1, _returnMessages.size());
+ }
+
+ protected IncomingMessage createMessage(MessagePublishInfo info) throws AMQException
+ {
+ IncomingMessage msg = new IncomingMessage(info, _messageDeliveryContext,
+ new MockProtocolSession(_messageStore), _messageStore);
+
+ // equivalent to amqChannel.publishContenHeader
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+ contentHeaderBody.classId = BasicConsumeBodyImpl.CLASS_ID;
+ // This message has no bodies
+ contentHeaderBody.bodySize = MESSAGE_SIZE;
+ contentHeaderBody.properties = new BasicContentHeaderProperties();
+ ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+
+ msg.setContentHeaderBody(contentHeaderBody);
+ msg.setExpiration();
+
+ return msg;
+ }
+
+ protected void checkMessageMetaDataExists(long messageId)
+ {
+ try
+ {
+ _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId);
+ }
+ catch (AMQException amqe)
+ {
+ fail("Message MetaData does not exist for message:" + messageId);
+ }
+ }
+
+ protected void checkMessageMetaDataRemoved(long messageId)
+ {
+ try
+ {
+ assertNull("Message MetaData still exists for message:" + messageId,
+ _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId));
+ assertNull("Message still has values in the reference map:" + messageId,
+ _messageStore.getMessageReferenceMap(messageId));
+
+ }
+ catch (AMQException e)
+ {
+ fail("AMQE thrown whilst trying to getMessageMetaData:" + e.getMessage());
+ }
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 665ca089da..7a97837208 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -36,10 +36,12 @@ import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.TestTransactionLog;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import java.util.ArrayList;
import java.util.List;
@@ -319,7 +321,7 @@ public class SimpleAMQQueueTest extends TestCase
{
// Create IncomingMessage and nondurable queue
NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null);
- IncomingMessage msg = new IncomingMessage(info, txnContext, null, _store);
+ IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_store), _store);
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.properties = new BasicContentHeaderProperties();
@@ -338,21 +340,22 @@ public class SimpleAMQQueueTest extends TestCase
_store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1));
// Check that it is enqueued
- AMQQueue data = _store.getMessages().get(messageId);
+ List<AMQQueue> data = _store.getMessageReferenceMap(messageId);
assertNotNull(data);
// Dequeue message
-
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store);
message.setPublishAndContentHeaderBody(new StoreContext(), info, header);
- MockQueueEntry entry = new MockQueueEntry(message);
- _queue.dequeue(null, entry);
+ MockQueueEntry entry = new MockQueueEntry(message, _queue);
+ entry.getQueueEntryList().add(message);
+ entry.acquire();
+ entry.dequeue(null);
// Check that it is dequeued
- data = _store.getMessages().get(messageId);
+ data = _store.getMessageReferenceMap(messageId);
assertNull(data);
}
@@ -381,7 +384,7 @@ public class SimpleAMQQueueTest extends TestCase
public AMQMessage createMessage() throws AMQException
{
- AMQMessage message = new TestMessage(info);
+ AMQMessage message = new TestMessage(info, _store);
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
@@ -397,29 +400,21 @@ public class SimpleAMQQueueTest extends TestCase
public class TestMessage extends TransientAMQMessage
{
private final long _tag;
- private int _count;
+ private TestTransactionLog _transactionLog;
- TestMessage(MessagePublishInfo publishBody)
+ TestMessage(MessagePublishInfo publishBody, TestTransactionLog transactionLog)
throws AMQException
{
super(SimpleAMQQueueTest.createMessage(publishBody));
_tag = getMessageId();
+ _transactionLog = transactionLog;
}
- public boolean incrementReference(int count)
- {
- _count+=count;
- return true;
- }
-
- public void decrementReference(StoreContext context)
- {
- _count--;
- }
void assertCountEquals(int expected)
{
- assertEquals("Wrong count for message with tag " + _tag, expected, _count);
+ assertEquals("Wrong count for message with tag " + _tag, expected,
+ _transactionLog.getMessageReferenceMap(_messageId).size());
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
index e8acfc2fda..5a4c435e59 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -34,7 +34,7 @@ import org.apache.qpid.server.queue.AMQMessage;
*/
public class TestReferenceCounting extends TestCase
{
- private TestMemoryMessageStore _store;
+ private TestableMemoryMessageStore _store;
private StoreContext _storeContext = new StoreContext();
@@ -42,7 +42,7 @@ public class TestReferenceCounting extends TestCase
protected void setUp() throws Exception
{
super.setUp();
- _store = new TestMemoryMessageStore();
+ _store = new TestableMemoryMessageStore();
}
/**
@@ -54,19 +54,9 @@ public class TestReferenceCounting extends TestCase
MessagePublishInfo info = new MessagePublishInfoImpl();
- final long messageId = _store.getNewMessageId();
-
AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
message.setPublishAndContentHeaderBody(_storeContext, info, chb);
- message.incrementReference(1);
-
- // we call routing complete to set up the handle
- // message.routingComplete(_store, _storeContext, new MessageHandleFactory());
-
-
- assertEquals(1, _store.getMessageMetaDataMap().size());
- message.decrementReference(_storeContext);
assertEquals(1, _store.getMessageMetaDataMap().size());
}
@@ -84,16 +74,10 @@ public class TestReferenceCounting extends TestCase
MessagePublishInfo info = new MessagePublishInfoImpl();
- final Long messageId = _store.getNewMessageId();
final ContentHeaderBody chb = createPersistentContentHeader();
AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
message.setPublishAndContentHeaderBody(_storeContext, info, chb);
- message.incrementReference(1);
-
- assertEquals(1, _store.getMessageMetaDataMap().size());
- message.incrementReference(1);
- message.decrementReference(_storeContext);
assertEquals(1, _store.getMessageMetaDataMap().size());
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java
index 4e48435962..bb051693c3 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java
@@ -20,32 +20,12 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.queue.AMQQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Map;
import java.util.List;
-/**
- * Adds some extra methods to the memory message store for testing purposes.
- */
-public class TestMemoryMessageStore extends MemoryMessageStore
+public interface TestTransactionLog
{
- public TestMemoryMessageStore()
- {
- _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
- }
-
- public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
- {
- return _metaDataMap;
- }
-
- public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
- {
- return _contentBodyMap;
- }
+ public List<AMQQueue> getMessageReferenceMap(Long messageID);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
index 9146fe88ae..882f88b8f3 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
@@ -23,22 +23,28 @@ package org.apache.qpid.server.store;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.commons.configuration.Configuration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Adds some extra methods to the memory message store for testing purposes.
*/
-public class TestableMemoryMessageStore extends MemoryMessageStore
+public class TestableMemoryMessageStore implements TestTransactionLog, TransactionLog, RoutingTable
{
MemoryMessageStore _mms = null;
- private HashMap<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>();
public TestableMemoryMessageStore(MemoryMessageStore mms)
{
@@ -47,46 +53,127 @@ public class TestableMemoryMessageStore extends MemoryMessageStore
public TestableMemoryMessageStore()
{
- _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
+ _mms = new MemoryMessageStore();
+ _mms.configure();
}
public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
{
- if (_mms != null)
- {
- return _mms._metaDataMap;
- }
- else
- {
- return _metaDataMap;
- }
+ return _mms._metaDataMap;
}
public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
{
- if (_mms != null)
- {
- return _mms._contentBodyMap;
- }
- else
- {
- return _contentBodyMap;
- }
+ return _mms._contentBodyMap;
}
-
- public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+
+ public List<AMQQueue> getMessageReferenceMap(Long messageId)
+ {
+ return _mms._messageEnqueueMap.get(messageId);
+ }
+
+ public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
+ {
+ _mms.configure(virtualHost,base,config);
+ }
+
+ public void close() throws Exception
+ {
+ _mms.close();
+ }
+
+ public void createExchange(Exchange exchange) throws AMQException
+ {
+ _mms.createExchange(exchange);
+ }
+
+ public void removeExchange(Exchange exchange) throws AMQException
+ {
+ _mms.removeExchange(exchange);
+ }
+
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ _mms.bindQueue(exchange,routingKey,queue,args);
+ }
+
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ _mms.unbindQueue(exchange,routingKey,queue,args);
+ }
+
+ public void createQueue(AMQQueue queue) throws AMQException
+ {
+ _mms.createQueue(queue);
+ }
+
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
+ {
+ _mms.createQueue(queue,arguments);
+ }
+
+ public void removeQueue(AMQQueue queue) throws AMQException
+ {
+ _mms.removeQueue(queue);
+ }
+
+ public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
+ {
+ _mms.removeMessage(storeContext, messageId);
+ }
+
+ public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+ {
+ _mms.enqueueMessage(context,queue,messageId);
+ }
+
+ public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+ {
+ _mms.dequeueMessage(context,queue,messageId);
+ }
+
+ public void beginTran(StoreContext context) throws AMQException
+ {
+ _mms.beginTran(context);
+ }
+
+ public void commitTran(StoreContext context) throws AMQException
+ {
+ _mms.commitTran(context);
+ }
+
+ public void abortTran(StoreContext context) throws AMQException
+ {
+ _mms.abortTran(context);
+ }
+
+ public boolean inTran(StoreContext context)
+ {
+ return _mms.inTran(context);
+ }
+
+ public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+ {
+ _mms.storeContentBodyChunk(context,messageId,index,contentBody,lastContentBody);
+ }
+
+ public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
+ {
+ _mms.storeMessageMetaData(context,messageId,messageMetaData);
+ }
+
+ public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
{
- getMessages().put(messageId, queue);
+ return _mms.getMessageMetaData(context,messageId);
}
- public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
{
- getMessages().remove(messageId);
+ return _mms.getContentBodyChunk(context,messageId,index);
}
- public HashMap<Long, AMQQueue> getMessages()
+ public boolean isPersistent()
{
- return _messages;
+ return _mms.isPersistent();
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
index ca6644d141..26802b4210 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
@@ -22,8 +22,8 @@ package org.apache.qpid.server.txn;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.transactionlog.TransactionLog;
import java.util.LinkedList;
@@ -194,7 +194,7 @@ public class TxnBufferTest extends TestCase
}
}
- class MockStore extends TestMemoryMessageStore
+ class MockStore extends TestableMemoryMessageStore
{
final Object BEGIN = "BEGIN";
final Object ABORT = "ABORT";