summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2007-12-18 23:00:40 +0000
committerRobert Godfrey <rgodfrey@apache.org>2007-12-18 23:00:40 +0000
commit8a916973a6a6209d49af5363b10d3b29af2b151f (patch)
tree6f909e235183684d703e3b4d9ff22afe53297474
parentb9857867db0ea728abb837027fc164f8d01b1191 (diff)
downloadqpid-python-8a916973a6a6209d49af5363b10d3b29af2b151f.tar.gz
QPID-711 : create a QueueEntry class and move message-on-queue functions (such as taken()) to this class
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@605352 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java67
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java46
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java309
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java57
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java217
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java173
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java66
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java8
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java26
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java5
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java5
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java10
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java8
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java8
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java8
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java24
32 files changed, 545 insertions, 623 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 0907ea9df0..9fb3a5040b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -36,10 +36,7 @@ import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.exchange.MessageRouter;
import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.Subscription;
+import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.LocalTransactionalContext;
@@ -421,33 +418,32 @@ public class AMQChannel
/**
* Add a message to the channel-based list of unacknowledged messages
*
- * @param message the message that was delivered
+ * @param entry the record of the message on the queue that was delivered
* @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
* delivery tag)
* @param consumerTag The tag for the consumer that is to acknowledge this message.
- * @param queue the queue from which the message was delivered
*/
- public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, AMQShortString consumerTag, AMQQueue queue)
+ public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, AMQShortString consumerTag)
{
if (_log.isDebugEnabled())
{
- if (queue == null)
+ if (entry.getQueue() == null)
{
- _log.debug("Adding unacked message with a null queue:" + message.debugIdentity());
+ _log.debug("Adding unacked message with a null queue:" + entry.debugIdentity());
}
else
{
if (_log.isDebugEnabled())
{
- _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag
- + ") with a queue(" + queue + ") for " + consumerTag);
+ _log.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag
+ + ") with a queue(" + entry.getQueue() + ") for " + consumerTag);
}
}
}
synchronized (_unacknowledgedMessageMap.getLock())
{
- _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
+ _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(entry, consumerTag, deliveryTag));
checkSuspension();
}
}
@@ -499,16 +495,16 @@ public class AMQChannel
for (UnacknowledgedMessage unacked : messagesToBeDelivered)
{
- if (unacked.queue != null)
+ if (!unacked.isQueueDeleted())
{
// Ensure message is released for redelivery
- unacked.message.release(unacked.queue);
+ unacked.entry.release();
// Mark message redelivered
- unacked.message.setRedelivered(true);
+ unacked.getMessage().setRedelivered(true);
// Deliver Message
- deliveryContext.deliver(unacked.message, unacked.queue, false);
+ deliveryContext.deliver(unacked.entry, false);
// Should we allow access To the DM to directy deliver the message?
// As we don't need to check for Consumers or worry about incrementing the message count?
@@ -533,13 +529,13 @@ public class AMQChannel
{
// Ensure message is released for redelivery
- if (unacked.queue != null)
+ if (!unacked.isQueueDeleted())
{
- unacked.message.release(unacked.queue);
+ unacked.entry.release();
}
// Mark message redelivered
- unacked.message.setRedelivered(true);
+ unacked.getMessage().setRedelivered(true);
// Deliver these messages out of the transaction as their delivery was never
// part of the transaction only the receive.
@@ -559,16 +555,16 @@ public class AMQChannel
deliveryContext = _txnContext;
}
- if (unacked.queue != null)
+ if (!unacked.isQueueDeleted())
{
// Redeliver the messages to the front of the queue
- deliveryContext.deliver(unacked.message, unacked.queue, true);
+ deliveryContext.deliver(unacked.entry, true);
// Deliver increments the message count but we have already deliverted this once so don't increment it again
// this was because deliver did an increment changed this.
}
else
{
- _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity()
+ _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity()
+ "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
// _log.error("Requested requeue of message:" + deliveryTag +
// " but no queue defined using DeadLetter queue:" + getDeadLetterQueue());
@@ -591,7 +587,7 @@ public class AMQChannel
public boolean callback(UnacknowledgedMessage message) throws AMQException
{
_log.debug(
- (count++) + ": (" + message.message.debugIdentity() + ")" + "[" + message.deliveryTag + "]");
+ (count++) + ": (" + message.getMessage().debugIdentity() + ")" + "[" + message.deliveryTag + "]");
return false; // Continue
}
@@ -630,7 +626,7 @@ public class AMQChannel
public boolean callback(UnacknowledgedMessage message) throws AMQException
{
AMQShortString consumerTag = message.consumerTag;
- AMQMessage msg = message.message;
+ AMQMessage msg = message.getMessage();
msg.setRedelivered(true);
if (consumerTag != null)
{
@@ -649,7 +645,7 @@ public class AMQChannel
// Message has no consumer tag, so was "delivered" to a GET
// or consumer no longer registered
// cannot resend, so re-queue.
- if (message.queue != null)
+ if (!message.isQueueDeleted())
{
if (requeue)
{
@@ -690,7 +686,7 @@ public class AMQChannel
for (UnacknowledgedMessage message : msgToResend)
{
- AMQMessage msg = message.message;
+ AMQMessage msg = message.getMessage();
// Our Java Client will always suspend the channel when resending!
// If the client has requested the messages be resent then it is
@@ -705,13 +701,13 @@ public class AMQChannel
// else
// {
// release to allow it to be delivered
- msg.release(message.queue);
+ message.entry.release();
// Without any details from the client about what has been processed we have to mark
// all messages in the unacked map as redelivered.
msg.setRedelivered(true);
- Subscription sub = msg.getDeliveredSubscription(message.queue);
+ Subscription sub = message.entry.getDeliveredSubscription();
if (sub != null)
{
@@ -741,7 +737,7 @@ public class AMQChannel
+ System.identityHashCode(sub));
}
- sub.addToResendQueue(msg);
+ sub.addToResendQueue(message.entry);
_unacknowledgedMessageMap.remove(message.deliveryTag);
}
} // sync(sub.getSendLock)
@@ -789,10 +785,10 @@ public class AMQChannel
// Process Messages to Requeue at the front of the queue
for (UnacknowledgedMessage message : msgToRequeue)
{
- message.message.release(message.queue);
- message.message.setRedelivered(true);
+ message.entry.release();
+ message.entry.setRedelivered(true);
- deliveryContext.deliver(message.message, message.queue, true);
+ deliveryContext.deliver(message.entry, true);
_unacknowledgedMessageMap.remove(message.deliveryTag);
}
@@ -813,17 +809,18 @@ public class AMQChannel
{
public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- if (message.queue == queue)
+ if (message.getQueue() == queue)
{
try
{
message.discard(_storeContext);
- message.queue = null;
+ message.setQueueDeleted(true);
+
}
catch (AMQException e)
{
_log.error(
- "Error decrementing ref count on message " + message.message.getMessageId() + ": " + e, e);
+ "Error decrementing ref count on message " + message.getMessage().getMessageId() + ": " + e, e);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index 5ca8d57f7c..ac29998c2a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -87,7 +87,7 @@ public class TxAck implements TxnOp
//buffer must be marked as persistent:
for (UnacknowledgedMessage msg : _unacked)
{
- if (msg.message.isPersistent())
+ if (msg.getMessage().isPersistent())
{
return true;
}
@@ -100,7 +100,7 @@ public class TxAck implements TxnOp
//make persistent changes, i.e. dequeue and decrementReference
for (UnacknowledgedMessage msg : _unacked)
{
- msg.restoreTransientMessageData();
+ //msg.restoreTransientMessageData();
//Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(storeContext);
@@ -116,7 +116,7 @@ public class TxAck implements TxnOp
for (UnacknowledgedMessage msg : _unacked)
{
msg.clearTransientMessageData();
- msg.message.takeReference();
+ msg.getMessage().takeReference();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
index 7088c704ed..40f5970cac 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
@@ -24,19 +24,21 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoreContext;
public class UnacknowledgedMessage
{
- public final AMQMessage message;
+ public final QueueEntry entry;
public final AMQShortString consumerTag;
public final long deliveryTag;
- public AMQQueue queue;
- public UnacknowledgedMessage(AMQQueue queue, AMQMessage message, AMQShortString consumerTag, long deliveryTag)
+ private boolean _queueDeleted;
+
+
+ public UnacknowledgedMessage(QueueEntry entry, AMQShortString consumerTag, long deliveryTag)
{
- this.queue = queue;
- this.message = message;
+ this.entry = entry;
this.consumerTag = consumerTag;
this.deliveryTag = deliveryTag;
}
@@ -45,9 +47,9 @@ public class UnacknowledgedMessage
{
StringBuilder sb = new StringBuilder();
sb.append("Q:");
- sb.append(queue);
+ sb.append(entry.getQueue());
sb.append(" M:");
- sb.append(message);
+ sb.append(entry.getMessage());
sb.append(" CT:");
sb.append(consumerTag);
sb.append(" DT:");
@@ -58,22 +60,42 @@ public class UnacknowledgedMessage
public void discard(StoreContext storeContext) throws AMQException
{
- if (queue != null)
+ if (entry.getQueue() != null)
{
- queue.dequeue(storeContext, message);
+ entry.getQueue().dequeue(storeContext, entry);
}
//if the queue is null then the message is waiting to be acked, but has been removed.
- message.decrementReference(storeContext);
+ entry.getMessage().decrementReference(storeContext);
}
public void restoreTransientMessageData() throws AMQException
{
- message.restoreTransientMessageData();
+ entry.getMessage().restoreTransientMessageData();
}
public void clearTransientMessageData()
{
- message.clearTransientMessageData();
+ entry.getMessage().clearTransientMessageData();
+ }
+
+ public AMQMessage getMessage()
+ {
+ return entry.getMessage();
+ }
+
+ public AMQQueue getQueue()
+ {
+ return entry.getQueue();
+ }
+
+ public void setQueueDeleted(boolean queueDeleted)
+ {
+ _queueDeleted = queueDeleted;
+ }
+
+ public boolean isQueueDeleted()
+ {
+ return _queueDeleted;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index 30bbdea2ef..20ee646a40 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -97,7 +97,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
UnacknowledgedMessage message = _map.remove(deliveryTag);
if(message != null)
{
- _unackedSize -= message.message.getSize();
+ _unackedSize -= message.getMessage().getSize();
}
return message;
@@ -127,7 +127,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
synchronized (_lock)
{
_map.put(deliveryTag, message);
- _unackedSize += message.message.getSize();
+ _unackedSize += message.getMessage().getSize();
_lastDeliveryTag = deliveryTag;
}
}
@@ -186,7 +186,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
it.remove();
- _unackedSize -= unacked.getValue().message.getSize();
+ _unackedSize -= unacked.getValue().getMessage().getSize();
destination.add(unacked.getValue());
if (unacked.getKey() == deliveryTag)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index a7bc49daab..57ae2bb6d4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -143,7 +143,7 @@ public class FanoutExchange extends AbstractExchange
public AMQShortString getDefaultExchangeName()
{
- return ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+ return ExchangeDefaults.FANOUT_EXCHANGE_NAME;
}
};
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index 4f3aee0c93..c48dd902eb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -85,7 +85,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
}
else
{
- if (message.queue == null || message.queue.isDeleted())
+ if (message.isQueueDeleted() || message.getQueue().isDeleted())
{
_logger.warn("Message's Queue as already been purged, unable to Reject. " +
"Dropping message should use Dead Letter Queue");
@@ -93,7 +93,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
return;
}
- if (!message.message.isReferenced())
+ if (!message.getMessage().isReferenced())
{
_logger.warn("Message as already been purged, unable to Reject.");
return;
@@ -102,7 +102,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
if (_logger.isTraceEnabled())
{
- _logger.trace("Rejecting: DT:" + deliveryTag + "-" + message.message.debugIdentity() +
+ _logger.trace("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() +
": Requeue:" + body.getRequeue() +
//": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
@@ -111,7 +111,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
// If we haven't requested message to be resent to this consumer then reject it from ever getting it.
//if (!evt.getMethod().resend)
{
- message.message.reject(message.message.getDeliveredSubscription(message.queue));
+ message.entry.reject();
}
if (body.getRequeue())
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 307a4c8d21..d9a9d2273b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -77,22 +77,13 @@ public class AMQMessage
/** Flag to indicate that this message requires 'immediate' delivery. */
private boolean _immediate;
- // private Subscription _takenBySubcription;
- // private AtomicBoolean _taken = new AtomicBoolean(false);
private TransientMessageData _transientMessageData = new TransientMessageData();
- //todo: this should be part of a messageOnQueue object
- private Set<Subscription> _rejectedBy = null;
+ private long _expiration;
- //todo: this should be part of a messageOnQueue object
- private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
- //todo: this should be part of a messageOnQueue object
- private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
private final int hashcode = System.identityHashCode(this);
- //todo: this should be part of a messageOnQueue object
- private long _expiration;
public String debugIdentity()
{
@@ -282,7 +273,7 @@ public class AMQMessage
setContentHeaderBody(contentHeader);
}
- /**
+ /* *
* Used in testing only. This allows the passing of the content header and some body fragments on construction.
*
* @param messageId
@@ -293,7 +284,7 @@ public class AMQMessage
* @param contentBodies
*
* @throws AMQException
- */
+ */ /*
public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies,
MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException
@@ -306,7 +297,7 @@ public class AMQMessage
addContentBodyFrame(storeContext, cb);
}
}
-
+ */
protected AMQMessage(AMQMessage msg) throws AMQException
{
_messageId = msg._messageId;
@@ -485,84 +476,6 @@ public class AMQMessage
return _deliveredToConsumer;
}
- public boolean isTaken(AMQQueue queue)
- {
- // return _taken.get();
-
- synchronized (this)
- {
- AtomicBoolean taken = _takenMap.get(queue);
- if (taken == null)
- {
- taken = new AtomicBoolean(false);
- _takenMap.put(queue, taken);
- }
-
- return taken.get();
- }
- }
-
- public boolean taken(AMQQueue queue, Subscription sub)
- {
- // if (_taken.getAndSet(true))
- // {
- // return true;
- // }
- // else
- // {
- // _takenBySubcription = sub;
- // return false;
- // }
-
- synchronized (this)
- {
- AtomicBoolean taken = _takenMap.get(queue);
- if (taken == null)
- {
- taken = new AtomicBoolean(false);
- }
-
- if (taken.getAndSet(true))
- {
- return true;
- }
- else
- {
- _takenMap.put(queue, taken);
- _takenBySubcriptionMap.put(queue, sub);
-
- return false;
- }
- }
- }
-
- public void release(AMQQueue queue)
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("Releasing Message:" + debugIdentity());
- }
-
- // _taken.set(false);
- // _takenBySubcription = null;
-
- synchronized (this)
- {
- AtomicBoolean taken = _takenMap.get(queue);
- if (taken == null)
- {
- taken = new AtomicBoolean(false);
- }
- else
- {
- taken.set(false);
- }
-
- _deliveredToConsumer = false;
- _takenMap.put(queue, taken);
- _takenBySubcriptionMap.put(queue, null);
- }
- }
public boolean checkToken(Object token)
{
@@ -683,7 +596,6 @@ public class AMQMessage
*/
public boolean expired(AMQQueue queue) throws AMQException
{
- // note: If the storecontext isn't need then we can remove the getChannel() from Subscription.
if (_expiration != 0L)
{
@@ -732,7 +644,7 @@ public class AMQMessage
// Increment the references to this message for each queue delivery.
incrementReference();
// normal deliver so add this message at the end.
- _txnContext.deliver(this, q, false);
+ _txnContext.deliver(q.createEntry(this), false);
}
}
finally
@@ -743,175 +655,6 @@ public class AMQMessage
}
}
- /*
- public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- getContentHeaderBody());
-
- final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
- if (bodyCount == 0)
- {
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
-
- protocolSession.writeFrame(compositeBlock);
- }
- else
- {
-
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
-
- AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
- protocolSession.writeFrame(compositeBlock);
-
- //
- // Now start writing out the other content bodies
- //
- for (int i = 1; i < bodyCount; i++)
- {
- cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
- protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
- }
-
-
- }
-
-
- }
-
- public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException
- {
- ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- getContentHeaderBody());
-
- final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
- if (bodyCount == 0)
- {
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
- protocolSession.writeFrame(compositeBlock);
- }
- else
- {
-
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
-
- AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
- protocolSession.writeFrame(compositeBlock);
-
- //
- // Now start writing out the other content bodies
- //
- for (int i = 1; i < bodyCount; i++)
- {
- cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
- protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
- }
-
-
- }
-
-
- }
-
-
- private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- MessagePublishInfo pb = getMessagePublishInfo();
- AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag,
- deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(),
- pb.getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
- deliverFrame.writePayload(buf);
- buf.flip();
- return buf;
- }
-
- private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize)
- throws AMQException
- {
- MessagePublishInfo pb = getMessagePublishInfo();
- AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
- protocolSession.getProtocolMajorVersion(),
- protocolSession.getProtocolMinorVersion(),
- deliveryTag, pb.getExchange(),
- queueSize,
- _messageHandle.isRedelivered(),
- pb.getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
- getOkFrame.writePayload(buf);
- buf.flip();
- return buf;
- }
-
- private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException
- {
- AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
- protocolSession.getProtocolMajorVersion(),
- protocolSession.getProtocolMinorVersion(),
- getMessagePublishInfo().getExchange(),
- replyCode, replyText,
- getMessagePublishInfo().getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
- returnFrame.writePayload(buf);
- buf.flip();
- return buf;
- }
-
- public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText)
- throws AMQException
- {
- ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText);
-
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- getContentHeaderBody());
-
- Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId);
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- if (bodyFrameIterator.hasNext())
- {
- AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
- protocolSession.writeFrame(compositeBlock);
- }
- else
- {
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
- new AMQDataBlock[]{contentHeader});
- protocolSession.writeFrame(compositeBlock);
- }
-
- //
- // Now start writing out the other content bodies
- // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
- //
- while (bodyFrameIterator.hasNext())
- {
- protocolSession.writeFrame(bodyFrameIterator.next());
- }
- }
- */
public AMQMessageHandle getMessageHandle()
{
@@ -954,47 +697,7 @@ public class AMQMessage
// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
// _taken + " by :" + _takenBySubcription;
- return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: "
- + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
- }
-
- public Subscription getDeliveredSubscription(AMQQueue queue)
- {
- // return _takenBySubcription;
- synchronized (this)
- {
- return _takenBySubcriptionMap.get(queue);
- }
- }
-
- public void reject(Subscription subscription)
- {
- if (subscription != null)
- {
- if (_rejectedBy == null)
- {
- _rejectedBy = new HashSet<Subscription>();
- }
-
- _rejectedBy.add(subscription);
- }
- else
- {
- _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
- }
+ return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount;
}
- public boolean isRejectedBy(Subscription subscription)
- {
- boolean rejected = _rejectedBy != null;
-
- if (rejected) // We have subscriptions that rejected this message
- {
- return _rejectedBy.contains(subscription);
- }
- else // This messasge hasn't been rejected yet.
- {
- return rejected;
- }
- }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 0c52a358f7..53c36d9718 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class AMQQueue implements Managable, Comparable
{
+
/**
* ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
* already exists.
@@ -250,7 +251,7 @@ public class AMQQueue implements Managable, Comparable
}
/** @return List of messages(undelivered) on the queue. */
- public List<AMQMessage> getMessagesOnTheQueue()
+ public List<QueueEntry> getMessagesOnTheQueue()
{
return _deliveryMgr.getMessages();
}
@@ -263,7 +264,7 @@ public class AMQQueue implements Managable, Comparable
*
* @return List of messages
*/
- public List<AMQMessage> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
+ public List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
{
return _deliveryMgr.getMessages(fromMessageId, toMessageId);
}
@@ -276,11 +277,11 @@ public class AMQQueue implements Managable, Comparable
/**
* @param messageId
*
- * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
+ * @return QueueEntry with give id if exists. null if QueueEntry with given id doesn't exist.
*/
- public AMQMessage getMessageOnTheQueue(long messageId)
+ public QueueEntry getMessageOnTheQueue(long messageId)
{
- List<AMQMessage> list = getMessagesOnTheQueue(messageId, messageId);
+ List<QueueEntry> list = getMessagesOnTheQueue(messageId, messageId);
if ((list == null) || (list.size() == 0))
{
return null;
@@ -319,15 +320,16 @@ public class AMQQueue implements Managable, Comparable
toQueue.startMovingMessages();
// Get the list of messages to move.
- List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+ List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
try
{
fromStore.beginTran(storeContext);
// Move the messages in on the message store.
- for (AMQMessage message : foundMessagesList)
+ for (QueueEntry entry : foundMessagesList)
{
+ AMQMessage message = entry.getMessage();
fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
}
@@ -397,15 +399,16 @@ public class AMQQueue implements Managable, Comparable
toQueue.startMovingMessages();
// Get the list of messages to move.
- List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+ List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
try
{
fromStore.beginTran(storeContext);
// Move the messages in on the message store.
- for (AMQMessage message : foundMessagesList)
+ for (QueueEntry entry : foundMessagesList)
{
+ AMQMessage message = entry.getMessage();
toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
message.takeReference();
}
@@ -463,15 +466,16 @@ public class AMQQueue implements Managable, Comparable
startMovingMessages();
// Get the list of messages to move.
- List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+ List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
try
{
fromStore.beginTran(storeContext);
// remove the messages in on the message store.
- for (AMQMessage message : foundMessagesList)
+ for (QueueEntry entry : foundMessagesList)
{
+ AMQMessage message = entry.getMessage();
fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
}
@@ -513,7 +517,7 @@ public class AMQQueue implements Managable, Comparable
_deliveryMgr.startMovingMessages();
}
- private void enqueueMovedMessages(StoreContext storeContext, List<AMQMessage> messageList)
+ private void enqueueMovedMessages(StoreContext storeContext, List<QueueEntry> messageList)
{
_deliveryMgr.enqueueMovedMessages(storeContext, messageList);
_totalMessagesReceived.addAndGet(messageList.size());
@@ -583,7 +587,7 @@ public class AMQQueue implements Managable, Comparable
}
- /** Removes the AMQMessage from the top of the queue. */
+ /** Removes the QueueEntry from the top of the queue. */
public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException
{
_deliveryMgr.removeAMessageFromTop(storeContext, this);
@@ -798,27 +802,28 @@ public class AMQQueue implements Managable, Comparable
// return _deliveryMgr;
// }
- public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
+ public void process(StoreContext storeContext, QueueEntry entry, boolean deliverFirst) throws AMQException
{
- _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst);
+ AMQMessage msg = entry.getMessage();
+ _deliveryMgr.deliver(storeContext, getName(), entry, deliverFirst);
try
{
msg.checkDeliveredToConsumer();
- updateReceivedMessageCount(msg);
+ updateReceivedMessageCount(entry);
}
catch (NoConsumersException e)
{
// as this message will be returned, it should be removed
// from the queue:
- dequeue(storeContext, msg);
+ dequeue(storeContext, entry);
}
}
- public void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException
+ public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
{
try
{
- msg.dequeue(storeContext, this);
+ entry.getMessage().dequeue(storeContext, this);
}
catch (MessageCleanupException e)
{
@@ -844,8 +849,10 @@ public class AMQQueue implements Managable, Comparable
return _subscribers;
}
- protected void updateReceivedMessageCount(AMQMessage msg) throws AMQException
+ protected void updateReceivedMessageCount(QueueEntry entry) throws AMQException
{
+ AMQMessage msg = entry.getMessage();
+
if (!msg.isRedelivered())
{
_totalMessagesReceived.incrementAndGet();
@@ -933,8 +940,14 @@ public class AMQQueue implements Managable, Comparable
_maximumMessageAge = maximumMessageAge;
}
- public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, AMQMessage msg)
+ public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, QueueEntry entry)
{
- _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, msg);
+ _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, entry);
}
+
+ public QueueEntry createEntry(AMQMessage amqMessage)
+ {
+ return new QueueEntry(this, amqMessage);
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 07872d7644..9e32de3f76 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -321,11 +321,14 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
*/
public CompositeData viewMessageContent(long msgId) throws JMException
{
- AMQMessage msg = _queue.getMessageOnTheQueue(msgId);
- if (msg == null)
+ QueueEntry entry = _queue.getMessageOnTheQueue(msgId);
+
+ if (entry == null)
{
throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
}
+
+ AMQMessage msg = entry.getMessage();
// get message content
Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
List<Byte> msgContent = new ArrayList<Byte>();
@@ -381,7 +384,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
+ "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
}
- List<AMQMessage> list = _queue.getMessagesOnTheQueue();
+ List<QueueEntry> list = _queue.getMessagesOnTheQueue();
TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
try
@@ -389,7 +392,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
// Create the tabular list of message header contents
for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
{
- AMQMessage msg = list.get(i - 1);
+ AMQMessage msg = list.get(i - 1).getMessage();
ContentHeaderBody headerBody = msg.getContentHeaderBody();
// Create header attributes list
String[] headerAttributes = getMessageHeaderProperties(headerBody);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index eabc8ebf38..3cf2cb9b12 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -55,7 +55,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
defaultValue = "false")
public boolean compressBufferOnQueue;
/** Holds any queued messages */
- private final MessageQueue<AMQMessage> _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
+ private final MessageQueue<QueueEntry> _messages = new ConcurrentLinkedMessageQueueAtomicSize<QueueEntry>();
/** Ensures that only one asynchronous task is running for this manager at any time. */
private final AtomicBoolean _processing = new AtomicBoolean();
@@ -107,8 +107,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
- private boolean addMessageToQueue(AMQMessage msg, boolean deliverFirst)
+ private boolean addMessageToQueue(QueueEntry entry, boolean deliverFirst)
{
+ AMQMessage msg = entry.getMessage();
// Shrink the ContentBodies to their actual size to save memory.
if (compressBufferOnQueue)
{
@@ -124,12 +125,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
synchronized (_queueHeadLock)
{
- _messages.pushHead(msg);
+ _messages.pushHead(entry);
}
}
else
{
- _messages.offer(msg);
+ _messages.offer(entry);
}
_totalMessageSize.addAndGet(msg.getSize());
@@ -175,11 +176,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public long getOldestMessageArrival()
{
- AMQMessage msg = _messages.peek();
- return msg == null ? Long.MAX_VALUE : msg.getArrivalTime();
+ QueueEntry entry = _messages.peek();
+ return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
}
- public void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg)
+ public void subscriberHasPendingResend(boolean hasContent, Subscription subscription, QueueEntry entry)
{
_lock.lock();
try
@@ -188,19 +189,19 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
_log.debug("Queue has adding subscriber content");
_hasContent.add(subscription);
- _totalMessageSize.addAndGet(msg.getSize());
+ _totalMessageSize.addAndGet(entry.getSize());
_extraMessages.addAndGet(1);
}
else
{
_log.debug("Queue has removing subscriber content");
- if (msg == null)
+ if (entry == null)
{
_hasContent.remove(subscription);
}
else
{
- _totalMessageSize.addAndGet(-msg.getSize());
+ _totalMessageSize.addAndGet(-entry.getSize());
_extraMessages.addAndGet(-1);
}
}
@@ -222,14 +223,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
*
* @return List of messages
*/
- public List<AMQMessage> getMessages()
+ public List<QueueEntry> getMessages()
{
_lock.lock();
- List<AMQMessage> list = new ArrayList<AMQMessage>();
+ List<QueueEntry> list = new ArrayList<QueueEntry>();
- for (AMQMessage message : _messages)
+ for (QueueEntry entry : _messages)
{
- list.add(message);
+ list.add(entry);
}
_lock.unlock();
@@ -244,7 +245,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
*
* @return
*/
- public List<AMQMessage> getMessages(long fromMessageId, long toMessageId)
+ public List<QueueEntry> getMessages(long fromMessageId, long toMessageId)
{
if (fromMessageId <= 0 || toMessageId <= 0)
{
@@ -255,14 +256,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_lock.lock();
- List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
+ List<QueueEntry> foundMessagesList = new ArrayList<QueueEntry>();
- for (AMQMessage message : _messages)
+ for (QueueEntry entry : _messages)
{
- long msgId = message.getMessageId();
+ long msgId = entry.getMessage().getMessageId();
if (msgId >= fromMessageId && msgId <= toMessageId)
{
- foundMessagesList.add(message);
+ foundMessagesList.add(entry);
}
// break if the no of messages are found
if (foundMessagesList.size() == maxMessageCount)
@@ -282,16 +283,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_log.trace("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")");
}
- Iterator<AMQMessage> currentQueue = _messages.iterator();
+ Iterator<QueueEntry> currentQueue = _messages.iterator();
while (currentQueue.hasNext())
{
- AMQMessage message = currentQueue.next();
- if (!message.getDeliveredToConsumer())
+ QueueEntry entry = currentQueue.next();
+
+ if (!entry.getDeliveredToConsumer())
{
- if (subscription.hasInterest(message))
+ if (subscription.hasInterest(entry))
{
- subscription.enqueueForPreDelivery(message, false);
+ subscription.enqueueForPreDelivery(entry, false);
}
}
}
@@ -299,8 +301,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public boolean performGet(AMQProtocolSession protocolSession, AMQChannel channel, boolean acks) throws AMQException
{
- AMQMessage msg = getNextMessage();
- if (msg == null)
+ QueueEntry entry = getNextMessage();
+ if (entry == null)
{
return false;
}
@@ -322,9 +324,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+ _log.debug("No ack mode so dequeuing message immediately: " + entry.getMessage().getMessageId());
}
- _queue.dequeue(channel.getStoreContext(), msg);
+ _queue.dequeue(channel.getStoreContext(), entry);
}
synchronized (channel)
{
@@ -332,22 +334,22 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (acks)
{
- channel.addUnacknowledgedMessage(msg, deliveryTag, null, _queue);
+ channel.addUnacknowledgedMessage(entry, deliveryTag, null);
}
- protocolSession.getProtocolOutputConverter().writeGetOk(msg, channel.getChannelId(),
+ protocolSession.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(),
deliveryTag, _queue.getMessageCount());
- _totalMessageSize.addAndGet(-msg.getSize());
+ _totalMessageSize.addAndGet(-entry.getSize());
}
if (!acks)
{
- msg.decrementReference(channel.getStoreContext());
+ entry.getMessage().decrementReference(channel.getStoreContext());
}
}
finally
{
- msg.setDeliveredToConsumer();
+ entry.setDeliveredToConsumer();
}
return true;
@@ -381,7 +383,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
*
* @param messageList
*/
- public void removeMovedMessages(List<AMQMessage> messageList)
+ public void removeMovedMessages(List<QueueEntry> messageList)
{
// Remove from the
boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
@@ -391,20 +393,20 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (!sub.isSuspended() && sub.filtersMessages())
{
- Queue<AMQMessage> preDeliveryQueue = sub.getPreDeliveryQueue();
- for (AMQMessage msg : messageList)
+ Queue<QueueEntry> preDeliveryQueue = sub.getPreDeliveryQueue();
+ for (QueueEntry entry : messageList)
{
- preDeliveryQueue.remove(msg);
+ preDeliveryQueue.remove(entry);
}
}
}
}
- for (AMQMessage msg : messageList)
+ for (QueueEntry entry : messageList)
{
- if (_messages.remove(msg))
+ if (_messages.remove(entry))
{
- _totalMessageSize.getAndAdd(-msg.getSize());
+ _totalMessageSize.getAndAdd(-entry.getSize());
}
}
}
@@ -420,16 +422,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
_lock.lock();
- AMQMessage message = _messages.poll();
+ QueueEntry entry = _messages.poll();
- if (message != null)
+ if (entry != null)
{
- queue.dequeue(storeContext, message);
+ queue.dequeue(storeContext, entry);
- _totalMessageSize.addAndGet(-message.getSize());
+ _totalMessageSize.addAndGet(-entry.getSize());
//If this causes ref count to hit zero then data will be purged so message.getSize() will NPE.
- message.decrementReference(storeContext);
+ entry.getMessage().decrementReference(storeContext);
}
@@ -443,17 +445,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
synchronized (_queueHeadLock)
{
- AMQMessage msg = getNextMessage();
- while (msg != null)
+ QueueEntry entry = getNextMessage();
+ while (entry != null)
{
//and remove it
_messages.poll();
- _queue.dequeue(storeContext, msg);
+ _queue.dequeue(storeContext, entry);
- msg.decrementReference(_reapingStoreContext);
+ entry.getMessage().decrementReference(_reapingStoreContext);
- msg = getNextMessage();
+ entry = getNextMessage();
count++;
}
_totalMessageSize.set(0L);
@@ -469,34 +471,35 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
*
* @throws org.apache.qpid.AMQException
*/
- private AMQMessage getNextMessage() throws AMQException
+ private QueueEntry getNextMessage() throws AMQException
{
return getNextMessage(_messages, null, false);
}
- private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub, boolean purgeOnly) throws AMQException
+ private QueueEntry getNextMessage(Queue<QueueEntry> messages, Subscription sub, boolean purgeOnly) throws AMQException
{
- AMQMessage message = messages.peek();
+ QueueEntry entry = messages.peek();
//while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.)
- while (purgeMessage(message, sub, purgeOnly))
+ while (purgeMessage(entry, sub, purgeOnly))
{
+ AMQMessage message = entry.getMessage();
// if we are purging then ensure we mark this message taken for the current subscriber
// the current subscriber may be null in the case of a get or a purge but this is ok.
// boolean alreadyTaken = message.taken(_queue, sub);
//remove the already taken message or expired
- AMQMessage removed = messages.poll();
+ QueueEntry removed = messages.poll();
- assert removed == message;
+ assert removed == entry;
// if the message expired then the _totalMessageSize needs adjusting
- if (message.expired(_queue) && !message.getDeliveredToConsumer())
+ if (message.expired(_queue) && !entry.getDeliveredToConsumer())
{
- _totalMessageSize.addAndGet(-message.getSize());
+ _totalMessageSize.addAndGet(-entry.getSize());
// Use the reapingStoreContext as any sub(if we have one) may be in a tx.
- _queue.dequeue(_reapingStoreContext, message);
+ _queue.dequeue(_reapingStoreContext, entry);
message.decrementReference(_reapingStoreContext);
@@ -515,10 +518,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
// try the next message
- message = messages.peek();
+ entry = messages.peek();
}
- return message;
+ return entry;
}
/**
@@ -534,7 +537,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
*
* @throws AMQException
*/
- private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException
+ private boolean purgeMessage(QueueEntry message, Subscription sub) throws AMQException
{
return purgeMessage(message, sub, false);
}
@@ -552,7 +555,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
*
* @throws AMQException
*/
- private boolean purgeMessage(AMQMessage message, Subscription sub, boolean purgeOnly) throws AMQException
+ private boolean purgeMessage(QueueEntry message, Subscription sub, boolean purgeOnly) throws AMQException
{
//Original.. complicated while loop control
// (message != null
@@ -567,7 +570,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (message != null)
{
// Check that the message hasn't expired.
- if (message.expired(_queue))
+ if (message.expired())
{
return true;
}
@@ -576,13 +579,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (sub != null)
{
// if we have a queue browser(we don't purge) so check mark the message as taken
- purge = ((!sub.isBrowser() || message.isTaken(_queue)));
+ purge = ((!sub.isBrowser() || message.isTaken()));
}
else
{
// if there is no subscription we are doing
// a get or purging so mark message as taken.
- message.isTaken(_queue);
+ message.isTaken();
// and then ensure that it gets purged
purge = true;
}
@@ -592,20 +595,20 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
// If we are simply purging the queue don't take the message
// just purge up to the next non-taken msg.
- return purge && message.isTaken(_queue);
+ return purge && message.isTaken();
}
else
{
// if we are purging then ensure we mark this message taken for the current subscriber
// the current subscriber may be null in the case of a get or a purge but this is ok.
- return purge && message.taken(_queue, sub);
+ return purge && message.taken(sub);
}
}
- public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue)
+ public void sendNextMessage(Subscription sub, AMQQueue queue)
{
- Queue<AMQMessage> messageQueue = sub.getNextQueue(_messages);
+ Queue<QueueEntry> messageQueue = sub.getNextQueue(_messages);
if (_log.isTraceEnabled())
{
@@ -624,16 +627,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return;
}
- AMQMessage message = null;
- AMQMessage removed = null;
+ QueueEntry entry = null;
+ QueueEntry removed = null;
try
{
synchronized (_queueHeadLock)
{
- message = getNextMessage(messageQueue, sub, false);
+ entry = getNextMessage(messageQueue, sub, false);
// message will be null if we have no messages in the messageQueue.
- if (message == null)
+ if (entry == null)
{
if (_log.isTraceEnabled())
{
@@ -643,7 +646,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
if (_log.isDebugEnabled())
{
- _log.debug(debugIdentity() + "Async Delivery Message :" + message + "(" + System.identityHashCode(message) +
+ _log.debug(debugIdentity() + "Async Delivery Message :" + entry + "(" + System.identityHashCode(entry) +
") by :" + System.identityHashCode(this) +
") to :" + System.identityHashCode(sub));
}
@@ -651,10 +654,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (messageQueue == _messages)
{
- _totalMessageSize.addAndGet(-message.getSize());
+ _totalMessageSize.addAndGet(-entry.getSize());
}
- sub.send(message, _queue);
+ sub.send(entry, _queue);
//remove sent message from our queue.
removed = messageQueue.poll();
@@ -662,14 +665,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
// Otherwise the Async send will never end
}
- if (removed != message)
+ if (removed != entry)
{
- _log.error("Just send message:" + message.debugIdentity() + " BUT removed this from queue:" + removed);
+ _log.error("Just send message:" + entry.getMessage().debugIdentity() + " BUT removed this from queue:" + removed);
}
if (_log.isDebugEnabled())
{
- _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.debugIdentity() + "d:" + message +
+ _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.getMessage().debugIdentity() + "d:" + entry +
") by :" + System.identityHashCode(this) +
") to :" + System.identityHashCode(sub));
}
@@ -704,9 +707,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
catch (AMQException e)
{
- if (message != null)
+ if (entry != null)
{
- message.release(_queue);
+ entry.release();
}
else
{
@@ -734,23 +737,23 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
* @param storeContext
* @param movedMessageList
*/
- public void enqueueMovedMessages(StoreContext storeContext, List<AMQMessage> movedMessageList)
+ public void enqueueMovedMessages(StoreContext storeContext, List<QueueEntry> movedMessageList)
{
_lock.lock();
- for (AMQMessage msg : movedMessageList)
+ for (QueueEntry entry : movedMessageList)
{
- addMessageToQueue(msg, false);
+ addMessageToQueue(entry, false);
}
// enqueue on the pre delivery queues
for (Subscription sub : _subscriptions.getSubscriptions())
{
- for (AMQMessage msg : movedMessageList)
+ for (QueueEntry entry : movedMessageList)
{
// Only give the message to those that want them.
- if (sub.hasInterest(msg))
+ if (sub.hasInterest(entry))
{
- sub.enqueueForPreDelivery(msg, true);
+ sub.enqueueForPreDelivery(entry, true);
}
}
}
@@ -802,30 +805,30 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
- public void deliver(StoreContext context, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws AMQException
+ public void deliver(StoreContext context, AMQShortString name, QueueEntry entry, boolean deliverFirst) throws AMQException
{
final boolean debugEnabled = _log.isDebugEnabled();
if (debugEnabled)
{
- _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + msg);
+ _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + entry);
}
//Check if we have someone to deliver the message to.
_lock.lock();
try
{
- Subscription s = _subscriptions.nextSubscriber(msg);
+ Subscription s = _subscriptions.nextSubscriber(entry);
if (s == null || (!s.filtersMessages() && hasQueuedMessages())) //no-one can take the message right now or we're queueing
{
if (debugEnabled)
{
- _log.debug(debugIdentity() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus());
+ _log.debug(debugIdentity() + "Testing Message(" + entry + ") for Queued Delivery:" + currentStatus());
}
- if (!msg.getMessagePublishInfo().isImmediate())
+ if (!entry.getMessage().getMessagePublishInfo().isImmediate())
{
- addMessageToQueue(msg, deliverFirst);
+ addMessageToQueue(entry, deliverFirst);
//release lock now message is on queue.
_lock.unlock();
@@ -840,25 +843,25 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
// stop if the message gets delivered whilst PreDelivering if we have a shared queue.
- if (_queue.isShared() && msg.getDeliveredToConsumer())
+ if (_queue.isShared() && entry.getDeliveredToConsumer())
{
if (debugEnabled)
{
- _log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) +
+ _log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(entry) +
") is already delivered.");
}
continue;
}
// Only give the message to those that want them.
- if (sub.hasInterest(msg))
+ if (sub.hasInterest(entry))
{
if (debugEnabled)
{
- _log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(msg) +
+ _log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(entry) +
") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
}
- sub.enqueueForPreDelivery(msg, deliverFirst);
+ sub.enqueueForPreDelivery(entry, deliverFirst);
}
}
@@ -893,11 +896,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isTraceEnabled())
{
- _log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" +
+ _log.trace(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" +
System.identityHashCode(s) + ") :" + s);
}
- if (msg.taken(_queue, s))
+ if (entry.taken(s))
{
//Message has been delivered so don't redeliver.
// This can currently occur because of the recursive call below
@@ -927,14 +930,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return;
}
//Deliver the message
- s.send(msg, _queue);
+ s.send(entry, _queue);
}
else
{
if (debugEnabled)
{
_log.debug(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " +
- "suspended between nextSubscriber and send for message:" + msg.debugIdentity());
+ "suspended between nextSubscriber and send for message:" + entry.getMessage().debugIdentity());
}
}
}
@@ -943,21 +946,21 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
// Why do we do this? What was the reasoning? We should have a better approach
// than recursion and rejecting if someone else sends it before we do.
//
- if (!msg.isTaken(_queue))
+ if (!entry.isTaken())
{
if (debugEnabled)
{
- _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() + ") has not been taken so recursing!:" +
+ _log.debug(debugIdentity() + " Message(" + entry.getMessage().debugIdentity() + ") has not been taken so recursing!:" +
" Subscriber:" + System.identityHashCode(s));
}
- deliver(context, name, msg, deliverFirst);
+ deliver(context, name, entry, deliverFirst);
}
else
{
if (debugEnabled)
{
- _log.debug(debugIdentity() + " Message(" + msg.toString() +
+ _log.debug(debugIdentity() + " Message(" + entry.toString() +
") has been taken so disregarding deliver request to Subscriber:" +
System.identityHashCode(s));
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
index 153106d919..f7f35a9319 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
@@ -64,13 +64,13 @@ interface DeliveryManager
*
* @param storeContext
* @param name the name of the entity on whose behalf we are delivering the message
- * @param msg the message to deliver
+ * @param entry the message to deliver
* @param deliverFirst
*
* @throws org.apache.qpid.server.queue.FailedDequeueException
* if the message could not be dequeued
*/
- void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws FailedDequeueException, AMQException;
+ void deliver(StoreContext storeContext, AMQShortString name, QueueEntry entry, boolean deliverFirst) throws FailedDequeueException, AMQException;
void removeAMessageFromTop(StoreContext storeContext, AMQQueue queue) throws AMQException;
@@ -78,15 +78,15 @@ interface DeliveryManager
void startMovingMessages();
- void enqueueMovedMessages(StoreContext context, List<AMQMessage> messageList);
+ void enqueueMovedMessages(StoreContext context, List<QueueEntry> messageList);
void stopMovingMessages();
- void removeMovedMessages(List<AMQMessage> messageListToRemove);
+ void removeMovedMessages(List<QueueEntry> messageListToRemove);
- List<AMQMessage> getMessages();
+ List<QueueEntry> getMessages();
- List<AMQMessage> getMessages(long fromMessageId, long toMessageId);
+ List<QueueEntry> getMessages(long fromMessageId, long toMessageId);
void populatePreDeliveryQueue(Subscription subscription);
@@ -96,5 +96,5 @@ interface DeliveryManager
long getOldestMessageArrival();
- void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg);
+ void subscriberHasPendingResend(boolean hasContent, Subscription subscription, QueueEntry msg);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
new file mode 100644
index 0000000000..69595f1d6c
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -0,0 +1,173 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.log4j.Logger;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+public class QueueEntry
+{
+
+ /**
+ * Used for debugging purposes.
+ */
+ private static final Logger _log = Logger.getLogger(QueueEntry.class);
+
+ private final AMQQueue _queue;
+ private final AMQMessage _message;
+
+ private Set<Subscription> _rejectedBy = null;
+
+ private AtomicReference<Object> _owner = new AtomicReference<Object>();
+
+
+ public QueueEntry(AMQQueue queue, AMQMessage message)
+ {
+ _queue = queue;
+ _message = message;
+ }
+
+
+ public AMQQueue getQueue()
+ {
+ return _queue;
+ }
+
+ public AMQMessage getMessage()
+ {
+ return _message;
+ }
+
+ public long getSize()
+ {
+ return getMessage().getSize();
+ }
+
+ public boolean getDeliveredToConsumer()
+ {
+ return getMessage().getDeliveredToConsumer();
+ }
+
+ public boolean expired() throws AMQException
+ {
+ return getMessage().expired(_queue);
+ }
+
+ public boolean isTaken()
+ {
+ return _owner.get() != null;
+ }
+
+ public boolean taken(Subscription sub)
+ {
+ return !(_owner.compareAndSet(null, sub == null ? this : sub));
+ }
+
+ public void setDeliveredToConsumer()
+ {
+ getMessage().setDeliveredToConsumer();
+ }
+
+ public void release()
+ {
+ _owner.set(null);
+ }
+
+ public String debugIdentity()
+ {
+ return getMessage().debugIdentity();
+ }
+
+ public void process(StoreContext storeContext, boolean deliverFirst) throws AMQException
+ {
+ _queue.process(storeContext, this, deliverFirst);
+ }
+
+ public void checkDeliveredToConsumer() throws NoConsumersException
+ {
+ _message.checkDeliveredToConsumer();
+ }
+
+ public void setRedelivered(boolean b)
+ {
+ getMessage().setRedelivered(true);
+ }
+
+ public Subscription getDeliveredSubscription()
+ {
+ synchronized (this)
+ {
+ Object owner = _owner.get();
+ if (owner instanceof Subscription)
+ {
+ return (Subscription) owner;
+ }
+ else
+ {
+ return null;
+ }
+ }
+ }
+
+ public void reject()
+ {
+ reject(getDeliveredSubscription());
+ }
+
+ public void reject(Subscription subscription)
+ {
+ if (subscription != null)
+ {
+ if (_rejectedBy == null)
+ {
+ _rejectedBy = new HashSet<Subscription>();
+ }
+
+ _rejectedBy.add(subscription);
+ }
+ else
+ {
+ _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
+ }
+ }
+
+ public boolean isRejectedBy(Subscription subscription)
+ {
+ boolean rejected = _rejectedBy != null;
+
+ if (rejected) // We have subscriptions that rejected this message
+ {
+ return _rejectedBy.contains(subscription);
+ }
+ else // This messasge hasn't been rejected yet.
+ {
+ return rejected;
+ }
+ }
+
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
index 77688f19be..a706098b71 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
@@ -27,7 +27,7 @@ import org.apache.qpid.server.AMQChannel;
public interface Subscription
{
- void send(AMQMessage msg, AMQQueue queue) throws AMQException;
+ void send(QueueEntry msg, AMQQueue queue) throws AMQException;
boolean isSuspended();
@@ -35,15 +35,15 @@ public interface Subscription
boolean filtersMessages();
- boolean hasInterest(AMQMessage msg);
+ boolean hasInterest(QueueEntry msg);
- Queue<AMQMessage> getPreDeliveryQueue();
+ Queue<QueueEntry> getPreDeliveryQueue();
- Queue<AMQMessage> getResendQueue();
+ Queue<QueueEntry> getResendQueue();
- Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages);
+ Queue<QueueEntry> getNextQueue(Queue<QueueEntry> messages);
- void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst);
+ void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst);
boolean isAutoClose();
@@ -53,9 +53,9 @@ public interface Subscription
boolean isBrowser();
- boolean wouldSuspend(AMQMessage msg);
+ boolean wouldSuspend(QueueEntry msg);
- void addToResendQueue(AMQMessage msg);
+ void addToResendQueue(QueueEntry msg);
Object getSendLock();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 1299c3a80c..e631481cc8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -58,9 +58,9 @@ public class SubscriptionImpl implements Subscription
private final Object _sessionKey;
- private MessageQueue<AMQMessage> _messages;
+ private MessageQueue<QueueEntry> _messages;
- private Queue<AMQMessage> _resendQueue;
+ private Queue<QueueEntry> _resendQueue;
private final boolean _noLocal;
@@ -160,7 +160,7 @@ public class SubscriptionImpl implements Subscription
if (filtersMessages())
{
- _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
+ _messages = new ConcurrentLinkedMessageQueueAtomicSize<QueueEntry>();
}
else
{
@@ -226,7 +226,7 @@ public class SubscriptionImpl implements Subscription
*
* @throws AMQException
*/
- public void send(AMQMessage msg, AMQQueue queue) throws AMQException
+ public void send(QueueEntry msg, AMQQueue queue) throws AMQException
{
if (msg != null)
{
@@ -245,7 +245,7 @@ public class SubscriptionImpl implements Subscription
}
}
- private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws AMQException
+ private void sendToBrowser(QueueEntry msg, AMQQueue queue) throws AMQException
{
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
@@ -266,11 +266,11 @@ public class SubscriptionImpl implements Subscription
_logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
}
- protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+ protocolSession.getProtocolOutputConverter().writeDeliver(msg.getMessage(), channel.getChannelId(), deliveryTag, consumerTag);
}
}
- private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue)
+ private void sendToConsumer(StoreContext storeContext, QueueEntry entry, AMQQueue queue)
throws AMQException
{
try
@@ -287,9 +287,9 @@ public class SubscriptionImpl implements Subscription
{
if (_logger.isDebugEnabled())
{
- _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+ _logger.debug("No ack mode so dequeuing message immediately: " + entry.getMessage().getMessageId());
}
- queue.dequeue(storeContext, msg);
+ queue.dequeue(storeContext, entry);
}
synchronized (channel)
@@ -298,19 +298,19 @@ public class SubscriptionImpl implements Subscription
if (_sendLock.get())
{
- _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
+ _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!");
}
if (_acks)
{
- channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ channel.addUnacknowledgedMessage(entry, deliveryTag, consumerTag);
}
- protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+ protocolSession.getProtocolOutputConverter().writeDeliver(entry.getMessage(), channel.getChannelId(), deliveryTag, consumerTag);
if (!_acks)
{
- msg.decrementReference(storeContext);
+ entry.getMessage().decrementReference(storeContext);
}
}
}
@@ -320,7 +320,7 @@ public class SubscriptionImpl implements Subscription
// using a try->finally would set it even if an error occured.
// Is this what we want?
- msg.setDeliveredToConsumer();
+ entry.setDeliveredToConsumer();
}
}
@@ -355,19 +355,19 @@ public class SubscriptionImpl implements Subscription
return _filters != null || _noLocal;
}
- public boolean hasInterest(AMQMessage msg)
+ public boolean hasInterest(QueueEntry entry)
{
//check that the message hasn't been rejected
- if (msg.isRejectedBy(this))
+ if (entry.isRejectedBy(this))
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + msg.debugIdentity());
+ _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + entry.debugIdentity());
}
// return false;
}
- final AMQProtocolSession publisher = msg.getPublisher();
+ final AMQProtocolSession publisher = entry.getMessage().getPublisher();
//todo - client id should be recoreded and this test removed but handled below
if (_noLocal && publisher != null)
@@ -418,9 +418,9 @@ public class SubscriptionImpl implements Subscription
if (_logger.isTraceEnabled())
{
- _logger.trace("(" + debugIdentity() + ") checking filters for message (" + msg.debugIdentity());
+ _logger.trace("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
}
- return checkFilters(msg);
+ return checkFilters(entry);
}
@@ -431,7 +431,7 @@ public class SubscriptionImpl implements Subscription
return id;
}
- private boolean checkFilters(AMQMessage msg)
+ private boolean checkFilters(QueueEntry msg)
{
if (_filters != null)
{
@@ -439,7 +439,7 @@ public class SubscriptionImpl implements Subscription
// {
// _logger.trace("(" + debugIdentity() + ") has filters.");
// }
- return _filters.allAllow(msg);
+ return _filters.allAllow(msg.getMessage());
}
else
{
@@ -452,12 +452,12 @@ public class SubscriptionImpl implements Subscription
}
}
- public Queue<AMQMessage> getPreDeliveryQueue()
+ public Queue<QueueEntry> getPreDeliveryQueue()
{
return _messages;
}
- public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst)
+ public void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst)
{
if (_messages != null)
{
@@ -561,19 +561,19 @@ public class SubscriptionImpl implements Subscription
while (!_resendQueue.isEmpty())
{
- AMQMessage resent = _resendQueue.poll();
+ QueueEntry resent = _resendQueue.poll();
if (_logger.isTraceEnabled())
{
_logger.trace("Removed for resending:" + resent.debugIdentity());
}
- resent.release(_queue);
+ resent.release();
_queue.subscriberHasPendingResend(false, this, resent);
try
{
- channel.getTransactionalContext().deliver(resent, _queue, true);
+ channel.getTransactionalContext().deliver(resent, true);
}
catch (AMQException e)
{
@@ -611,22 +611,22 @@ public class SubscriptionImpl implements Subscription
return _isBrowser;
}
- public boolean wouldSuspend(AMQMessage msg)
+ public boolean wouldSuspend(QueueEntry msg)
{
- return channel.wouldSuspend(msg);
+ return channel.wouldSuspend(msg.getMessage());
}
- public Queue<AMQMessage> getResendQueue()
+ public Queue<QueueEntry> getResendQueue()
{
if (_resendQueue == null)
{
- _resendQueue = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+ _resendQueue = new ConcurrentLinkedQueueAtomicSize<QueueEntry>();
}
return _resendQueue;
}
- public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+ public Queue<QueueEntry> getNextQueue(Queue<QueueEntry> messages)
{
if (_resendQueue != null && !_resendQueue.isEmpty())
{
@@ -651,7 +651,7 @@ public class SubscriptionImpl implements Subscription
}
}
- public void addToResendQueue(AMQMessage msg)
+ public void addToResendQueue(QueueEntry msg)
{
// add to our resend queue
getResendQueue().add(msg);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
index 4df88baebc..bc17bcca9c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
@@ -30,5 +30,5 @@ public interface SubscriptionManager
{
public List<Subscription> getSubscriptions();
public boolean hasActiveSubscribers();
- public Subscription nextSubscriber(AMQMessage msg);
+ public Subscription nextSubscriber(QueueEntry entry);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index b500247fa4..b73b8d7e07 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -113,7 +113,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
* concurrently. Also note that because of race conditions and when subscriptions are removed between calls to
* nextSubscriber, the IndexOutOfBoundsException also causes the scan to start at the beginning.
*/
- public Subscription nextSubscriber(AMQMessage msg)
+ public Subscription nextSubscriber(QueueEntry msg)
{
if (_subscriptions.isEmpty())
{
@@ -140,7 +140,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
}
}
- private Subscription nextSubscriberImpl(AMQMessage msg)
+ private Subscription nextSubscriberImpl(QueueEntry msg)
{
final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
while (iterator.hasNext())
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 9068f871cb..b12afd9a41 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -30,6 +30,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
@@ -64,14 +65,13 @@ public class LocalTransactionalContext implements TransactionalContext
private static class DeliveryDetails
{
- public AMQMessage message;
- public AMQQueue queue;
+ public QueueEntry entry;
+
private boolean deliverFirst;
- public DeliveryDetails(AMQMessage message, AMQQueue queue, boolean deliverFirst)
+ public DeliveryDetails(QueueEntry entry, boolean deliverFirst)
{
- this.message = message;
- this.queue = queue;
+ this.entry = entry;
this.deliverFirst = deliverFirst;
}
}
@@ -103,7 +103,7 @@ public class LocalTransactionalContext implements TransactionalContext
_postCommitDeliveryList.clear();
}
- public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException
+ public void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException
{
// A publication will result in the enlisting of several
// TxnOps. The first is an op that will store the message.
@@ -112,9 +112,9 @@ public class LocalTransactionalContext implements TransactionalContext
// enqueued. Finally a cleanup op will be added to decrement
// the reference associated with the routing.
// message.incrementReference();
- _postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst));
+ _postCommitDeliveryList.add(new DeliveryDetails(entry, deliverFirst));
_messageDelivered = true;
- _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages));
+ _txnBuffer.enlist(new CleanupMessageOperation(entry.getMessage(), _returnMessages));
/*_txnBuffer.enlist(new DeliverMessageOperation(message, queue));
if (_log.isDebugEnabled())
{
@@ -242,11 +242,11 @@ public class LocalTransactionalContext implements TransactionalContext
{
for (DeliveryDetails dd : _postCommitDeliveryList)
{
- dd.queue.process(_storeContext, dd.message, dd.deliverFirst);
+ dd.entry.process(_storeContext, dd.deliverFirst);
try
{
- dd.message.checkDeliveredToConsumer();
+ dd.entry.checkDeliveredToConsumer();
}
catch (NoConsumersException nce)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index b3d69543d4..047cef9064 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
@@ -92,14 +93,14 @@ public class NonTransactionalContext implements TransactionalContext
// Does not apply to this context
}
- public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException
+ public void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException
{
try
{
- queue.process(_storeContext, message, deliverFirst);
+ entry.process(_storeContext, deliverFirst);
//following check implements the functionality
//required by the 'immediate' flag:
- message.checkDeliveredToConsumer();
+ entry.checkDeliveredToConsumer();
}
catch (NoConsumersException e)
{
@@ -128,7 +129,7 @@ public class NonTransactionalContext implements TransactionalContext
{
if (_log.isDebugEnabled())
{
- _log.debug("Discarding message: " + message.message.getMessageId());
+ _log.debug("Discarding message: " + message.getMessage().getMessageId());
}
//Message has been ack so discard it. This will dequeue and decrement the reference.
@@ -162,7 +163,7 @@ public class NonTransactionalContext implements TransactionalContext
{
if (_log.isDebugEnabled())
{
- _log.debug("Discarding message: " + msg.message.getMessageId());
+ _log.debug("Discarding message: " + msg.getMessage().getMessageId());
}
//Message has been ack so discard it. This will dequeue and decrement the reference.
@@ -192,7 +193,7 @@ public class NonTransactionalContext implements TransactionalContext
{
if (_log.isDebugEnabled())
{
- _log.debug("Discarding message: " + msg.message.getMessageId());
+ _log.debug("Discarding message: " + msg.getMessage().getMessageId());
}
//Message has been ack so discard it. This will dequeue and decrement the reference.
@@ -206,7 +207,7 @@ public class NonTransactionalContext implements TransactionalContext
if (_log.isDebugEnabled())
{
_log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
- msg.message.getMessageId());
+ msg.getMessage().getMessageId());
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
index fee25c07df..b4c66aa24d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
@@ -25,6 +25,7 @@ import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoreContext;
/**
@@ -111,14 +112,13 @@ public interface TransactionalContext
*
* <p/>This is an 'enqueue' operation.
*
- * @param message The message to deliver.
- * @param queue The queue to deliver the message to.
+ * @param entry The message to deliver, and the queue to deliver to.
* @param deliverFirst <tt>true</tt> to place the message on the front of the queue for redelivery, <tt>false</tt>
* for normal FIFO message ordering.
*
* @throws AMQException If the message cannot be delivered for any reason.
*/
- void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException;
+ void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException;
/**
* Acknowledges a message or many messages as delivered. All messages up to a specified one, may be acknowledged by
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
index eea53252c6..218d5f04ed 100644
--- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
@@ -24,6 +24,7 @@ import org.apache.commons.codec.binary.Hex;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
import org.apache.qpid.tools.utils.Console;
@@ -85,7 +86,7 @@ public class Dump extends Show
}
- protected List<List> createMessageData(java.util.List<Long> msgids, List<AMQMessage> messages, boolean showHeaders, boolean showRouting,
+ protected List<List> createMessageData(java.util.List<Long> msgids, List<QueueEntry> messages, boolean showHeaders, boolean showRouting,
boolean showMessageHeaders)
{
@@ -96,8 +97,9 @@ public class Dump extends Show
display.add(hex);
display.add(ascii);
- for (AMQMessage msg : messages)
+ for (QueueEntry entry : messages)
{
+ AMQMessage msg = entry.getMessage();
if (!includeMsg(msg, msgids))
{
continue;
@@ -252,8 +254,8 @@ public class Dump extends Show
private void addShowInformation(List<String> column1, List<String> column2, AMQMessage msg,
String title, boolean routing, boolean headers, boolean messageHeaders)
{
- List<AMQMessage> single = new LinkedList<AMQMessage>();
- single.add(msg);
+ List<QueueEntry> single = new LinkedList<QueueEntry>();
+ single.add(new QueueEntry(null,msg));
List<List> routingData = super.createMessageData(null, single, headers, routing, messageHeaders);
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
index 25cff27445..7e21253fab 100644
--- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
@@ -23,6 +23,7 @@ package org.apache.qpid.tools.messagestore.commands;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
@@ -166,12 +167,12 @@ public class Move extends AbstractCommand
if (fromQueue != null)
{
- List<AMQMessage> messages = fromQueue.getMessagesOnTheQueue();
+ List<QueueEntry> messages = fromQueue.getMessagesOnTheQueue();
if (messages != null)
{
- for (AMQMessage msg : messages)
+ for (QueueEntry msg : messages)
{
- ids.add(msg.getMessageId());
+ ids.add(msg.getMessage().getMessageId());
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
index 5988cdabfc..a6dccf0f36 100644
--- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
import org.apache.qpid.tools.utils.Console;
@@ -114,7 +115,7 @@ public class Show extends AbstractCommand
if (_queue != null)
{
- List<AMQMessage> messages = _queue.getMessagesOnTheQueue();
+ List<QueueEntry> messages = _queue.getMessagesOnTheQueue();
if (messages == null || messages.size() == 0)
{
_console.println("No messages on queue");
@@ -153,7 +154,7 @@ public class Show extends AbstractCommand
* @param showMessageHeaders show the msg headers be shown
* @return the formated data lists for printing
*/
- protected List<List> createMessageData(List<Long> msgids, List<AMQMessage> messages, boolean showHeaders, boolean showRouting,
+ protected List<List> createMessageData(List<Long> msgids, List<QueueEntry> messages, boolean showHeaders, boolean showRouting,
boolean showMessageHeaders)
{
@@ -334,8 +335,9 @@ public class Show extends AbstractCommand
}
//Add create the table of data
- for (AMQMessage msg : messages)
+ for (QueueEntry entry : messages)
{
+ AMQMessage msg = entry.getMessage();
if (!includeMsg(msg, msgids))
{
continue;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
index 0a3bc93763..8e5879a51e 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
@@ -104,7 +104,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -146,7 +146,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -166,7 +166,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -207,7 +207,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -227,7 +227,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -247,7 +247,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -266,7 +266,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -308,7 +308,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -327,7 +327,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -369,7 +369,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -403,7 +403,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -446,7 +446,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -487,7 +487,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index b718b9c861..81b0ae2213 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -286,7 +286,7 @@ public class AMQQueueAlertTest extends TestCase
for (int i = 0; i < messageCount; i++)
{
- _queue.process(_storeContext, messages[i], false);
+ _queue.process(_storeContext, new QueueEntry(_queue,messages[i]), false);
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 4770779363..d86c90bdae 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -211,7 +211,7 @@ public class AMQQueueMBeanTest extends TestCase
msg.enqueue(_queue);
msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
- _queue.process(_storeContext, msg, false);
+ _queue.process(_storeContext, new QueueEntry(_queue, msg), false);
_queueMBean.viewMessageContent(id);
try
{
diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
index 3ee8277eba..10189a8017 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -29,6 +29,7 @@ import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -133,7 +134,7 @@ public class TxAckTest extends TestCase
};
TestMessage message = new TestMessage(deliveryTag, i, info, txnContext);
- _map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag));
+ _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag));
}
_acked = acked;
_unacked = unacked;
@@ -150,7 +151,7 @@ public class TxAckTest extends TestCase
{
UnacknowledgedMessage u = _map.get(tag);
assertTrue("Message not found for tag " + tag, u != null);
- ((TestMessage) u.message).assertCountEquals(expected);
+ ((TestMessage) u.getMessage()).assertCountEquals(expected);
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index ff5517bdd5..58323086b5 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
@@ -250,9 +251,9 @@ public class AbstractHeadersExchangeTestBase extends TestCase
* @param deliverFirst
* @throws AMQException
*/
- public void process(StoreContext context, AMQMessage msg, boolean deliverFirst) throws AMQException
+ public void process(StoreContext context, QueueEntry msg, boolean deliverFirst) throws AMQException
{
- messages.add(new HeadersExchangeTest.Message(msg));
+ messages.add(new HeadersExchangeTest.Message(msg.getMessage()));
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
index be788a02da..790607e268 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
@@ -142,7 +142,7 @@ public class AckTest extends TestCase
msg.incrementReference();
msg.routingComplete(_messageStore, _storeContext, factory);
// we manually send the message to the subscription
- _subscription.send(msg, _queue);
+ _subscription.send(new QueueEntry(_queue,msg), _queue);
}
}
@@ -167,7 +167,7 @@ public class AckTest extends TestCase
assertTrue(deliveryTag == i);
i++;
UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.queue == _queue);
+ assertTrue(unackedMsg.getQueue() == _queue);
}
assertTrue(map.size() == msgCount);
@@ -228,7 +228,7 @@ public class AckTest extends TestCase
{
assertTrue(deliveryTag == i);
UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.queue == _queue);
+ assertTrue(unackedMsg.getQueue() == _queue);
// 5 is the delivery tag of the message that *should* be removed
if (++i == 5)
{
@@ -257,7 +257,7 @@ public class AckTest extends TestCase
{
assertTrue(deliveryTag == i + 5);
UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.queue == _queue);
+ assertTrue(unackedMsg.getQueue() == _queue);
++i;
}
}
@@ -281,7 +281,7 @@ public class AckTest extends TestCase
{
assertTrue(deliveryTag == i + 5);
UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.queue == _queue);
+ assertTrue(unackedMsg.getQueue() == _queue);
++i;
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
index 068f37574d..282ad3ed5e 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
@@ -42,9 +42,9 @@ public class ConcurrencyTestDisabled extends MessageTestHelper
private final List<SubscriptionTestHelper> _subscribers = new ArrayList<SubscriptionTestHelper>();
private final Set<Subscription> _active = new HashSet<Subscription>();
- private final List<AMQMessage> _messages = new ArrayList<AMQMessage>();
+ private final List<QueueEntry> _messages = new ArrayList<QueueEntry>();
private int next = 0;//index to next message to send
- private final List<AMQMessage> _received = Collections.synchronizedList(new ArrayList<AMQMessage>());
+ private final List<QueueEntry> _received = Collections.synchronizedList(new ArrayList<QueueEntry>());
private final Executor _executor = new OnCurrentThreadExecutor();
private final List<Thread> _threads = new ArrayList<Thread>();
@@ -159,7 +159,7 @@ public class ConcurrencyTestDisabled extends MessageTestHelper
}
}
- private AMQMessage nextMessage()
+ private QueueEntry nextMessage()
{
synchronized (_messages)
{
@@ -191,7 +191,7 @@ public class ConcurrencyTestDisabled extends MessageTestHelper
{
void doRun() throws Throwable
{
- AMQMessage msg = nextMessage();
+ QueueEntry msg = nextMessage();
if (msg != null)
{
_deliveryMgr.deliver(null, new AMQShortString(toString()), msg, false);
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
index dc5a6d3cf6..b33259cfba 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
@@ -40,7 +40,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
public void testStartInQueueingMode() throws AMQException
{
- AMQMessage[] messages = new AMQMessage[10];
+ QueueEntry[] messages = new QueueEntry[10];
for (int i = 0; i < messages.length; i++)
{
messages[i] = message();
@@ -85,7 +85,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
public void testStartInDirectMode() throws AMQException
{
- AMQMessage[] messages = new AMQMessage[10];
+ QueueEntry[] messages = new QueueEntry[10];
for (int i = 0; i < messages.length; i++)
{
messages[i] = message();
@@ -132,7 +132,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
{
try
{
- AMQMessage msg = message(true);
+ QueueEntry msg = message(true);
_mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false);
msg.checkDeliveredToConsumer();
fail("expected exception did not occur");
@@ -154,7 +154,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
SubscriptionTestHelper s = new SubscriptionTestHelper("A");
_subscriptions.addSubscriber(s);
s.setSuspended(true);
- AMQMessage msg = message(true);
+ QueueEntry msg = message(true);
_mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false);
msg.checkDeliveredToConsumer();
fail("expected exception did not occur");
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
index 88272023e8..812aec6a5d 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
@@ -55,12 +55,12 @@ class MessageTestHelper extends TestCase
ApplicationRegistry.initialise(new NullApplicationRegistry());
}
- AMQMessage message() throws AMQException
+ QueueEntry message() throws AMQException
{
return message(false);
}
- AMQMessage message(final boolean immediate) throws AMQException
+ QueueEntry message(final boolean immediate) throws AMQException
{
MessagePublishInfo publish = new MessagePublishInfo()
{
@@ -86,8 +86,8 @@ class MessageTestHelper extends TestCase
}
};
- return new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext,
- new ContentHeaderBody());
+ return new QueueEntry(null,new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext,
+ new ContentHeaderBody()));
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index fe947ef3bc..5846ad0a9d 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -28,13 +28,13 @@ import java.util.Queue;
public class SubscriptionTestHelper implements Subscription
{
- private final List<AMQMessage> messages;
+ private final List<QueueEntry> messages;
private final Object key;
private boolean isSuspended;
public SubscriptionTestHelper(Object key)
{
- this(key, new ArrayList<AMQMessage>());
+ this(key, new ArrayList<QueueEntry>());
}
public SubscriptionTestHelper(final Object key, final boolean isSuspended)
@@ -43,18 +43,18 @@ public class SubscriptionTestHelper implements Subscription
setSuspended(isSuspended);
}
- SubscriptionTestHelper(Object key, List<AMQMessage> messages)
+ SubscriptionTestHelper(Object key, List<QueueEntry> messages)
{
this.key = key;
this.messages = messages;
}
- List<AMQMessage> getMessages()
+ List<QueueEntry> getMessages()
{
return messages;
}
- public void send(AMQMessage msg, AMQQueue queue)
+ public void send(QueueEntry msg, AMQQueue queue)
{
messages.add(msg);
}
@@ -69,12 +69,12 @@ public class SubscriptionTestHelper implements Subscription
return isSuspended;
}
- public boolean wouldSuspend(AMQMessage msg)
+ public boolean wouldSuspend(QueueEntry msg)
{
return isSuspended;
}
- public void addToResendQueue(AMQMessage msg)
+ public void addToResendQueue(QueueEntry msg)
{
//no-op
}
@@ -98,27 +98,27 @@ public class SubscriptionTestHelper implements Subscription
return false;
}
- public boolean hasInterest(AMQMessage msg)
+ public boolean hasInterest(QueueEntry msg)
{
return true;
}
- public Queue<AMQMessage> getPreDeliveryQueue()
+ public Queue<QueueEntry> getPreDeliveryQueue()
{
return null;
}
- public Queue<AMQMessage> getResendQueue()
+ public Queue<QueueEntry> getResendQueue()
{
return null;
}
- public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+ public Queue<QueueEntry> getNextQueue(Queue<QueueEntry> messages)
{
return messages;
}
- public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst)
+ public void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst)
{
//no-op
}