diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2007-12-18 23:00:40 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2007-12-18 23:00:40 +0000 |
commit | 8a916973a6a6209d49af5363b10d3b29af2b151f (patch) | |
tree | 6f909e235183684d703e3b4d9ff22afe53297474 | |
parent | b9857867db0ea728abb837027fc164f8d01b1191 (diff) | |
download | qpid-python-8a916973a6a6209d49af5363b10d3b29af2b151f.tar.gz |
QPID-711 : create a QueueEntry class and move message-on-queue functions (such as taken()) to this class
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@605352 13f79535-47bb-0310-9956-ffa450edef68
32 files changed, 545 insertions, 623 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 0907ea9df0..9fb3a5040b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -36,10 +36,7 @@ import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; import org.apache.qpid.server.exchange.MessageRouter; import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.MessageHandleFactory; -import org.apache.qpid.server.queue.Subscription; +import org.apache.qpid.server.queue.*; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.LocalTransactionalContext; @@ -421,33 +418,32 @@ public class AMQChannel /** * Add a message to the channel-based list of unacknowledged messages * - * @param message the message that was delivered + * @param entry the record of the message on the queue that was delivered * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the * delivery tag) * @param consumerTag The tag for the consumer that is to acknowledge this message. - * @param queue the queue from which the message was delivered */ - public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, AMQShortString consumerTag, AMQQueue queue) + public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, AMQShortString consumerTag) { if (_log.isDebugEnabled()) { - if (queue == null) + if (entry.getQueue() == null) { - _log.debug("Adding unacked message with a null queue:" + message.debugIdentity()); + _log.debug("Adding unacked message with a null queue:" + entry.debugIdentity()); } else { if (_log.isDebugEnabled()) { - _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag - + ") with a queue(" + queue + ") for " + consumerTag); + _log.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag + + ") with a queue(" + entry.getQueue() + ") for " + consumerTag); } } } synchronized (_unacknowledgedMessageMap.getLock()) { - _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag)); + _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(entry, consumerTag, deliveryTag)); checkSuspension(); } } @@ -499,16 +495,16 @@ public class AMQChannel for (UnacknowledgedMessage unacked : messagesToBeDelivered) { - if (unacked.queue != null) + if (!unacked.isQueueDeleted()) { // Ensure message is released for redelivery - unacked.message.release(unacked.queue); + unacked.entry.release(); // Mark message redelivered - unacked.message.setRedelivered(true); + unacked.getMessage().setRedelivered(true); // Deliver Message - deliveryContext.deliver(unacked.message, unacked.queue, false); + deliveryContext.deliver(unacked.entry, false); // Should we allow access To the DM to directy deliver the message? // As we don't need to check for Consumers or worry about incrementing the message count? @@ -533,13 +529,13 @@ public class AMQChannel { // Ensure message is released for redelivery - if (unacked.queue != null) + if (!unacked.isQueueDeleted()) { - unacked.message.release(unacked.queue); + unacked.entry.release(); } // Mark message redelivered - unacked.message.setRedelivered(true); + unacked.getMessage().setRedelivered(true); // Deliver these messages out of the transaction as their delivery was never // part of the transaction only the receive. @@ -559,16 +555,16 @@ public class AMQChannel deliveryContext = _txnContext; } - if (unacked.queue != null) + if (!unacked.isQueueDeleted()) { // Redeliver the messages to the front of the queue - deliveryContext.deliver(unacked.message, unacked.queue, true); + deliveryContext.deliver(unacked.entry, true); // Deliver increments the message count but we have already deliverted this once so don't increment it again // this was because deliver did an increment changed this. } else { - _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity() + _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity() + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); // _log.error("Requested requeue of message:" + deliveryTag + // " but no queue defined using DeadLetter queue:" + getDeadLetterQueue()); @@ -591,7 +587,7 @@ public class AMQChannel public boolean callback(UnacknowledgedMessage message) throws AMQException { _log.debug( - (count++) + ": (" + message.message.debugIdentity() + ")" + "[" + message.deliveryTag + "]"); + (count++) + ": (" + message.getMessage().debugIdentity() + ")" + "[" + message.deliveryTag + "]"); return false; // Continue } @@ -630,7 +626,7 @@ public class AMQChannel public boolean callback(UnacknowledgedMessage message) throws AMQException { AMQShortString consumerTag = message.consumerTag; - AMQMessage msg = message.message; + AMQMessage msg = message.getMessage(); msg.setRedelivered(true); if (consumerTag != null) { @@ -649,7 +645,7 @@ public class AMQChannel // Message has no consumer tag, so was "delivered" to a GET // or consumer no longer registered // cannot resend, so re-queue. - if (message.queue != null) + if (!message.isQueueDeleted()) { if (requeue) { @@ -690,7 +686,7 @@ public class AMQChannel for (UnacknowledgedMessage message : msgToResend) { - AMQMessage msg = message.message; + AMQMessage msg = message.getMessage(); // Our Java Client will always suspend the channel when resending! // If the client has requested the messages be resent then it is @@ -705,13 +701,13 @@ public class AMQChannel // else // { // release to allow it to be delivered - msg.release(message.queue); + message.entry.release(); // Without any details from the client about what has been processed we have to mark // all messages in the unacked map as redelivered. msg.setRedelivered(true); - Subscription sub = msg.getDeliveredSubscription(message.queue); + Subscription sub = message.entry.getDeliveredSubscription(); if (sub != null) { @@ -741,7 +737,7 @@ public class AMQChannel + System.identityHashCode(sub)); } - sub.addToResendQueue(msg); + sub.addToResendQueue(message.entry); _unacknowledgedMessageMap.remove(message.deliveryTag); } } // sync(sub.getSendLock) @@ -789,10 +785,10 @@ public class AMQChannel // Process Messages to Requeue at the front of the queue for (UnacknowledgedMessage message : msgToRequeue) { - message.message.release(message.queue); - message.message.setRedelivered(true); + message.entry.release(); + message.entry.setRedelivered(true); - deliveryContext.deliver(message.message, message.queue, true); + deliveryContext.deliver(message.entry, true); _unacknowledgedMessageMap.remove(message.deliveryTag); } @@ -813,17 +809,18 @@ public class AMQChannel { public boolean callback(UnacknowledgedMessage message) throws AMQException { - if (message.queue == queue) + if (message.getQueue() == queue) { try { message.discard(_storeContext); - message.queue = null; + message.setQueueDeleted(true); + } catch (AMQException e) { _log.error( - "Error decrementing ref count on message " + message.message.getMessageId() + ": " + e, e); + "Error decrementing ref count on message " + message.getMessage().getMessageId() + ": " + e, e); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index 5ca8d57f7c..ac29998c2a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -87,7 +87,7 @@ public class TxAck implements TxnOp //buffer must be marked as persistent: for (UnacknowledgedMessage msg : _unacked) { - if (msg.message.isPersistent()) + if (msg.getMessage().isPersistent()) { return true; } @@ -100,7 +100,7 @@ public class TxAck implements TxnOp //make persistent changes, i.e. dequeue and decrementReference for (UnacknowledgedMessage msg : _unacked) { - msg.restoreTransientMessageData(); + //msg.restoreTransientMessageData(); //Message has been ack so discard it. This will dequeue and decrement the reference. msg.discard(storeContext); @@ -116,7 +116,7 @@ public class TxAck implements TxnOp for (UnacknowledgedMessage msg : _unacked) { msg.clearTransientMessageData(); - msg.message.takeReference(); + msg.getMessage().takeReference(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java index 7088c704ed..40f5970cac 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java @@ -24,19 +24,21 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.StoreContext; public class UnacknowledgedMessage { - public final AMQMessage message; + public final QueueEntry entry; public final AMQShortString consumerTag; public final long deliveryTag; - public AMQQueue queue; - public UnacknowledgedMessage(AMQQueue queue, AMQMessage message, AMQShortString consumerTag, long deliveryTag) + private boolean _queueDeleted; + + + public UnacknowledgedMessage(QueueEntry entry, AMQShortString consumerTag, long deliveryTag) { - this.queue = queue; - this.message = message; + this.entry = entry; this.consumerTag = consumerTag; this.deliveryTag = deliveryTag; } @@ -45,9 +47,9 @@ public class UnacknowledgedMessage { StringBuilder sb = new StringBuilder(); sb.append("Q:"); - sb.append(queue); + sb.append(entry.getQueue()); sb.append(" M:"); - sb.append(message); + sb.append(entry.getMessage()); sb.append(" CT:"); sb.append(consumerTag); sb.append(" DT:"); @@ -58,22 +60,42 @@ public class UnacknowledgedMessage public void discard(StoreContext storeContext) throws AMQException { - if (queue != null) + if (entry.getQueue() != null) { - queue.dequeue(storeContext, message); + entry.getQueue().dequeue(storeContext, entry); } //if the queue is null then the message is waiting to be acked, but has been removed. - message.decrementReference(storeContext); + entry.getMessage().decrementReference(storeContext); } public void restoreTransientMessageData() throws AMQException { - message.restoreTransientMessageData(); + entry.getMessage().restoreTransientMessageData(); } public void clearTransientMessageData() { - message.clearTransientMessageData(); + entry.getMessage().clearTransientMessageData(); + } + + public AMQMessage getMessage() + { + return entry.getMessage(); + } + + public AMQQueue getQueue() + { + return entry.getQueue(); + } + + public void setQueueDeleted(boolean queueDeleted) + { + _queueDeleted = queueDeleted; + } + + public boolean isQueueDeleted() + { + return _queueDeleted; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index 30bbdea2ef..20ee646a40 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -97,7 +97,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap UnacknowledgedMessage message = _map.remove(deliveryTag); if(message != null) { - _unackedSize -= message.message.getSize(); + _unackedSize -= message.getMessage().getSize(); } return message; @@ -127,7 +127,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap synchronized (_lock) { _map.put(deliveryTag, message); - _unackedSize += message.message.getSize(); + _unackedSize += message.getMessage().getSize(); _lastDeliveryTag = deliveryTag; } } @@ -186,7 +186,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } it.remove(); - _unackedSize -= unacked.getValue().message.getSize(); + _unackedSize -= unacked.getValue().getMessage().getSize(); destination.add(unacked.getValue()); if (unacked.getKey() == deliveryTag) diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index a7bc49daab..57ae2bb6d4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -143,7 +143,7 @@ public class FanoutExchange extends AbstractExchange public AMQShortString getDefaultExchangeName()
{
- return ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+ return ExchangeDefaults.FANOUT_EXCHANGE_NAME;
}
};
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index 4f3aee0c93..c48dd902eb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -85,7 +85,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR } else { - if (message.queue == null || message.queue.isDeleted()) + if (message.isQueueDeleted() || message.getQueue().isDeleted()) { _logger.warn("Message's Queue as already been purged, unable to Reject. " + "Dropping message should use Dead Letter Queue"); @@ -93,7 +93,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR return; } - if (!message.message.isReferenced()) + if (!message.getMessage().isReferenced()) { _logger.warn("Message as already been purged, unable to Reject."); return; @@ -102,7 +102,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR if (_logger.isTraceEnabled()) { - _logger.trace("Rejecting: DT:" + deliveryTag + "-" + message.message.debugIdentity() + + _logger.trace("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() + ": Requeue:" + body.getRequeue() + //": Resend:" + evt.getMethod().resend + " on channel:" + channel.debugIdentity()); @@ -111,7 +111,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR // If we haven't requested message to be resent to this consumer then reject it from ever getting it. //if (!evt.getMethod().resend) { - message.message.reject(message.message.getDeliveredSubscription(message.queue)); + message.entry.reject(); } if (body.getRequeue()) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 307a4c8d21..d9a9d2273b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -77,22 +77,13 @@ public class AMQMessage /** Flag to indicate that this message requires 'immediate' delivery. */ private boolean _immediate; - // private Subscription _takenBySubcription; - // private AtomicBoolean _taken = new AtomicBoolean(false); private TransientMessageData _transientMessageData = new TransientMessageData(); - //todo: this should be part of a messageOnQueue object - private Set<Subscription> _rejectedBy = null; + private long _expiration; - //todo: this should be part of a messageOnQueue object - private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>(); - //todo: this should be part of a messageOnQueue object - private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>(); private final int hashcode = System.identityHashCode(this); - //todo: this should be part of a messageOnQueue object - private long _expiration; public String debugIdentity() { @@ -282,7 +273,7 @@ public class AMQMessage setContentHeaderBody(contentHeader); } - /** + /* * * Used in testing only. This allows the passing of the content header and some body fragments on construction. * * @param messageId @@ -293,7 +284,7 @@ public class AMQMessage * @param contentBodies * * @throws AMQException - */ + */ /* public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext, ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies, MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException @@ -306,7 +297,7 @@ public class AMQMessage addContentBodyFrame(storeContext, cb); } } - + */ protected AMQMessage(AMQMessage msg) throws AMQException { _messageId = msg._messageId; @@ -485,84 +476,6 @@ public class AMQMessage return _deliveredToConsumer; } - public boolean isTaken(AMQQueue queue) - { - // return _taken.get(); - - synchronized (this) - { - AtomicBoolean taken = _takenMap.get(queue); - if (taken == null) - { - taken = new AtomicBoolean(false); - _takenMap.put(queue, taken); - } - - return taken.get(); - } - } - - public boolean taken(AMQQueue queue, Subscription sub) - { - // if (_taken.getAndSet(true)) - // { - // return true; - // } - // else - // { - // _takenBySubcription = sub; - // return false; - // } - - synchronized (this) - { - AtomicBoolean taken = _takenMap.get(queue); - if (taken == null) - { - taken = new AtomicBoolean(false); - } - - if (taken.getAndSet(true)) - { - return true; - } - else - { - _takenMap.put(queue, taken); - _takenBySubcriptionMap.put(queue, sub); - - return false; - } - } - } - - public void release(AMQQueue queue) - { - if (_log.isTraceEnabled()) - { - _log.trace("Releasing Message:" + debugIdentity()); - } - - // _taken.set(false); - // _takenBySubcription = null; - - synchronized (this) - { - AtomicBoolean taken = _takenMap.get(queue); - if (taken == null) - { - taken = new AtomicBoolean(false); - } - else - { - taken.set(false); - } - - _deliveredToConsumer = false; - _takenMap.put(queue, taken); - _takenBySubcriptionMap.put(queue, null); - } - } public boolean checkToken(Object token) { @@ -683,7 +596,6 @@ public class AMQMessage */ public boolean expired(AMQQueue queue) throws AMQException { - // note: If the storecontext isn't need then we can remove the getChannel() from Subscription. if (_expiration != 0L) { @@ -732,7 +644,7 @@ public class AMQMessage // Increment the references to this message for each queue delivery. incrementReference(); // normal deliver so add this message at the end. - _txnContext.deliver(this, q, false); + _txnContext.deliver(q.createEntry(this), false); } } finally @@ -743,175 +655,6 @@ public class AMQMessage } } - /* - public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag) - throws AMQException - { - ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag); - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - getContentHeaderBody()); - - final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId); - if (bodyCount == 0) - { - SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, - contentHeader); - - protocolSession.writeFrame(compositeBlock); - } - else - { - - // - // Optimise the case where we have a single content body. In that case we create a composite block - // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. - // - ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0); - - AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); - AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent); - protocolSession.writeFrame(compositeBlock); - - // - // Now start writing out the other content bodies - // - for (int i = 1; i < bodyCount; i++) - { - cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i); - protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); - } - - - } - - - } - - public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException - { - ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize); - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - getContentHeaderBody()); - - final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId); - if (bodyCount == 0) - { - SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, - contentHeader); - protocolSession.writeFrame(compositeBlock); - } - else - { - - // - // Optimise the case where we have a single content body. In that case we create a composite block - // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. - // - ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0); - - AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); - AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent); - protocolSession.writeFrame(compositeBlock); - - // - // Now start writing out the other content bodies - // - for (int i = 1; i < bodyCount; i++) - { - cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i); - protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); - } - - - } - - - } - - - private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag) - throws AMQException - { - MessagePublishInfo pb = getMessagePublishInfo(); - AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag, - deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(), - pb.getRoutingKey()); - ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem? - deliverFrame.writePayload(buf); - buf.flip(); - return buf; - } - - private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) - throws AMQException - { - MessagePublishInfo pb = getMessagePublishInfo(); - AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId, - protocolSession.getProtocolMajorVersion(), - protocolSession.getProtocolMinorVersion(), - deliveryTag, pb.getExchange(), - queueSize, - _messageHandle.isRedelivered(), - pb.getRoutingKey()); - ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem? - getOkFrame.writePayload(buf); - buf.flip(); - return buf; - } - - private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException - { - AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, - protocolSession.getProtocolMajorVersion(), - protocolSession.getProtocolMinorVersion(), - getMessagePublishInfo().getExchange(), - replyCode, replyText, - getMessagePublishInfo().getRoutingKey()); - ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem? - returnFrame.writePayload(buf); - buf.flip(); - return buf; - } - - public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) - throws AMQException - { - ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText); - - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - getContentHeaderBody()); - - Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId); - // - // Optimise the case where we have a single content body. In that case we create a composite block - // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. - // - if (bodyFrameIterator.hasNext()) - { - AMQDataBlock firstContentBody = bodyFrameIterator.next(); - AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent); - protocolSession.writeFrame(compositeBlock); - } - else - { - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, - new AMQDataBlock[]{contentHeader}); - protocolSession.writeFrame(compositeBlock); - } - - // - // Now start writing out the other content bodies - // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded - // - while (bodyFrameIterator.hasNext()) - { - protocolSession.writeFrame(bodyFrameIterator.next()); - } - } - */ public AMQMessageHandle getMessageHandle() { @@ -954,47 +697,7 @@ public class AMQMessage // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + // _taken + " by :" + _takenBySubcription; - return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " - + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString(); - } - - public Subscription getDeliveredSubscription(AMQQueue queue) - { - // return _takenBySubcription; - synchronized (this) - { - return _takenBySubcriptionMap.get(queue); - } - } - - public void reject(Subscription subscription) - { - if (subscription != null) - { - if (_rejectedBy == null) - { - _rejectedBy = new HashSet<Subscription>(); - } - - _rejectedBy.add(subscription); - } - else - { - _log.warn("Requesting rejection by null subscriber:" + debugIdentity()); - } + return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount; } - public boolean isRejectedBy(Subscription subscription) - { - boolean rejected = _rejectedBy != null; - - if (rejected) // We have subscriptions that rejected this message - { - return _rejectedBy.contains(subscription); - } - else // This messasge hasn't been rejected yet. - { - return rejected; - } - } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 0c52a358f7..53c36d9718 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicLong; */ public class AMQQueue implements Managable, Comparable { + /** * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription * already exists. @@ -250,7 +251,7 @@ public class AMQQueue implements Managable, Comparable } /** @return List of messages(undelivered) on the queue. */ - public List<AMQMessage> getMessagesOnTheQueue() + public List<QueueEntry> getMessagesOnTheQueue() { return _deliveryMgr.getMessages(); } @@ -263,7 +264,7 @@ public class AMQQueue implements Managable, Comparable * * @return List of messages */ - public List<AMQMessage> getMessagesOnTheQueue(long fromMessageId, long toMessageId) + public List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId) { return _deliveryMgr.getMessages(fromMessageId, toMessageId); } @@ -276,11 +277,11 @@ public class AMQQueue implements Managable, Comparable /** * @param messageId * - * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist. + * @return QueueEntry with give id if exists. null if QueueEntry with given id doesn't exist. */ - public AMQMessage getMessageOnTheQueue(long messageId) + public QueueEntry getMessageOnTheQueue(long messageId) { - List<AMQMessage> list = getMessagesOnTheQueue(messageId, messageId); + List<QueueEntry> list = getMessagesOnTheQueue(messageId, messageId); if ((list == null) || (list.size() == 0)) { return null; @@ -319,15 +320,16 @@ public class AMQQueue implements Managable, Comparable toQueue.startMovingMessages(); // Get the list of messages to move. - List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId); + List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId); try { fromStore.beginTran(storeContext); // Move the messages in on the message store. - for (AMQMessage message : foundMessagesList) + for (QueueEntry entry : foundMessagesList) { + AMQMessage message = entry.getMessage(); fromStore.dequeueMessage(storeContext, _name, message.getMessageId()); toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId()); } @@ -397,15 +399,16 @@ public class AMQQueue implements Managable, Comparable toQueue.startMovingMessages(); // Get the list of messages to move. - List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId); + List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId); try { fromStore.beginTran(storeContext); // Move the messages in on the message store. - for (AMQMessage message : foundMessagesList) + for (QueueEntry entry : foundMessagesList) { + AMQMessage message = entry.getMessage(); toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId()); message.takeReference(); } @@ -463,15 +466,16 @@ public class AMQQueue implements Managable, Comparable startMovingMessages(); // Get the list of messages to move. - List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId); + List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId); try { fromStore.beginTran(storeContext); // remove the messages in on the message store. - for (AMQMessage message : foundMessagesList) + for (QueueEntry entry : foundMessagesList) { + AMQMessage message = entry.getMessage(); fromStore.dequeueMessage(storeContext, _name, message.getMessageId()); } @@ -513,7 +517,7 @@ public class AMQQueue implements Managable, Comparable _deliveryMgr.startMovingMessages(); } - private void enqueueMovedMessages(StoreContext storeContext, List<AMQMessage> messageList) + private void enqueueMovedMessages(StoreContext storeContext, List<QueueEntry> messageList) { _deliveryMgr.enqueueMovedMessages(storeContext, messageList); _totalMessagesReceived.addAndGet(messageList.size()); @@ -583,7 +587,7 @@ public class AMQQueue implements Managable, Comparable } - /** Removes the AMQMessage from the top of the queue. */ + /** Removes the QueueEntry from the top of the queue. */ public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException { _deliveryMgr.removeAMessageFromTop(storeContext, this); @@ -798,27 +802,28 @@ public class AMQQueue implements Managable, Comparable // return _deliveryMgr; // } - public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException + public void process(StoreContext storeContext, QueueEntry entry, boolean deliverFirst) throws AMQException { - _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst); + AMQMessage msg = entry.getMessage(); + _deliveryMgr.deliver(storeContext, getName(), entry, deliverFirst); try { msg.checkDeliveredToConsumer(); - updateReceivedMessageCount(msg); + updateReceivedMessageCount(entry); } catch (NoConsumersException e) { // as this message will be returned, it should be removed // from the queue: - dequeue(storeContext, msg); + dequeue(storeContext, entry); } } - public void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException + public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException { try { - msg.dequeue(storeContext, this); + entry.getMessage().dequeue(storeContext, this); } catch (MessageCleanupException e) { @@ -844,8 +849,10 @@ public class AMQQueue implements Managable, Comparable return _subscribers; } - protected void updateReceivedMessageCount(AMQMessage msg) throws AMQException + protected void updateReceivedMessageCount(QueueEntry entry) throws AMQException { + AMQMessage msg = entry.getMessage(); + if (!msg.isRedelivered()) { _totalMessagesReceived.incrementAndGet(); @@ -933,8 +940,14 @@ public class AMQQueue implements Managable, Comparable _maximumMessageAge = maximumMessageAge; } - public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, AMQMessage msg) + public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, QueueEntry entry) { - _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, msg); + _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, entry); } + + public QueueEntry createEntry(AMQMessage amqMessage) + { + return new QueueEntry(this, amqMessage); + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 07872d7644..9e32de3f76 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -321,11 +321,14 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que */ public CompositeData viewMessageContent(long msgId) throws JMException { - AMQMessage msg = _queue.getMessageOnTheQueue(msgId); - if (msg == null) + QueueEntry entry = _queue.getMessageOnTheQueue(msgId); + + if (entry == null) { throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName); } + + AMQMessage msg = entry.getMessage(); // get message content Iterator<ContentChunk> cBodies = msg.getContentBodyIterator(); List<Byte> msgContent = new ArrayList<Byte>(); @@ -381,7 +384,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que + "\n\"From Index\" should be greater than 0 and less than \"To Index\""); } - List<AMQMessage> list = _queue.getMessagesOnTheQueue(); + List<QueueEntry> list = _queue.getMessagesOnTheQueue(); TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType); try @@ -389,7 +392,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que // Create the tabular list of message header contents for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++) { - AMQMessage msg = list.get(i - 1); + AMQMessage msg = list.get(i - 1).getMessage(); ContentHeaderBody headerBody = msg.getContentHeaderBody(); // Create header attributes list String[] headerAttributes = getMessageHeaderProperties(headerBody); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index eabc8ebf38..3cf2cb9b12 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -55,7 +55,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager defaultValue = "false") public boolean compressBufferOnQueue; /** Holds any queued messages */ - private final MessageQueue<AMQMessage> _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>(); + private final MessageQueue<QueueEntry> _messages = new ConcurrentLinkedMessageQueueAtomicSize<QueueEntry>(); /** Ensures that only one asynchronous task is running for this manager at any time. */ private final AtomicBoolean _processing = new AtomicBoolean(); @@ -107,8 +107,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } - private boolean addMessageToQueue(AMQMessage msg, boolean deliverFirst) + private boolean addMessageToQueue(QueueEntry entry, boolean deliverFirst) { + AMQMessage msg = entry.getMessage(); // Shrink the ContentBodies to their actual size to save memory. if (compressBufferOnQueue) { @@ -124,12 +125,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { synchronized (_queueHeadLock) { - _messages.pushHead(msg); + _messages.pushHead(entry); } } else { - _messages.offer(msg); + _messages.offer(entry); } _totalMessageSize.addAndGet(msg.getSize()); @@ -175,11 +176,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public long getOldestMessageArrival() { - AMQMessage msg = _messages.peek(); - return msg == null ? Long.MAX_VALUE : msg.getArrivalTime(); + QueueEntry entry = _messages.peek(); + return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime(); } - public void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg) + public void subscriberHasPendingResend(boolean hasContent, Subscription subscription, QueueEntry entry) { _lock.lock(); try @@ -188,19 +189,19 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { _log.debug("Queue has adding subscriber content"); _hasContent.add(subscription); - _totalMessageSize.addAndGet(msg.getSize()); + _totalMessageSize.addAndGet(entry.getSize()); _extraMessages.addAndGet(1); } else { _log.debug("Queue has removing subscriber content"); - if (msg == null) + if (entry == null) { _hasContent.remove(subscription); } else { - _totalMessageSize.addAndGet(-msg.getSize()); + _totalMessageSize.addAndGet(-entry.getSize()); _extraMessages.addAndGet(-1); } } @@ -222,14 +223,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager * * @return List of messages */ - public List<AMQMessage> getMessages() + public List<QueueEntry> getMessages() { _lock.lock(); - List<AMQMessage> list = new ArrayList<AMQMessage>(); + List<QueueEntry> list = new ArrayList<QueueEntry>(); - for (AMQMessage message : _messages) + for (QueueEntry entry : _messages) { - list.add(message); + list.add(entry); } _lock.unlock(); @@ -244,7 +245,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager * * @return */ - public List<AMQMessage> getMessages(long fromMessageId, long toMessageId) + public List<QueueEntry> getMessages(long fromMessageId, long toMessageId) { if (fromMessageId <= 0 || toMessageId <= 0) { @@ -255,14 +256,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _lock.lock(); - List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>(); + List<QueueEntry> foundMessagesList = new ArrayList<QueueEntry>(); - for (AMQMessage message : _messages) + for (QueueEntry entry : _messages) { - long msgId = message.getMessageId(); + long msgId = entry.getMessage().getMessageId(); if (msgId >= fromMessageId && msgId <= toMessageId) { - foundMessagesList.add(message); + foundMessagesList.add(entry); } // break if the no of messages are found if (foundMessagesList.size() == maxMessageCount) @@ -282,16 +283,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _log.trace("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")"); } - Iterator<AMQMessage> currentQueue = _messages.iterator(); + Iterator<QueueEntry> currentQueue = _messages.iterator(); while (currentQueue.hasNext()) { - AMQMessage message = currentQueue.next(); - if (!message.getDeliveredToConsumer()) + QueueEntry entry = currentQueue.next(); + + if (!entry.getDeliveredToConsumer()) { - if (subscription.hasInterest(message)) + if (subscription.hasInterest(entry)) { - subscription.enqueueForPreDelivery(message, false); + subscription.enqueueForPreDelivery(entry, false); } } } @@ -299,8 +301,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public boolean performGet(AMQProtocolSession protocolSession, AMQChannel channel, boolean acks) throws AMQException { - AMQMessage msg = getNextMessage(); - if (msg == null) + QueueEntry entry = getNextMessage(); + if (entry == null) { return false; } @@ -322,9 +324,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId()); + _log.debug("No ack mode so dequeuing message immediately: " + entry.getMessage().getMessageId()); } - _queue.dequeue(channel.getStoreContext(), msg); + _queue.dequeue(channel.getStoreContext(), entry); } synchronized (channel) { @@ -332,22 +334,22 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (acks) { - channel.addUnacknowledgedMessage(msg, deliveryTag, null, _queue); + channel.addUnacknowledgedMessage(entry, deliveryTag, null); } - protocolSession.getProtocolOutputConverter().writeGetOk(msg, channel.getChannelId(), + protocolSession.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(), deliveryTag, _queue.getMessageCount()); - _totalMessageSize.addAndGet(-msg.getSize()); + _totalMessageSize.addAndGet(-entry.getSize()); } if (!acks) { - msg.decrementReference(channel.getStoreContext()); + entry.getMessage().decrementReference(channel.getStoreContext()); } } finally { - msg.setDeliveredToConsumer(); + entry.setDeliveredToConsumer(); } return true; @@ -381,7 +383,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager * * @param messageList */ - public void removeMovedMessages(List<AMQMessage> messageList) + public void removeMovedMessages(List<QueueEntry> messageList) { // Remove from the boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); @@ -391,20 +393,20 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (!sub.isSuspended() && sub.filtersMessages()) { - Queue<AMQMessage> preDeliveryQueue = sub.getPreDeliveryQueue(); - for (AMQMessage msg : messageList) + Queue<QueueEntry> preDeliveryQueue = sub.getPreDeliveryQueue(); + for (QueueEntry entry : messageList) { - preDeliveryQueue.remove(msg); + preDeliveryQueue.remove(entry); } } } } - for (AMQMessage msg : messageList) + for (QueueEntry entry : messageList) { - if (_messages.remove(msg)) + if (_messages.remove(entry)) { - _totalMessageSize.getAndAdd(-msg.getSize()); + _totalMessageSize.getAndAdd(-entry.getSize()); } } } @@ -420,16 +422,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { _lock.lock(); - AMQMessage message = _messages.poll(); + QueueEntry entry = _messages.poll(); - if (message != null) + if (entry != null) { - queue.dequeue(storeContext, message); + queue.dequeue(storeContext, entry); - _totalMessageSize.addAndGet(-message.getSize()); + _totalMessageSize.addAndGet(-entry.getSize()); //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE. - message.decrementReference(storeContext); + entry.getMessage().decrementReference(storeContext); } @@ -443,17 +445,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager synchronized (_queueHeadLock) { - AMQMessage msg = getNextMessage(); - while (msg != null) + QueueEntry entry = getNextMessage(); + while (entry != null) { //and remove it _messages.poll(); - _queue.dequeue(storeContext, msg); + _queue.dequeue(storeContext, entry); - msg.decrementReference(_reapingStoreContext); + entry.getMessage().decrementReference(_reapingStoreContext); - msg = getNextMessage(); + entry = getNextMessage(); count++; } _totalMessageSize.set(0L); @@ -469,34 +471,35 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager * * @throws org.apache.qpid.AMQException */ - private AMQMessage getNextMessage() throws AMQException + private QueueEntry getNextMessage() throws AMQException { return getNextMessage(_messages, null, false); } - private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub, boolean purgeOnly) throws AMQException + private QueueEntry getNextMessage(Queue<QueueEntry> messages, Subscription sub, boolean purgeOnly) throws AMQException { - AMQMessage message = messages.peek(); + QueueEntry entry = messages.peek(); //while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.) - while (purgeMessage(message, sub, purgeOnly)) + while (purgeMessage(entry, sub, purgeOnly)) { + AMQMessage message = entry.getMessage(); // if we are purging then ensure we mark this message taken for the current subscriber // the current subscriber may be null in the case of a get or a purge but this is ok. // boolean alreadyTaken = message.taken(_queue, sub); //remove the already taken message or expired - AMQMessage removed = messages.poll(); + QueueEntry removed = messages.poll(); - assert removed == message; + assert removed == entry; // if the message expired then the _totalMessageSize needs adjusting - if (message.expired(_queue) && !message.getDeliveredToConsumer()) + if (message.expired(_queue) && !entry.getDeliveredToConsumer()) { - _totalMessageSize.addAndGet(-message.getSize()); + _totalMessageSize.addAndGet(-entry.getSize()); // Use the reapingStoreContext as any sub(if we have one) may be in a tx. - _queue.dequeue(_reapingStoreContext, message); + _queue.dequeue(_reapingStoreContext, entry); message.decrementReference(_reapingStoreContext); @@ -515,10 +518,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } // try the next message - message = messages.peek(); + entry = messages.peek(); } - return message; + return entry; } /** @@ -534,7 +537,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager * * @throws AMQException */ - private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException + private boolean purgeMessage(QueueEntry message, Subscription sub) throws AMQException { return purgeMessage(message, sub, false); } @@ -552,7 +555,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager * * @throws AMQException */ - private boolean purgeMessage(AMQMessage message, Subscription sub, boolean purgeOnly) throws AMQException + private boolean purgeMessage(QueueEntry message, Subscription sub, boolean purgeOnly) throws AMQException { //Original.. complicated while loop control // (message != null @@ -567,7 +570,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (message != null) { // Check that the message hasn't expired. - if (message.expired(_queue)) + if (message.expired()) { return true; } @@ -576,13 +579,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (sub != null) { // if we have a queue browser(we don't purge) so check mark the message as taken - purge = ((!sub.isBrowser() || message.isTaken(_queue))); + purge = ((!sub.isBrowser() || message.isTaken())); } else { // if there is no subscription we are doing // a get or purging so mark message as taken. - message.isTaken(_queue); + message.isTaken(); // and then ensure that it gets purged purge = true; } @@ -592,20 +595,20 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { // If we are simply purging the queue don't take the message // just purge up to the next non-taken msg. - return purge && message.isTaken(_queue); + return purge && message.isTaken(); } else { // if we are purging then ensure we mark this message taken for the current subscriber // the current subscriber may be null in the case of a get or a purge but this is ok. - return purge && message.taken(_queue, sub); + return purge && message.taken(sub); } } - public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue) + public void sendNextMessage(Subscription sub, AMQQueue queue) { - Queue<AMQMessage> messageQueue = sub.getNextQueue(_messages); + Queue<QueueEntry> messageQueue = sub.getNextQueue(_messages); if (_log.isTraceEnabled()) { @@ -624,16 +627,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return; } - AMQMessage message = null; - AMQMessage removed = null; + QueueEntry entry = null; + QueueEntry removed = null; try { synchronized (_queueHeadLock) { - message = getNextMessage(messageQueue, sub, false); + entry = getNextMessage(messageQueue, sub, false); // message will be null if we have no messages in the messageQueue. - if (message == null) + if (entry == null) { if (_log.isTraceEnabled()) { @@ -643,7 +646,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } if (_log.isDebugEnabled()) { - _log.debug(debugIdentity() + "Async Delivery Message :" + message + "(" + System.identityHashCode(message) + + _log.debug(debugIdentity() + "Async Delivery Message :" + entry + "(" + System.identityHashCode(entry) + ") by :" + System.identityHashCode(this) + ") to :" + System.identityHashCode(sub)); } @@ -651,10 +654,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (messageQueue == _messages) { - _totalMessageSize.addAndGet(-message.getSize()); + _totalMessageSize.addAndGet(-entry.getSize()); } - sub.send(message, _queue); + sub.send(entry, _queue); //remove sent message from our queue. removed = messageQueue.poll(); @@ -662,14 +665,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager // Otherwise the Async send will never end } - if (removed != message) + if (removed != entry) { - _log.error("Just send message:" + message.debugIdentity() + " BUT removed this from queue:" + removed); + _log.error("Just send message:" + entry.getMessage().debugIdentity() + " BUT removed this from queue:" + removed); } if (_log.isDebugEnabled()) { - _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.debugIdentity() + "d:" + message + + _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.getMessage().debugIdentity() + "d:" + entry + ") by :" + System.identityHashCode(this) + ") to :" + System.identityHashCode(sub)); } @@ -704,9 +707,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } catch (AMQException e) { - if (message != null) + if (entry != null) { - message.release(_queue); + entry.release(); } else { @@ -734,23 +737,23 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager * @param storeContext * @param movedMessageList */ - public void enqueueMovedMessages(StoreContext storeContext, List<AMQMessage> movedMessageList) + public void enqueueMovedMessages(StoreContext storeContext, List<QueueEntry> movedMessageList) { _lock.lock(); - for (AMQMessage msg : movedMessageList) + for (QueueEntry entry : movedMessageList) { - addMessageToQueue(msg, false); + addMessageToQueue(entry, false); } // enqueue on the pre delivery queues for (Subscription sub : _subscriptions.getSubscriptions()) { - for (AMQMessage msg : movedMessageList) + for (QueueEntry entry : movedMessageList) { // Only give the message to those that want them. - if (sub.hasInterest(msg)) + if (sub.hasInterest(entry)) { - sub.enqueueForPreDelivery(msg, true); + sub.enqueueForPreDelivery(entry, true); } } } @@ -802,30 +805,30 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } - public void deliver(StoreContext context, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws AMQException + public void deliver(StoreContext context, AMQShortString name, QueueEntry entry, boolean deliverFirst) throws AMQException { final boolean debugEnabled = _log.isDebugEnabled(); if (debugEnabled) { - _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + msg); + _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + entry); } //Check if we have someone to deliver the message to. _lock.lock(); try { - Subscription s = _subscriptions.nextSubscriber(msg); + Subscription s = _subscriptions.nextSubscriber(entry); if (s == null || (!s.filtersMessages() && hasQueuedMessages())) //no-one can take the message right now or we're queueing { if (debugEnabled) { - _log.debug(debugIdentity() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus()); + _log.debug(debugIdentity() + "Testing Message(" + entry + ") for Queued Delivery:" + currentStatus()); } - if (!msg.getMessagePublishInfo().isImmediate()) + if (!entry.getMessage().getMessagePublishInfo().isImmediate()) { - addMessageToQueue(msg, deliverFirst); + addMessageToQueue(entry, deliverFirst); //release lock now message is on queue. _lock.unlock(); @@ -840,25 +843,25 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { // stop if the message gets delivered whilst PreDelivering if we have a shared queue. - if (_queue.isShared() && msg.getDeliveredToConsumer()) + if (_queue.isShared() && entry.getDeliveredToConsumer()) { if (debugEnabled) { - _log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) + + _log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(entry) + ") is already delivered."); } continue; } // Only give the message to those that want them. - if (sub.hasInterest(msg)) + if (sub.hasInterest(entry)) { if (debugEnabled) { - _log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(msg) + + _log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(entry) + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")"); } - sub.enqueueForPreDelivery(msg, deliverFirst); + sub.enqueueForPreDelivery(entry, deliverFirst); } } @@ -893,11 +896,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isTraceEnabled()) { - _log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" + + _log.trace(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" + System.identityHashCode(s) + ") :" + s); } - if (msg.taken(_queue, s)) + if (entry.taken(s)) { //Message has been delivered so don't redeliver. // This can currently occur because of the recursive call below @@ -927,14 +930,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return; } //Deliver the message - s.send(msg, _queue); + s.send(entry, _queue); } else { if (debugEnabled) { _log.debug(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " + - "suspended between nextSubscriber and send for message:" + msg.debugIdentity()); + "suspended between nextSubscriber and send for message:" + entry.getMessage().debugIdentity()); } } } @@ -943,21 +946,21 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager // Why do we do this? What was the reasoning? We should have a better approach // than recursion and rejecting if someone else sends it before we do. // - if (!msg.isTaken(_queue)) + if (!entry.isTaken()) { if (debugEnabled) { - _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() + ") has not been taken so recursing!:" + + _log.debug(debugIdentity() + " Message(" + entry.getMessage().debugIdentity() + ") has not been taken so recursing!:" + " Subscriber:" + System.identityHashCode(s)); } - deliver(context, name, msg, deliverFirst); + deliver(context, name, entry, deliverFirst); } else { if (debugEnabled) { - _log.debug(debugIdentity() + " Message(" + msg.toString() + + _log.debug(debugIdentity() + " Message(" + entry.toString() + ") has been taken so disregarding deliver request to Subscriber:" + System.identityHashCode(s)); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index 153106d919..f7f35a9319 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -64,13 +64,13 @@ interface DeliveryManager * * @param storeContext * @param name the name of the entity on whose behalf we are delivering the message - * @param msg the message to deliver + * @param entry the message to deliver * @param deliverFirst * * @throws org.apache.qpid.server.queue.FailedDequeueException * if the message could not be dequeued */ - void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws FailedDequeueException, AMQException; + void deliver(StoreContext storeContext, AMQShortString name, QueueEntry entry, boolean deliverFirst) throws FailedDequeueException, AMQException; void removeAMessageFromTop(StoreContext storeContext, AMQQueue queue) throws AMQException; @@ -78,15 +78,15 @@ interface DeliveryManager void startMovingMessages(); - void enqueueMovedMessages(StoreContext context, List<AMQMessage> messageList); + void enqueueMovedMessages(StoreContext context, List<QueueEntry> messageList); void stopMovingMessages(); - void removeMovedMessages(List<AMQMessage> messageListToRemove); + void removeMovedMessages(List<QueueEntry> messageListToRemove); - List<AMQMessage> getMessages(); + List<QueueEntry> getMessages(); - List<AMQMessage> getMessages(long fromMessageId, long toMessageId); + List<QueueEntry> getMessages(long fromMessageId, long toMessageId); void populatePreDeliveryQueue(Subscription subscription); @@ -96,5 +96,5 @@ interface DeliveryManager long getOldestMessageArrival(); - void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg); + void subscriberHasPendingResend(boolean hasContent, Subscription subscription, QueueEntry msg); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java new file mode 100644 index 0000000000..69595f1d6c --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -0,0 +1,173 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.store.StoreContext; +import org.apache.log4j.Logger; + +import java.util.Set; +import java.util.HashSet; +import java.util.concurrent.atomic.AtomicReference; + + +public class QueueEntry +{ + + /** + * Used for debugging purposes. + */ + private static final Logger _log = Logger.getLogger(QueueEntry.class); + + private final AMQQueue _queue; + private final AMQMessage _message; + + private Set<Subscription> _rejectedBy = null; + + private AtomicReference<Object> _owner = new AtomicReference<Object>(); + + + public QueueEntry(AMQQueue queue, AMQMessage message) + { + _queue = queue; + _message = message; + } + + + public AMQQueue getQueue() + { + return _queue; + } + + public AMQMessage getMessage() + { + return _message; + } + + public long getSize() + { + return getMessage().getSize(); + } + + public boolean getDeliveredToConsumer() + { + return getMessage().getDeliveredToConsumer(); + } + + public boolean expired() throws AMQException + { + return getMessage().expired(_queue); + } + + public boolean isTaken() + { + return _owner.get() != null; + } + + public boolean taken(Subscription sub) + { + return !(_owner.compareAndSet(null, sub == null ? this : sub)); + } + + public void setDeliveredToConsumer() + { + getMessage().setDeliveredToConsumer(); + } + + public void release() + { + _owner.set(null); + } + + public String debugIdentity() + { + return getMessage().debugIdentity(); + } + + public void process(StoreContext storeContext, boolean deliverFirst) throws AMQException + { + _queue.process(storeContext, this, deliverFirst); + } + + public void checkDeliveredToConsumer() throws NoConsumersException + { + _message.checkDeliveredToConsumer(); + } + + public void setRedelivered(boolean b) + { + getMessage().setRedelivered(true); + } + + public Subscription getDeliveredSubscription() + { + synchronized (this) + { + Object owner = _owner.get(); + if (owner instanceof Subscription) + { + return (Subscription) owner; + } + else + { + return null; + } + } + } + + public void reject() + { + reject(getDeliveredSubscription()); + } + + public void reject(Subscription subscription) + { + if (subscription != null) + { + if (_rejectedBy == null) + { + _rejectedBy = new HashSet<Subscription>(); + } + + _rejectedBy.add(subscription); + } + else + { + _log.warn("Requesting rejection by null subscriber:" + debugIdentity()); + } + } + + public boolean isRejectedBy(Subscription subscription) + { + boolean rejected = _rejectedBy != null; + + if (rejected) // We have subscriptions that rejected this message + { + return _rejectedBy.contains(subscription); + } + else // This messasge hasn't been rejected yet. + { + return rejected; + } + } + + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java index 77688f19be..a706098b71 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java @@ -27,7 +27,7 @@ import org.apache.qpid.server.AMQChannel; public interface Subscription { - void send(AMQMessage msg, AMQQueue queue) throws AMQException; + void send(QueueEntry msg, AMQQueue queue) throws AMQException; boolean isSuspended(); @@ -35,15 +35,15 @@ public interface Subscription boolean filtersMessages(); - boolean hasInterest(AMQMessage msg); + boolean hasInterest(QueueEntry msg); - Queue<AMQMessage> getPreDeliveryQueue(); + Queue<QueueEntry> getPreDeliveryQueue(); - Queue<AMQMessage> getResendQueue(); + Queue<QueueEntry> getResendQueue(); - Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages); + Queue<QueueEntry> getNextQueue(Queue<QueueEntry> messages); - void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst); + void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst); boolean isAutoClose(); @@ -53,9 +53,9 @@ public interface Subscription boolean isBrowser(); - boolean wouldSuspend(AMQMessage msg); + boolean wouldSuspend(QueueEntry msg); - void addToResendQueue(AMQMessage msg); + void addToResendQueue(QueueEntry msg); Object getSendLock(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 1299c3a80c..e631481cc8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -58,9 +58,9 @@ public class SubscriptionImpl implements Subscription private final Object _sessionKey; - private MessageQueue<AMQMessage> _messages; + private MessageQueue<QueueEntry> _messages; - private Queue<AMQMessage> _resendQueue; + private Queue<QueueEntry> _resendQueue; private final boolean _noLocal; @@ -160,7 +160,7 @@ public class SubscriptionImpl implements Subscription if (filtersMessages()) { - _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>(); + _messages = new ConcurrentLinkedMessageQueueAtomicSize<QueueEntry>(); } else { @@ -226,7 +226,7 @@ public class SubscriptionImpl implements Subscription * * @throws AMQException */ - public void send(AMQMessage msg, AMQQueue queue) throws AMQException + public void send(QueueEntry msg, AMQQueue queue) throws AMQException { if (msg != null) { @@ -245,7 +245,7 @@ public class SubscriptionImpl implements Subscription } } - private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws AMQException + private void sendToBrowser(QueueEntry msg, AMQQueue queue) throws AMQException { // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. @@ -266,11 +266,11 @@ public class SubscriptionImpl implements Subscription _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!"); } - protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag); + protocolSession.getProtocolOutputConverter().writeDeliver(msg.getMessage(), channel.getChannelId(), deliveryTag, consumerTag); } } - private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue) + private void sendToConsumer(StoreContext storeContext, QueueEntry entry, AMQQueue queue) throws AMQException { try @@ -287,9 +287,9 @@ public class SubscriptionImpl implements Subscription { if (_logger.isDebugEnabled()) { - _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId()); + _logger.debug("No ack mode so dequeuing message immediately: " + entry.getMessage().getMessageId()); } - queue.dequeue(storeContext, msg); + queue.dequeue(storeContext, entry); } synchronized (channel) @@ -298,19 +298,19 @@ public class SubscriptionImpl implements Subscription if (_sendLock.get()) { - _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!"); + _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!"); } if (_acks) { - channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + channel.addUnacknowledgedMessage(entry, deliveryTag, consumerTag); } - protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag); + protocolSession.getProtocolOutputConverter().writeDeliver(entry.getMessage(), channel.getChannelId(), deliveryTag, consumerTag); if (!_acks) { - msg.decrementReference(storeContext); + entry.getMessage().decrementReference(storeContext); } } } @@ -320,7 +320,7 @@ public class SubscriptionImpl implements Subscription // using a try->finally would set it even if an error occured. // Is this what we want? - msg.setDeliveredToConsumer(); + entry.setDeliveredToConsumer(); } } @@ -355,19 +355,19 @@ public class SubscriptionImpl implements Subscription return _filters != null || _noLocal; } - public boolean hasInterest(AMQMessage msg) + public boolean hasInterest(QueueEntry entry) { //check that the message hasn't been rejected - if (msg.isRejectedBy(this)) + if (entry.isRejectedBy(this)) { if (_logger.isDebugEnabled()) { - _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + msg.debugIdentity()); + _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + entry.debugIdentity()); } // return false; } - final AMQProtocolSession publisher = msg.getPublisher(); + final AMQProtocolSession publisher = entry.getMessage().getPublisher(); //todo - client id should be recoreded and this test removed but handled below if (_noLocal && publisher != null) @@ -418,9 +418,9 @@ public class SubscriptionImpl implements Subscription if (_logger.isTraceEnabled()) { - _logger.trace("(" + debugIdentity() + ") checking filters for message (" + msg.debugIdentity()); + _logger.trace("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity()); } - return checkFilters(msg); + return checkFilters(entry); } @@ -431,7 +431,7 @@ public class SubscriptionImpl implements Subscription return id; } - private boolean checkFilters(AMQMessage msg) + private boolean checkFilters(QueueEntry msg) { if (_filters != null) { @@ -439,7 +439,7 @@ public class SubscriptionImpl implements Subscription // { // _logger.trace("(" + debugIdentity() + ") has filters."); // } - return _filters.allAllow(msg); + return _filters.allAllow(msg.getMessage()); } else { @@ -452,12 +452,12 @@ public class SubscriptionImpl implements Subscription } } - public Queue<AMQMessage> getPreDeliveryQueue() + public Queue<QueueEntry> getPreDeliveryQueue() { return _messages; } - public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst) + public void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst) { if (_messages != null) { @@ -561,19 +561,19 @@ public class SubscriptionImpl implements Subscription while (!_resendQueue.isEmpty()) { - AMQMessage resent = _resendQueue.poll(); + QueueEntry resent = _resendQueue.poll(); if (_logger.isTraceEnabled()) { _logger.trace("Removed for resending:" + resent.debugIdentity()); } - resent.release(_queue); + resent.release(); _queue.subscriberHasPendingResend(false, this, resent); try { - channel.getTransactionalContext().deliver(resent, _queue, true); + channel.getTransactionalContext().deliver(resent, true); } catch (AMQException e) { @@ -611,22 +611,22 @@ public class SubscriptionImpl implements Subscription return _isBrowser; } - public boolean wouldSuspend(AMQMessage msg) + public boolean wouldSuspend(QueueEntry msg) { - return channel.wouldSuspend(msg); + return channel.wouldSuspend(msg.getMessage()); } - public Queue<AMQMessage> getResendQueue() + public Queue<QueueEntry> getResendQueue() { if (_resendQueue == null) { - _resendQueue = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); + _resendQueue = new ConcurrentLinkedQueueAtomicSize<QueueEntry>(); } return _resendQueue; } - public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages) + public Queue<QueueEntry> getNextQueue(Queue<QueueEntry> messages) { if (_resendQueue != null && !_resendQueue.isEmpty()) { @@ -651,7 +651,7 @@ public class SubscriptionImpl implements Subscription } } - public void addToResendQueue(AMQMessage msg) + public void addToResendQueue(QueueEntry msg) { // add to our resend queue getResendQueue().add(msg); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java index 4df88baebc..bc17bcca9c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java @@ -30,5 +30,5 @@ public interface SubscriptionManager { public List<Subscription> getSubscriptions(); public boolean hasActiveSubscribers(); - public Subscription nextSubscriber(AMQMessage msg); + public Subscription nextSubscriber(QueueEntry entry); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index b500247fa4..b73b8d7e07 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -113,7 +113,7 @@ class SubscriptionSet implements WeightedSubscriptionManager * concurrently. Also note that because of race conditions and when subscriptions are removed between calls to * nextSubscriber, the IndexOutOfBoundsException also causes the scan to start at the beginning. */ - public Subscription nextSubscriber(AMQMessage msg) + public Subscription nextSubscriber(QueueEntry msg) { if (_subscriptions.isEmpty()) { @@ -140,7 +140,7 @@ class SubscriptionSet implements WeightedSubscriptionManager } } - private Subscription nextSubscriberImpl(AMQMessage msg) + private Subscription nextSubscriberImpl(QueueEntry msg) { final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber); while (iterator.hasNext()) diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 9068f871cb..b12afd9a41 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -30,6 +30,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.NoConsumersException; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; @@ -64,14 +65,13 @@ public class LocalTransactionalContext implements TransactionalContext private static class DeliveryDetails { - public AMQMessage message; - public AMQQueue queue; + public QueueEntry entry; + private boolean deliverFirst; - public DeliveryDetails(AMQMessage message, AMQQueue queue, boolean deliverFirst) + public DeliveryDetails(QueueEntry entry, boolean deliverFirst) { - this.message = message; - this.queue = queue; + this.entry = entry; this.deliverFirst = deliverFirst; } } @@ -103,7 +103,7 @@ public class LocalTransactionalContext implements TransactionalContext _postCommitDeliveryList.clear(); } - public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException + public void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException { // A publication will result in the enlisting of several // TxnOps. The first is an op that will store the message. @@ -112,9 +112,9 @@ public class LocalTransactionalContext implements TransactionalContext // enqueued. Finally a cleanup op will be added to decrement // the reference associated with the routing. // message.incrementReference(); - _postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst)); + _postCommitDeliveryList.add(new DeliveryDetails(entry, deliverFirst)); _messageDelivered = true; - _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages)); + _txnBuffer.enlist(new CleanupMessageOperation(entry.getMessage(), _returnMessages)); /*_txnBuffer.enlist(new DeliverMessageOperation(message, queue)); if (_log.isDebugEnabled()) { @@ -242,11 +242,11 @@ public class LocalTransactionalContext implements TransactionalContext { for (DeliveryDetails dd : _postCommitDeliveryList) { - dd.queue.process(_storeContext, dd.message, dd.deliverFirst); + dd.entry.process(_storeContext, dd.deliverFirst); try { - dd.message.checkDeliveredToConsumer(); + dd.entry.checkDeliveredToConsumer(); } catch (NoConsumersException nce) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index b3d69543d4..047cef9064 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.NoConsumersException; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; @@ -92,14 +93,14 @@ public class NonTransactionalContext implements TransactionalContext // Does not apply to this context } - public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException + public void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException { try { - queue.process(_storeContext, message, deliverFirst); + entry.process(_storeContext, deliverFirst); //following check implements the functionality //required by the 'immediate' flag: - message.checkDeliveredToConsumer(); + entry.checkDeliveredToConsumer(); } catch (NoConsumersException e) { @@ -128,7 +129,7 @@ public class NonTransactionalContext implements TransactionalContext { if (_log.isDebugEnabled()) { - _log.debug("Discarding message: " + message.message.getMessageId()); + _log.debug("Discarding message: " + message.getMessage().getMessageId()); } //Message has been ack so discard it. This will dequeue and decrement the reference. @@ -162,7 +163,7 @@ public class NonTransactionalContext implements TransactionalContext { if (_log.isDebugEnabled()) { - _log.debug("Discarding message: " + msg.message.getMessageId()); + _log.debug("Discarding message: " + msg.getMessage().getMessageId()); } //Message has been ack so discard it. This will dequeue and decrement the reference. @@ -192,7 +193,7 @@ public class NonTransactionalContext implements TransactionalContext { if (_log.isDebugEnabled()) { - _log.debug("Discarding message: " + msg.message.getMessageId()); + _log.debug("Discarding message: " + msg.getMessage().getMessageId()); } //Message has been ack so discard it. This will dequeue and decrement the reference. @@ -206,7 +207,7 @@ public class NonTransactionalContext implements TransactionalContext if (_log.isDebugEnabled()) { _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + - msg.message.getMessageId()); + msg.getMessage().getMessageId()); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java index fee25c07df..b4c66aa24d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java @@ -25,6 +25,7 @@ import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.StoreContext; /** @@ -111,14 +112,13 @@ public interface TransactionalContext * * <p/>This is an 'enqueue' operation. * - * @param message The message to deliver. - * @param queue The queue to deliver the message to. + * @param entry The message to deliver, and the queue to deliver to. * @param deliverFirst <tt>true</tt> to place the message on the front of the queue for redelivery, <tt>false</tt> * for normal FIFO message ordering. * * @throws AMQException If the message cannot be delivered for any reason. */ - void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException; + void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException; /** * Acknowledges a message or many messages as delivered. All messages up to a specified one, may be acknowledged by diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java index eea53252c6..218d5f04ed 100644 --- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java @@ -24,6 +24,7 @@ import org.apache.commons.codec.binary.Hex; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.tools.messagestore.MessageStoreTool; import org.apache.qpid.tools.utils.Console; @@ -85,7 +86,7 @@ public class Dump extends Show } - protected List<List> createMessageData(java.util.List<Long> msgids, List<AMQMessage> messages, boolean showHeaders, boolean showRouting, + protected List<List> createMessageData(java.util.List<Long> msgids, List<QueueEntry> messages, boolean showHeaders, boolean showRouting, boolean showMessageHeaders) { @@ -96,8 +97,9 @@ public class Dump extends Show display.add(hex); display.add(ascii); - for (AMQMessage msg : messages) + for (QueueEntry entry : messages) { + AMQMessage msg = entry.getMessage(); if (!includeMsg(msg, msgids)) { continue; @@ -252,8 +254,8 @@ public class Dump extends Show private void addShowInformation(List<String> column1, List<String> column2, AMQMessage msg, String title, boolean routing, boolean headers, boolean messageHeaders) { - List<AMQMessage> single = new LinkedList<AMQMessage>(); - single.add(msg); + List<QueueEntry> single = new LinkedList<QueueEntry>(); + single.add(new QueueEntry(null,msg)); List<List> routingData = super.createMessageData(null, single, headers, routing, messageHeaders); diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java index 25cff27445..7e21253fab 100644 --- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java @@ -23,6 +23,7 @@ package org.apache.qpid.tools.messagestore.commands; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.tools.messagestore.MessageStoreTool; @@ -166,12 +167,12 @@ public class Move extends AbstractCommand if (fromQueue != null) { - List<AMQMessage> messages = fromQueue.getMessagesOnTheQueue(); + List<QueueEntry> messages = fromQueue.getMessagesOnTheQueue(); if (messages != null) { - for (AMQMessage msg : messages) + for (QueueEntry msg : messages) { - ids.add(msg.getMessageId()); + ids.add(msg.getMessage().getMessageId()); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java index 5988cdabfc..a6dccf0f36 100644 --- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java @@ -27,6 +27,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.tools.messagestore.MessageStoreTool; import org.apache.qpid.tools.utils.Console; @@ -114,7 +115,7 @@ public class Show extends AbstractCommand if (_queue != null) { - List<AMQMessage> messages = _queue.getMessagesOnTheQueue(); + List<QueueEntry> messages = _queue.getMessagesOnTheQueue(); if (messages == null || messages.size() == 0) { _console.println("No messages on queue"); @@ -153,7 +154,7 @@ public class Show extends AbstractCommand * @param showMessageHeaders show the msg headers be shown * @return the formated data lists for printing */ - protected List<List> createMessageData(List<Long> msgids, List<AMQMessage> messages, boolean showHeaders, boolean showRouting, + protected List<List> createMessageData(List<Long> msgids, List<QueueEntry> messages, boolean showHeaders, boolean showRouting, boolean showMessageHeaders) { @@ -334,8 +335,9 @@ public class Show extends AbstractCommand } //Add create the table of data - for (AMQMessage msg : messages) + for (QueueEntry entry : messages) { + AMQMessage msg = entry.getMessage(); if (!includeMsg(msg, msgids)) { continue; diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java index 0a3bc93763..8e5879a51e 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java @@ -104,7 +104,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -146,7 +146,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -166,7 +166,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -207,7 +207,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -227,7 +227,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -247,7 +247,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -266,7 +266,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -308,7 +308,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -327,7 +327,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -369,7 +369,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -403,7 +403,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -446,7 +446,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -487,7 +487,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index b718b9c861..81b0ae2213 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -286,7 +286,7 @@ public class AMQQueueAlertTest extends TestCase for (int i = 0; i < messageCount; i++) { - _queue.process(_storeContext, messages[i], false); + _queue.process(_storeContext, new QueueEntry(_queue,messages[i]), false); } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 4770779363..d86c90bdae 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -211,7 +211,7 @@ public class AMQQueueMBeanTest extends TestCase msg.enqueue(_queue); msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); - _queue.process(_storeContext, msg, false); + _queue.process(_storeContext, new QueueEntry(_queue, msg), false); _queueMBean.viewMessageContent(id); try { diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java index 3ee8277eba..10189a8017 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java @@ -29,6 +29,7 @@ import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.NonTransactionalContext; @@ -133,7 +134,7 @@ public class TxAckTest extends TestCase }; TestMessage message = new TestMessage(deliveryTag, i, info, txnContext); - _map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag)); + _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag)); } _acked = acked; _unacked = unacked; @@ -150,7 +151,7 @@ public class TxAckTest extends TestCase { UnacknowledgedMessage u = _map.get(tag); assertTrue("Message not found for tag " + tag, u != null); - ((TestMessage) u.message).assertCountEquals(expected); + ((TestMessage) u.getMessage()).assertCountEquals(expected); } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index ff5517bdd5..58323086b5 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -27,6 +27,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; @@ -250,9 +251,9 @@ public class AbstractHeadersExchangeTestBase extends TestCase * @param deliverFirst * @throws AMQException */ - public void process(StoreContext context, AMQMessage msg, boolean deliverFirst) throws AMQException + public void process(StoreContext context, QueueEntry msg, boolean deliverFirst) throws AMQException { - messages.add(new HeadersExchangeTest.Message(msg)); + messages.add(new HeadersExchangeTest.Message(msg.getMessage())); } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java index be788a02da..790607e268 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java @@ -142,7 +142,7 @@ public class AckTest extends TestCase msg.incrementReference(); msg.routingComplete(_messageStore, _storeContext, factory); // we manually send the message to the subscription - _subscription.send(msg, _queue); + _subscription.send(new QueueEntry(_queue,msg), _queue); } } @@ -167,7 +167,7 @@ public class AckTest extends TestCase assertTrue(deliveryTag == i); i++; UnacknowledgedMessage unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.queue == _queue); + assertTrue(unackedMsg.getQueue() == _queue); } assertTrue(map.size() == msgCount); @@ -228,7 +228,7 @@ public class AckTest extends TestCase { assertTrue(deliveryTag == i); UnacknowledgedMessage unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.queue == _queue); + assertTrue(unackedMsg.getQueue() == _queue); // 5 is the delivery tag of the message that *should* be removed if (++i == 5) { @@ -257,7 +257,7 @@ public class AckTest extends TestCase { assertTrue(deliveryTag == i + 5); UnacknowledgedMessage unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.queue == _queue); + assertTrue(unackedMsg.getQueue() == _queue); ++i; } } @@ -281,7 +281,7 @@ public class AckTest extends TestCase { assertTrue(deliveryTag == i + 5); UnacknowledgedMessage unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.queue == _queue); + assertTrue(unackedMsg.getQueue() == _queue); ++i; } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java index 068f37574d..282ad3ed5e 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java @@ -42,9 +42,9 @@ public class ConcurrencyTestDisabled extends MessageTestHelper private final List<SubscriptionTestHelper> _subscribers = new ArrayList<SubscriptionTestHelper>(); private final Set<Subscription> _active = new HashSet<Subscription>(); - private final List<AMQMessage> _messages = new ArrayList<AMQMessage>(); + private final List<QueueEntry> _messages = new ArrayList<QueueEntry>(); private int next = 0;//index to next message to send - private final List<AMQMessage> _received = Collections.synchronizedList(new ArrayList<AMQMessage>()); + private final List<QueueEntry> _received = Collections.synchronizedList(new ArrayList<QueueEntry>()); private final Executor _executor = new OnCurrentThreadExecutor(); private final List<Thread> _threads = new ArrayList<Thread>(); @@ -159,7 +159,7 @@ public class ConcurrencyTestDisabled extends MessageTestHelper } } - private AMQMessage nextMessage() + private QueueEntry nextMessage() { synchronized (_messages) { @@ -191,7 +191,7 @@ public class ConcurrencyTestDisabled extends MessageTestHelper { void doRun() throws Throwable { - AMQMessage msg = nextMessage(); + QueueEntry msg = nextMessage(); if (msg != null) { _deliveryMgr.deliver(null, new AMQShortString(toString()), msg, false); diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java index dc5a6d3cf6..b33259cfba 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java @@ -40,7 +40,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper public void testStartInQueueingMode() throws AMQException { - AMQMessage[] messages = new AMQMessage[10]; + QueueEntry[] messages = new QueueEntry[10]; for (int i = 0; i < messages.length; i++) { messages[i] = message(); @@ -85,7 +85,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper public void testStartInDirectMode() throws AMQException { - AMQMessage[] messages = new AMQMessage[10]; + QueueEntry[] messages = new QueueEntry[10]; for (int i = 0; i < messages.length; i++) { messages[i] = message(); @@ -132,7 +132,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper { try { - AMQMessage msg = message(true); + QueueEntry msg = message(true); _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false); msg.checkDeliveredToConsumer(); fail("expected exception did not occur"); @@ -154,7 +154,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper SubscriptionTestHelper s = new SubscriptionTestHelper("A"); _subscriptions.addSubscriber(s); s.setSuspended(true); - AMQMessage msg = message(true); + QueueEntry msg = message(true); _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false); msg.checkDeliveredToConsumer(); fail("expected exception did not occur"); diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java index 88272023e8..812aec6a5d 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java @@ -55,12 +55,12 @@ class MessageTestHelper extends TestCase ApplicationRegistry.initialise(new NullApplicationRegistry()); } - AMQMessage message() throws AMQException + QueueEntry message() throws AMQException { return message(false); } - AMQMessage message(final boolean immediate) throws AMQException + QueueEntry message(final boolean immediate) throws AMQException { MessagePublishInfo publish = new MessagePublishInfo() { @@ -86,8 +86,8 @@ class MessageTestHelper extends TestCase } }; - return new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext, - new ContentHeaderBody()); + return new QueueEntry(null,new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext, + new ContentHeaderBody())); } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index fe947ef3bc..5846ad0a9d 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -28,13 +28,13 @@ import java.util.Queue; public class SubscriptionTestHelper implements Subscription { - private final List<AMQMessage> messages; + private final List<QueueEntry> messages; private final Object key; private boolean isSuspended; public SubscriptionTestHelper(Object key) { - this(key, new ArrayList<AMQMessage>()); + this(key, new ArrayList<QueueEntry>()); } public SubscriptionTestHelper(final Object key, final boolean isSuspended) @@ -43,18 +43,18 @@ public class SubscriptionTestHelper implements Subscription setSuspended(isSuspended); } - SubscriptionTestHelper(Object key, List<AMQMessage> messages) + SubscriptionTestHelper(Object key, List<QueueEntry> messages) { this.key = key; this.messages = messages; } - List<AMQMessage> getMessages() + List<QueueEntry> getMessages() { return messages; } - public void send(AMQMessage msg, AMQQueue queue) + public void send(QueueEntry msg, AMQQueue queue) { messages.add(msg); } @@ -69,12 +69,12 @@ public class SubscriptionTestHelper implements Subscription return isSuspended; } - public boolean wouldSuspend(AMQMessage msg) + public boolean wouldSuspend(QueueEntry msg) { return isSuspended; } - public void addToResendQueue(AMQMessage msg) + public void addToResendQueue(QueueEntry msg) { //no-op } @@ -98,27 +98,27 @@ public class SubscriptionTestHelper implements Subscription return false; } - public boolean hasInterest(AMQMessage msg) + public boolean hasInterest(QueueEntry msg) { return true; } - public Queue<AMQMessage> getPreDeliveryQueue() + public Queue<QueueEntry> getPreDeliveryQueue() { return null; } - public Queue<AMQMessage> getResendQueue() + public Queue<QueueEntry> getResendQueue() { return null; } - public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages) + public Queue<QueueEntry> getNextQueue(Queue<QueueEntry> messages) { return messages; } - public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst) + public void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst) { //no-op } |