diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-02-23 10:20:44 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-23 10:20:44 +0000 |
commit | de248153d311b1e0211dfe3230afcb306f3c0192 (patch) | |
tree | 30412df8d5fd1d3ef076fba0903301b25f8a7518 | |
parent | f74e4dc27d1655760d0213fd60cc75c272c26f00 (diff) | |
download | qpid-python-de248153d311b1e0211dfe3230afcb306f3c0192.tar.gz |
QPID-346 Message loss after rollback
QPID-348 Problems of prefetching messages
QPID-355 Closing a consumer does not ensure messages delivery will stop for that subscription
BROKER
AMQChannel - updated requeue to either resend via the Delivery Manager not directly via msg.writedeliver.
BasicRejectMethodHandler - initial place holder.
TxRollbackHandler - Added comment
AMQMessage - added ability to record who has taken the message so that it can be resent to that subscriber on resend/requeue.
AMQQueue - added the queue reference to the Subscription creation
ConcurrentSelectorDeliveryManager - Added methods to correctly monitor the size of queue messages. Including messages on the resend queue of a Subscriber. Additional locking to ensure that messages are not sent to the subscriber after Closure. QPID-355
DeliveryManager - adjusted deliver call to allow delivery to the head of the queue.
Subscription - changes to allow selction of queue(resend or predelivery) methods to add to resend and getSendLock to ensure that sending to the Subscription is allowed.
SubscriptionFactory - changes to allow the AMQQueue to be passed to the Subscription.
SubscriptionImpl - implementation of the interfaces. Local storage of messages to be resent and requeuing of the messages during closure.
SubscriptionSet - changes to retrieve the actual stored Subscription when performing removeSubscriber. So we have access to the the resend queue.
AMQStateManager - Added BasicRejectMethodHandler
TransactionalContext - Added option to deliver the messages to the front of the queue.
LocalTransactionalContext - cleared the _postComitDeliveryList on rollback. Added option to deliver the messages to the front of the queue.
NonTransactionalContext - Added option to deliver the messages to the front of the queue.
DeliverMessageOperation.java DELELTED AS NOT USED.
CLIENT
AMQSession - added ability to get the pervious state of the dispatcher when settting Stopped, fixed the channel suspension problems on broker so uncommented clean up code in rollback and recover.
BasicMessageConsumer - updated the rollback so that it sends reject messages to server.
AbstractJMSMessage - whitespace + added extra message properties to the toString()
AMQProtocolHandler - whitespace + extra debug output
TransactedTest - updated expect to prevent NPEs also added extra logging to help understand what is going on.
CLUSTER
ClusteredQueue - AMQQueue changes for message deliveryFirst.
RemoteSubscriptionImpl - Implementation of Subscription
SYSTESTS
AbstractHeadersExchangeTestBase - AMQQueue changes for message deliveryFirst.
AMQQueueMBeanTest - changes for message deliveryFirst.
ConcurrencyTest - changes for message deliveryFirst.
DeliveryManagerTest - changes for message deliveryFirst.
SubscriptionTestHelper - Implementation of Subscription
WhiteSpace only
UnacknowledgedMessageMapImpl.java
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@510897 13f79535-47bb-0310-9956-ffa450edef68
32 files changed, 1707 insertions, 609 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 7271bd6e43..7ceb3a7eef 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -46,6 +46,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.queue.Subscription; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.LocalTransactionalContext; @@ -74,28 +75,20 @@ public class AMQChannel */ private AtomicLong _deliveryTag = new AtomicLong(0); - /** - * A channel has a default queue (the last declared) that is used when no queue name is - * explictily set - */ + /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */ private AMQQueue _defaultQueue; - /** - * This tag is unique per subscription to a queue. The server returns this in response to a - * basic.consume request. - */ + /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */ private int _consumerTag; /** - * The current message - which may be partial in the sense that not all frames have been received yet - - * which has been received by this channel. As the frames are received the message gets updated and once all - * frames have been received the message can then be routed. + * The current message - which may be partial in the sense that not all frames have been received yet - which has + * been received by this channel. As the frames are received the message gets updated and once all frames have been + * received the message can then be routed. */ private AMQMessage _currentMessage; - /** - * Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. - */ + /** Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. */ private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new HashMap<AMQShortString, AMQQueue>(); private final MessageStore _messageStore; @@ -109,8 +102,8 @@ public class AMQChannel private TransactionalContext _txnContext; /** - * A context used by the message store enabling it to track context for a given channel even across - * thread boundaries + * A context used by the message store enabling it to track context for a given channel even across thread + * boundaries */ private final StoreContext _storeContext; @@ -123,7 +116,6 @@ public class AMQChannel private final AMQProtocolSession _session; - public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges) throws AMQException { @@ -138,9 +130,7 @@ public class AMQChannel _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); } - /** - * Sets this channel to be part of a local transaction - */ + /** Sets this channel to be part of a local transaction */ public void setLocalTransactional() { _txnContext = new LocalTransactionalContext(_messageStore, _storeContext, _returnMessages); @@ -293,17 +283,17 @@ public class AMQChannel } /** - * Subscribe to a queue. We register all subscriptions in the channel so that - * if the channel is closed we can clean up all subscriptions, even if the - * client does not explicitly unsubscribe from all queues. + * Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean + * up all subscriptions, even if the client does not explicitly unsubscribe from all queues. * - * @param tag the tag chosen by the client (if null, server will generate one) - * @param queue the queue to subscribe to - * @param session the protocol session of the subscriber + * @param tag the tag chosen by the client (if null, server will generate one) + * @param queue the queue to subscribe to + * @param session the protocol session of the subscriber * @param noLocal * @param exclusive - * @return the consumer tag. This is returned to the subscriber and used in - * subsequent unsubscribe requests + * + * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests + * * @throws ConsumerTagNotUniqueException if the tag is not unique * @throws AMQException if something goes wrong */ @@ -335,7 +325,7 @@ public class AMQChannel } /** - * Called from the protocol session to close this channel and clean up. + * Called from the protocol session to close this channel and clean up. T * * @throws AMQException if there is an error during closure */ @@ -344,8 +334,6 @@ public class AMQChannel _txnContext.rollback(); unsubscribeAllConsumers(session); requeue(); - _txnContext.commit(); - } private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException @@ -362,8 +350,8 @@ public class AMQChannel * Add a message to the channel-based list of unacknowledged messages * * @param message the message that was delivered - * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of - * the delivery tag) + * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the + * delivery tag) * @param queue the queue from which the message was delivered */ public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, AMQShortString consumerTag, AMQQueue queue) @@ -376,8 +364,8 @@ public class AMQChannel } /** - * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel. - * May result in delivery to this same channel or to other subscribers. + * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel. May result in delivery to + * this same channel or to other subscribers. * * @throws org.apache.qpid.AMQException if the requeue fails */ @@ -386,23 +374,75 @@ public class AMQChannel // we must create a new map since all the messages will get a new delivery tag when they are redelivered Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); + TransactionalContext nontransacted = null; + if (!(_txnContext instanceof NonTransactionalContext)) + { + nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this, + _returnMessages, _browsedAcks); + } + + for (UnacknowledgedMessage unacked : messagesToBeDelivered) { if (unacked.queue != null) { - _txnContext.deliver(unacked.message, unacked.queue); + // Deliver these messages out of the transaction as their delivery was never + // part of the transaction only the receive. + if (!(_txnContext instanceof NonTransactionalContext)) + { + nontransacted.deliver(unacked.message, unacked.queue, false); + } + else + { + _txnContext.deliver(unacked.message, unacked.queue, false); + } } } } + public void requeue(long deliveryTag) throws AMQException + { + UnacknowledgedMessage unacked = _unacknowledgedMessageMap.remove(deliveryTag); - /** - * Called to resend all outstanding unacknowledged messages to this same channel. - */ + if (unacked != null) + { + TransactionalContext nontransacted = null; + if (!(_txnContext instanceof NonTransactionalContext)) + { + nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this, + _returnMessages, _browsedAcks); + } + + if (!(_txnContext instanceof NonTransactionalContext)) + { + nontransacted.deliver(unacked.message, unacked.queue, false); + } + else + { + _txnContext.deliver(unacked.message, unacked.queue, false); + } + unacked.message.decrementReference(_storeContext); + } + else + { + _log.error("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists"); + } + + + } + + + /** Called to resend all outstanding unacknowledged messages to this same channel. */ public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException { final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>(); + final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>(); + + if (_log.isInfoEnabled()) + { + _log.info("unacked map contains " + _unacknowledgedMessageMap.size()); + } _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() { @@ -412,21 +452,40 @@ public class AMQChannel AMQShortString consumerTag = message.consumerTag; AMQMessage msg = message.message; msg.setRedelivered(true); - if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag) && !isSuspended()) + if (consumerTag != null) { - msg.writeDeliver(session, _channelId, deliveryTag, consumerTag); + // Consumer exists + if (_consumerTag2QueueMap.containsKey(consumerTag)) + { + msgToResend.add(message); + } + else // consumer has gone + { + msgToRequeue.add(message); + } } else { // Message has no consumer tag, so was "delivered" to a GET // or consumer no longer registered // cannot resend, so re-queue. - if (message.queue != null && (consumerTag == null || requeue)) + if (message.queue != null) + { + if (requeue) + { + msgToRequeue.add(message); + } + else + { + _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message); + } + } + else { - msgToRequeue.add(message); + _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message); } } - + // false means continue processing return false; } @@ -436,21 +495,112 @@ public class AMQChannel } }); - for(UnacknowledgedMessage message : msgToRequeue) + // Process Messages to Resend + if (_log.isInfoEnabled()) + { + if (!msgToResend.isEmpty()) + { + _log.info("Preparing (" + msgToResend.size() + ") message to resend to."); + } + } + for (UnacknowledgedMessage message : msgToResend) + { + AMQMessage msg = message.message; + + // Our Java Client will always suspend the channel when resending!! +// if (isSuspended()) +// { +// _log.info("Channel is suspended so requeuing"); +// //move this message to requeue +// msgToRequeue.add(message); +// } +// else + { + //release to allow it to be delivered + msg.release(); + + // Without any details from the client about what has been processed we have to mark + // all messages in the unacked map as redelivered. + msg.setRedelivered(true); + + + Subscription sub = msg.getDeliveredSubscription(); + + if (sub != null) + { + synchronized (sub.getSendLock()) + { + if (sub.isClosed()) + { + _log.info("Subscription closed during resend so requeuing message"); + //move this message to requeue + msgToRequeue.add(message); + } + else + { + if (_log.isDebugEnabled()) + { + _log.debug("Requeuing (" + System.identityHashCode(msg) + ") for resend"); + } + // Will throw an exception if the sub is closed + sub.addToResendQueue(msg); + _unacknowledgedMessageMap.remove(message.deliveryTag); + // Don't decrement as we are bypassing the normal deliver which increments + // this is what there is a decrement on the Requeue as deliver will increment. + // msg.decrementReference(_storeContext); + } + } + } + else + { + _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss"); + //move this message to requeue + msgToRequeue.add(message); + } + } + } + + if (_log.isInfoEnabled()) + { + if (!msgToRequeue.isEmpty()) + { + _log.info("Preparing (" + msgToRequeue.size() + ") message to requeue to."); + } + } + + TransactionalContext nontransacted = null; + if (!(_txnContext instanceof NonTransactionalContext)) { - _txnContext.deliver(message.message, message.queue); + nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this, + _returnMessages, _browsedAcks); + } + + // Process Messages to Requeue at the front of the queue + for (UnacknowledgedMessage message : msgToRequeue) + { + // Deliver these messages out of the transaction as their delivery was never + // part of the transaction only the receive. + if (!(_txnContext instanceof NonTransactionalContext)) + { + nontransacted.deliver(message.message, message.queue, true); + } + else + { + _txnContext.deliver(message.message, message.queue, true); + } + _unacknowledgedMessageMap.remove(message.deliveryTag); message.message.decrementReference(_storeContext); } } /** - * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged - * messages to remove the queue reference and also decrement any message reference counts, without - * actually removing the item since we may get an ack for a delivery tag that was generated from the - * deleted queue. + * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged messages to + * remove the queue reference and also decrement any message reference counts, without actually removing the item + * since we may get an ack for a delivery tag that was generated from the deleted queue. * * @param queue the queue that has been deleted + * * @throws org.apache.qpid.AMQException if there is an error processing the unacked messages */ public void queueDeleted(final AMQQueue queue) throws AMQException @@ -487,6 +637,7 @@ public class AMQChannel * @param deliveryTag the last delivery tag * @param multiple if true will acknowledge all messages up to an including the delivery tag. if false only * acknowledges the single message specified by the delivery tag + * * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel */ public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException @@ -517,10 +668,10 @@ public class AMQChannel private void checkSuspension() { boolean suspend; - - suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark) - || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes()); - + + suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark) + || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes()); + setSuspended(suspend); } @@ -570,8 +721,6 @@ public class AMQChannel public void rollback() throws AMQException { _txnContext.rollback(); - - } public String toString() @@ -617,8 +766,8 @@ public class AMQChannel } else { - boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark); - if(!willSuspend) + boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark); + if (!willSuspend) { final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes(); @@ -626,7 +775,7 @@ public class AMQChannel } - if(willSuspend) + if (willSuspend) { setSuspended(true); } @@ -634,4 +783,9 @@ public class AMQChannel } } + + public TransactionalContext getTransactionalContext() + { + return _txnContext; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index f8b6babd43..fdf087fdea 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -85,7 +85,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap for (UnacknowledgedMessage msg : msgs) { remove(msg.deliveryTag); - } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java new file mode 100644 index 0000000000..ed13092ded --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicRejectBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.ack.UnacknowledgedMessage; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.log4j.Logger; + +public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicRejectBody> +{ + private static final Logger _logger = Logger.getLogger(BasicRejectMethodHandler.class); + + private static BasicRejectMethodHandler _instance = new BasicRejectMethodHandler(); + + public static BasicRejectMethodHandler getInstance() + { + return _instance; + } + + private BasicRejectMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicRejectBody> evt) throws AMQException + { + AMQProtocolSession session = stateManager.getProtocolSession(); + + _logger.info("FIXME: Rejecting:" + evt.getMethod().deliveryTag + ": Requeue:" + evt.getMethod().requeue); + + int channelId = evt.getChannelId(); + UnacknowledgedMessage message = session.getChannel(channelId).getUnacknowledgedMessageMap().get(evt.getMethod().deliveryTag); + + _logger.info("Need to reject message:" + message); +// if (evt.getMethod().requeue) +// { +// session.getChannel(channelId).requeue(evt.getMethod().deliveryTag); +// } +// else +// { +// // session.getChannel(channelId).resend(message); +// } + + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java index 8ce5a0ea73..a10f44f906 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java @@ -62,6 +62,7 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); //Now resend all the unacknowledged messages back to the original subscribers. //(Must be done after the TxnRollback-ok response). + // Why, are we not allowed to send messages back to client before the ok method? channel.resend(session, false); } catch (AMQException e) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index c60c22c4e4..aa7ea16afc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -36,21 +36,15 @@ import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.txn.TransactionalContext; -/** - * Combines the information that make up a deliverable message into a more manageable form. - */ +/** Combines the information that make up a deliverable message into a more manageable form. */ public class AMQMessage { private static final Logger _log = Logger.getLogger(AMQMessage.class); - /** - * Used in clustering - */ + /** Used in clustering */ private Set<Object> _tokens; - /** - * Only use in clustering - should ideally be removed? - */ + /** Only use in clustering - should ideally be removed? */ private AMQProtocolSession _publisher; private final Long _messageId; @@ -63,16 +57,14 @@ public class AMQMessage private TransactionalContext _txnContext; /** - * Flag to indicate whether message has been delivered to a - * consumer. Used in implementing return functionality for + * Flag to indicate whether message has been delivered to a consumer. Used in implementing return functionality for * messages published with the 'immediate' flag. */ private boolean _deliveredToConsumer; /** - * We need to keep track of whether the message was 'immediate' - * as in extreme circumstances, when the checkDelieveredToConsumer - * is called, the message may already have been received and acknowledged, - * and the body removed from the store. + * We need to keep track of whether the message was 'immediate' as in extreme circumstances, when the + * checkDelieveredToConsumer is called, the message may already have been received and acknowledged, and the body + * removed from the store. */ private boolean _immediate; @@ -80,11 +72,16 @@ public class AMQMessage private TransientMessageData _transientMessageData = new TransientMessageData(); + private Subscription _takenBySubcription; + public boolean isTaken() + { + return _taken.get(); + } /** - * Used to iterate through all the body frames associated with this message. Will not - * keep all the data in memory therefore is memory-efficient. + * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory + * therefore is memory-efficient. */ private class BodyFrameIterator implements Iterator<AMQDataBlock> { @@ -103,7 +100,7 @@ public class AMQMessage { try { - return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1; + return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1; } catch (AMQException e) { @@ -153,7 +150,7 @@ public class AMQMessage { try { - return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1; + return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1; } catch (AMQException e) { @@ -166,7 +163,7 @@ public class AMQMessage { try { - return _messageHandle.getContentChunk(getStoreContext(),_messageId, ++_index); + return _messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index); } catch (AMQException e) { @@ -196,12 +193,14 @@ public class AMQMessage } /** - * Used when recovering, i.e. when the message store is creating references to messages. - * In that case, the normal enqueue/routingComplete is not done since the recovery process - * is responsible for routing the messages to queues. + * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal + * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to + * queues. + * * @param messageId * @param store * @param factory + * * @throws AMQException */ public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) throws AMQException @@ -213,8 +212,8 @@ public class AMQMessage } /** - * Used in testing only. This allows the passing of the content header immediately - * on construction. + * Used in testing only. This allows the passing of the content header immediately on construction. + * * @param messageId * @param info * @param txnContext @@ -228,14 +227,15 @@ public class AMQMessage } /** - * Used in testing only. This allows the passing of the content header and some body fragments on - * construction. + * Used in testing only. This allows the passing of the content header and some body fragments on construction. + * * @param messageId * @param info * @param txnContext * @param contentHeader * @param destinationQueues * @param contentBodies + * * @throws AMQException */ public AMQMessage(Long messageId, MessagePublishInfo info, @@ -280,7 +280,7 @@ public class AMQMessage } else { - return _messageHandle.getContentHeaderBody(getStoreContext(),_messageId); + return _messageHandle.getContentHeaderBody(getStoreContext(), _messageId); } } @@ -338,16 +338,14 @@ public class AMQMessage return _messageId; } - /** - * Threadsafe. Increment the reference count on the message. - */ + /** Threadsafe. Increment the reference count on the message. */ public void incrementReference() { _referenceCount.incrementAndGet(); if (_log.isDebugEnabled()) { - _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + " " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4)); + _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + " " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4)); } } @@ -355,7 +353,7 @@ public class AMQMessage /** * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the * message store. - * + * * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that * failed */ @@ -371,7 +369,7 @@ public class AMQMessage { if (_log.isDebugEnabled()) { - _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4)); + _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4)); } @@ -394,13 +392,13 @@ public class AMQMessage { if (_log.isDebugEnabled()) { - _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId+ "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4)); + _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4)); if (_referenceCount.get() < 0) { Thread.dumpStack(); } } - if(_referenceCount.get()<0) + if (_referenceCount.get() < 0) { throw new MessageCleanupException("Reference count for message id " + _messageId + " has gone below 0."); } @@ -419,7 +417,8 @@ public class AMQMessage /** * Called selectors to determin if the message has already been sent - * @return _deliveredToConsumer + * + * @return _deliveredToConsumer */ public boolean getDeliveredToConsumer() { @@ -427,10 +426,17 @@ public class AMQMessage } - - public boolean taken() + public boolean taken(Subscription sub) { - return _taken.getAndSet(true); + if (_taken.getAndSet(true)) + { + return true; + } + else + { + _takenBySubcription = sub; + return false; + } } public void release() @@ -441,9 +447,9 @@ public class AMQMessage public boolean checkToken(Object token) { - if(_tokens==null) + if (_tokens == null) { - _tokens = new HashSet<Object>(); + _tokens = new HashSet<Object>(); } if (_tokens.contains(token)) @@ -458,11 +464,12 @@ public class AMQMessage } /** - * Registers a queue to which this message is to be delivered. This is - * called from the exchange when it is routing the message. This will be called before any content bodies have - * been received so that the choice of AMQMessageHandle implementation can be picked based on various criteria. + * Registers a queue to which this message is to be delivered. This is called from the exchange when it is routing + * the message. This will be called before any content bodies have been received so that the choice of + * AMQMessageHandle implementation can be picked based on various criteria. * * @param queue the queue + * * @throws org.apache.qpid.AMQException if there is an error enqueuing the message */ public void enqueue(AMQQueue queue) throws AMQException @@ -483,16 +490,15 @@ public class AMQMessage } else { - return _messageHandle.isPersistent(getStoreContext(),_messageId); + return _messageHandle.isPersistent(getStoreContext(), _messageId); } } /** * Called to enforce the 'immediate' flag. * - * @throws NoConsumersException if the message is marked for - * immediate delivery but has not been marked as delivered to a - * consumer + * @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered + * to a consumer */ public void checkDeliveredToConsumer() throws NoConsumersException, AMQException { @@ -500,7 +506,7 @@ public class AMQMessage if (_immediate && !_deliveredToConsumer) { throw new NoConsumersException(this); - } + } } public MessagePublishInfo getMessagePublishInfo() throws AMQException @@ -512,7 +518,7 @@ public class AMQMessage } else { - pb = _messageHandle.getMessagePublishInfo(getStoreContext(),_messageId); + pb = _messageHandle.getMessagePublishInfo(getStoreContext(), _messageId); } return pb; } @@ -533,10 +539,7 @@ public class AMQMessage } - /** - * Called when this message is delivered to a consumer. (used to - * implement the 'immediate' flag functionality). - */ + /** Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). */ public void setDeliveredToConsumer() { _deliveredToConsumer = true; @@ -566,7 +569,7 @@ public class AMQMessage for (AMQQueue q : destinationQueues) { - _txnContext.deliver(this, q); + _txnContext.deliver(this, q, true); } } finally @@ -583,23 +586,22 @@ public class AMQMessage AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, getContentHeaderBody()); - final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId); - if(bodyCount == 0) + final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId); + if (bodyCount == 0) { SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, - contentHeader); + contentHeader); protocolSession.writeFrame(compositeBlock); } else { - // // Optimise the case where we have a single content body. In that case we create a composite block // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. // - ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, 0); + ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0); AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; @@ -609,9 +611,9 @@ public class AMQMessage // // Now start writing out the other content bodies // - for(int i = 1; i < bodyCount; i++) + for (int i = 1; i < bodyCount; i++) { - cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, i); + cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i); protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); } @@ -627,22 +629,21 @@ public class AMQMessage AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, getContentHeaderBody()); - final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId); - if(bodyCount == 0) + final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId); + if (bodyCount == 0) { SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, - contentHeader); + contentHeader); protocolSession.writeFrame(compositeBlock); } else { - // // Optimise the case where we have a single content body. In that case we create a composite block // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. // - ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, 0); + ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0); AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; @@ -652,9 +653,9 @@ public class AMQMessage // // Now start writing out the other content bodies // - for(int i = 1; i < bodyCount; i++) + for (int i = 1; i < bodyCount; i++) { - cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, i); + cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i); protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); } @@ -685,10 +686,10 @@ public class AMQMessage AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), protocolSession.getProtocolMinorVersion(), - deliveryTag, pb.getExchange(), - queueSize, - _messageHandle.isRedelivered(), - pb.getRoutingKey()); + deliveryTag, pb.getExchange(), + queueSize, + _messageHandle.isRedelivered(), + pb.getRoutingKey()); ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem? getOkFrame.writePayload(buf); buf.flip(); @@ -699,7 +700,7 @@ public class AMQMessage { AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), - protocolSession.getProtocolMinorVersion(), + protocolSession.getProtocolMinorVersion(), getMessagePublishInfo().getExchange(), replyCode, replyText, getMessagePublishInfo().getRoutingKey()); @@ -757,12 +758,11 @@ public class AMQMessage } catch (AMQException e) { - _log.error(e.toString(),e); + _log.error(e.toString(), e); return 0; } - } - + } public void restoreTransientMessageData() throws AMQException @@ -771,7 +771,7 @@ public class AMQMessage transientMessageData.setMessagePublishInfo(getMessagePublishInfo()); transientMessageData.setContentHeaderBody(getContentHeaderBody()); transientMessageData.addBodyLength(getContentHeaderBody().getSize()); - _transientMessageData = transientMessageData; + _transientMessageData = transientMessageData; } @@ -784,6 +784,11 @@ public class AMQMessage public String toString() { return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " + - _taken; + _taken + " by:" + _takenBySubcription; + } + + public Subscription getDeliveredSubscription() + { + return _takenBySubcription; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index e9ebe6c541..429829e201 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -45,13 +45,11 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.virtualhost.VirtualHost; /** - * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like - * that. It is described fully in RFC 006. + * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described + * fully in RFC 006. */ public class AMQQueue implements Managable, Comparable { - - public static final class ExistingExclusiveSubscription extends AMQException { @@ -74,26 +72,19 @@ public class AMQQueue implements Managable, Comparable private static final ExistingSubscriptionPreventsExclusive EXISTING_SUBSCRIPTION = new ExistingSubscriptionPreventsExclusive(); - private static final Logger _logger = Logger.getLogger(AMQQueue.class); private final AMQShortString _name; - /** - * null means shared - */ + /** null means shared */ private final AMQShortString _owner; private final boolean _durable; - /** - * If true, this queue is deleted when the last subscriber is removed - */ + /** If true, this queue is deleted when the last subscriber is removed */ private final boolean _autoDelete; - /** - * Holds subscribers to the queue. - */ + /** Holds subscribers to the queue. */ private final SubscriptionSet _subscribers; private final SubscriptionFactory _subscriptionFactory; @@ -106,20 +97,13 @@ public class AMQQueue implements Managable, Comparable private List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>(); - /** - * Manages message delivery. - */ + /** Manages message delivery. */ private final DeliveryManager _deliveryMgr; - /** - * Used to track bindings to exchanges so that on deletion they can easily - * be cancelled. - */ + /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */ private final ExchangeBindings _bindings = new ExchangeBindings(this); - /** - * Executor on which asynchronous delivery will be carriedout where required - */ + /** Executor on which asynchronous delivery will be carriedout where required */ private final Executor _asyncDelivery; private final AMQQueueMBean _managedObject; @@ -127,39 +111,27 @@ public class AMQQueue implements Managable, Comparable private final VirtualHost _virtualHost; - /** - * max allowed size(KB) of a single message - */ + /** max allowed size(KB) of a single message */ @Configured(path = "maximumMessageSize", defaultValue = "0") public long _maximumMessageSize; - /** - * max allowed number of messages on a queue. - */ + /** max allowed number of messages on a queue. */ @Configured(path = "maximumMessageCount", defaultValue = "0") public int _maximumMessageCount; - /** - * max queue depth for the queue - */ + /** max queue depth for the queue */ @Configured(path = "maximumQueueDepth", defaultValue = "0") public long _maximumQueueDepth; - /** - * maximum message age before alerts occur - */ + /** maximum message age before alerts occur */ @Configured(path = "maximumMessageAge", defaultValue = "0") public long _maximumMessageAge; - /** - * the minimum interval between sending out consequetive alerts of the same type - */ + /** the minimum interval between sending out consequetive alerts of the same type */ @Configured(path = "minimumAlertRepeatGap", defaultValue = "0") public long _minimumAlertRepeatGap; - /** - * total messages received by the queue since startup. - */ + /** total messages received by the queue since startup. */ public AtomicLong _totalMessagesReceived = new AtomicLong(); public int compareTo(Object o) @@ -176,7 +148,6 @@ public class AMQQueue implements Managable, Comparable } - protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost, SubscriptionSet subscribers) @@ -211,7 +182,7 @@ public class AMQQueue implements Managable, Comparable _subscribers = subscribers; _subscriptionFactory = subscriptionFactory; - _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); + _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); } private AMQQueueMBean createMBean() throws AMQException @@ -251,17 +222,13 @@ public class AMQQueue implements Managable, Comparable return _autoDelete; } - /** - * @return no of messages(undelivered) on the queue. - */ + /** @return no of messages(undelivered) on the queue. */ public int getMessageCount() { return _deliveryMgr.getQueueMessageCount(); } - /** - * @return List of messages(undelivered) on the queue. - */ + /** @return List of messages(undelivered) on the queue. */ public List<AMQMessage> getMessagesOnTheQueue() { return _deliveryMgr.getMessages(); @@ -275,6 +242,7 @@ public class AMQQueue implements Managable, Comparable /** * @param messageId + * * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist. */ public AMQMessage getMessageOnTheQueue(long messageId) @@ -294,13 +262,12 @@ public class AMQQueue implements Managable, Comparable } /** - * moves messages from this queue to another queue. to do this the approach is following- - * - setup the queue for moving messages (hold the lock and stop the async delivery) - * - get all the messages available in the given message id range - * - setup the other queue for moving messages (hold the lock and stop the async delivery) - * - send these available messages to the other queue (enqueue in other queue) - * - Once sending to other Queue is successful, remove messages from this queue - * - remove locks from both queues and start async delivery + * moves messages from this queue to another queue. to do this the approach is following- - setup the queue for + * moving messages (hold the lock and stop the async delivery) - get all the messages available in the given message + * id range - setup the other queue for moving messages (hold the lock and stop the async delivery) - send these + * available messages to the other queue (enqueue in other queue) - Once sending to other Queue is successful, + * remove messages from this queue - remove locks from both queues and start async delivery + * * @param fromMessageId * @param toMessageId * @param queueName @@ -316,7 +283,7 @@ public class AMQQueue implements Managable, Comparable startMovingMessages(); List<AMQMessage> list = getMessagesOnTheQueue(); List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>(); - int maxMessageCountToBeMoved = (int)(toMessageId - fromMessageId + 1); + int maxMessageCountToBeMoved = (int) (toMessageId - fromMessageId + 1); // Run this loop till you find all the messages or the list has no more messages for (AMQMessage message : list) @@ -344,7 +311,7 @@ public class AMQQueue implements Managable, Comparable { // remove the lock and start the async delivery anotherQueue.stopMovingMessages(); - stopMovingMessages(); + stopMovingMessages(); } } @@ -364,10 +331,8 @@ public class AMQQueue implements Managable, Comparable _deliveryMgr.stopMovingMessages(); _deliveryMgr.processAsync(_asyncDelivery); } - - /** - * @return MBean object associated with this Queue - */ + + /** @return MBean object associated with this Queue */ public ManagedObject getManagedObject() { return _managedObject; @@ -422,20 +387,16 @@ public class AMQQueue implements Managable, Comparable public long getOldestMessageArrivalTime() { return _deliveryMgr.getOldestMessageArrival(); - + } - /** - * Removes the AMQMessage from the top of the queue. - */ + /** Removes the AMQMessage from the top of the queue. */ public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException { _deliveryMgr.removeAMessageFromTop(storeContext); } - /** - * removes all the messages from the queue. - */ + /** removes all the messages from the queue. */ public synchronized long clearQueue(StoreContext storeContext) throws AMQException { return _deliveryMgr.clearAllMessages(storeContext); @@ -443,10 +404,10 @@ public class AMQQueue implements Managable, Comparable public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException { - exchange.registerQueue(routingKey, this, arguments); - if(isDurable() && exchange.isDurable()) + exchange.registerQueue(routingKey, this, arguments); + if (isDurable() && exchange.isDurable()) { - _virtualHost.getMessageStore().bindQueue(exchange,routingKey,this,arguments); + _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments); } _bindings.addBinding(routingKey, arguments, exchange); } @@ -454,9 +415,9 @@ public class AMQQueue implements Managable, Comparable public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException { exchange.deregisterQueue(routingKey, this, arguments); - if(isDurable() && exchange.isDurable()) + if (isDurable() && exchange.isDurable()) { - _virtualHost.getMessageStore().unbindQueue(exchange,routingKey,this,arguments); + _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments); } _bindings.remove(routingKey, arguments, exchange); } @@ -466,30 +427,31 @@ public class AMQQueue implements Managable, Comparable FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException { - if(incrementSubscriberCount() > 1) + if (incrementSubscriberCount() > 1) { - if(isExclusive()) + if (isExclusive()) { decrementSubscriberCount(); throw EXISTING_EXCLUSIVE; } - else if(exclusive) + else if (exclusive) { decrementSubscriberCount(); throw EXISTING_SUBSCRIPTION; } } - else if(exclusive) + else if (exclusive) { setExclusive(true); } debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this); - Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal); + Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, + filters, noLocal, this); - if(subscription.hasFilters()) + if (subscription.hasFilters()) { if (_deliveryMgr.hasQueuedMessages()) { @@ -537,10 +499,10 @@ public class AMQQueue implements Managable, Comparable " and protocol session key " + ps.getKey() + " not registered with queue " + this); } + removedSubscription.close(); setExclusive(false); decrementSubscriberCount(); - // if we are eligible for auto deletion, unregister from the queue registry if (_autoDelete && _subscribers.isEmpty()) { @@ -583,13 +545,13 @@ public class AMQQueue implements Managable, Comparable public void delete() throws AMQException { - if(!_deleted.getAndSet(true)) + if (!_deleted.getAndSet(true)) { _subscribers.queueDeleted(this); _bindings.deregister(); _virtualHost.getQueueRegistry().unregisterQueue(_name); _managedObject.unregister(); - for(Task task : _deleteTaskList) + for (Task task : _deleteTaskList) { task.doTask(this); } @@ -605,7 +567,8 @@ public class AMQQueue implements Managable, Comparable public void processGet(StoreContext storeContext, AMQMessage msg) throws AMQException { - _deliveryMgr.deliver(storeContext, getName(), msg); + //fixme not sure what this is doing. should we be passing deliverFirst through here? + _deliveryMgr.deliver(storeContext, getName(), msg, false); try { msg.checkDeliveredToConsumer(); @@ -620,9 +583,9 @@ public class AMQQueue implements Managable, Comparable } - public void process(StoreContext storeContext, AMQMessage msg) throws AMQException + public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException { - _deliveryMgr.deliver(storeContext, getName(), msg); + _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst); try { msg.checkDeliveredToConsumer(); @@ -731,7 +694,7 @@ public class AMQQueue implements Managable, Comparable public static interface Task { - public void doTask(AMQQueue queue) throws AMQException; + public void doTask(AMQQueue queue) throws AMQException; } public void addQueueDeleteTask(Task task) @@ -759,4 +722,8 @@ public class AMQQueue implements Managable, Comparable _maximumMessageAge = maximumMessageAge; } + public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, AMQMessage msg) + { + _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, msg); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 0fc8753a87..208a59516c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -24,9 +24,14 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Queue; +import java.util.Set; +import java.util.Collections; +import java.util.HashSet; import java.util.concurrent.Executor; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import org.apache.log4j.Logger; @@ -38,12 +43,12 @@ import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.util.MessageQueue; +import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize; import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; -/** - * Manages delivery of messages on behalf of a queue - */ +/** Manages delivery of messages on behalf of a queue */ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { private static final Logger _log = Logger.getLogger(ConcurrentSelectorDeliveryManager.class); @@ -51,47 +56,36 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager @Configured(path = "advanced.compressBufferOnQueue", defaultValue = "false") public boolean compressBufferOnQueue; - /** - * Holds any queued messages - */ - private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); - - private final ReentrantLock _messageAccessLock = new ReentrantLock(); + /** Holds any queued messages */ + private final MessageQueue<AMQMessage> _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>(); - //private int _messageCount; - /** - * Ensures that only one asynchronous task is running for this manager at - * any time. - */ + /** Ensures that only one asynchronous task is running for this manager at any time. */ private final AtomicBoolean _processing = new AtomicBoolean(); - /** - * The subscriptions on the queue to whom messages are delivered - */ + /** The subscriptions on the queue to whom messages are delivered */ private final SubscriptionManager _subscriptions; /** - * A reference to the queue we are delivering messages for. We need this to be able - * to pass the code that handles acknowledgements a handle on the queue. + * A reference to the queue we are delivering messages for. We need this to be able to pass the code that handles + * acknowledgements a handle on the queue. */ private final AMQQueue _queue; /** - * Flag used while moving messages from this queue to another. For moving messages the async delivery - * should also stop. This flat should be set to true to stop async delivery and set to false to enable - * async delivery again. + * Flag used while moving messages from this queue to another. For moving messages the async delivery should also + * stop. This flat should be set to true to stop async delivery and set to false to enable async delivery again. */ private AtomicBoolean _movingMessages = new AtomicBoolean(); - + /** * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced - * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered - * via the async thread. - * <p/> - * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue. + * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be + * delivered via the async thread. <p/> Lock is used to control access to hasQueuedMessages() and over the addition + * of messages to the queue. */ private ReentrantLock _lock = new ReentrantLock(); private AtomicLong _totalMessageSize = new AtomicLong(); - + private AtomicInteger _extraMessages = new AtomicInteger(); + private Set<Subscription> _hasContent = Collections.synchronizedSet(new HashSet<Subscription>()); ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) { @@ -109,7 +103,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } - private boolean addMessageToQueue(AMQMessage msg) + private boolean addMessageToQueue(AMQMessage msg, boolean deliverFirst) { // Shrink the ContentBodies to their actual size to save memory. if (compressBufferOnQueue) @@ -122,7 +116,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - _messages.offer(msg); + if (deliverFirst) + { + _messages.pushHead(msg); + } + else + { + _messages.offer(msg); + } _totalMessageSize.addAndGet(msg.getSize()); @@ -135,7 +136,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _lock.lock(); try { - return !_messages.isEmpty(); + return !(_messages.isEmpty() && _hasContent.isEmpty()); } finally { @@ -149,18 +150,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } /** - * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size. - * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue. + * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine + * size. The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue. * * @return int the number of messages in the delivery queue. */ private int getMessageCount() { - return _messages.size(); + return _messages.size() + _extraMessages.get(); } - public long getTotalMessageSize() { return _totalMessageSize.get(); @@ -172,6 +172,38 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return msg == null ? Long.MAX_VALUE : msg.getArrivalTime(); } + public void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg) + { + _lock.lock(); + try + { + if (hasContent) + { + _log.debug("Queue has adding subscriber content"); + _hasContent.add(subscription); + _totalMessageSize.addAndGet(msg.getSize()); + _extraMessages.addAndGet(1); + } + else + { + _log.debug("Queue has removing subscriber content"); + if (msg == null) + { + _hasContent.remove(subscription); + } + else + { + _totalMessageSize.addAndGet(-msg.getSize()); + _extraMessages.addAndGet(-1); + } + } + } + finally + { + _lock.unlock(); + } + } + public List<AMQMessage> getMessages() { @@ -195,7 +227,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager AMQMessage message = currentQueue.next(); if (subscription.hasInterest(message)) { - subscription.enqueueForPreDelivery(message); + subscription.enqueueForPreDelivery(message, false); } } } @@ -203,7 +235,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public boolean performGet(AMQProtocolSession protocolSession, AMQChannel channel, boolean acks) throws AMQException { AMQMessage msg = getNextMessage(); - if(msg == null) + if (msg == null) { return false; } @@ -229,7 +261,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } _queue.dequeue(channel.getStoreContext(), msg); } - synchronized(channel) + synchronized (channel) { long deliveryTag = channel.getNextDeliveryTag(); @@ -252,8 +284,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } /** - * For feature of moving messages, this method is used. It sets the lock and sets the movingMessages flag, - * so that the asyn delivery is also stopped. + * For feature of moving messages, this method is used. It sets the lock and sets the movingMessages flag, so that + * the asyn delivery is also stopped. */ public void startMovingMessages() { @@ -262,8 +294,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } /** - * Once moving messages to another queue is done or aborted, remove lock and unset the movingMessages flag, - * so that the async delivery can start again. + * Once moving messages to another queue is done or aborted, remove lock and unset the movingMessages flag, so that + * the async delivery can start again. */ public void stopMovingMessages() { @@ -276,6 +308,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager /** * Messages will be removed from this queue and all preDeliveryQueues + * * @param messageList */ public void removeMovedMessages(List<AMQMessage> messageList) @@ -308,7 +341,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager /** * Now with implementation of predelivery queues, this method will mark the message on the top as taken. + * * @param storeContext + * * @throws AMQException */ public void removeAMessageFromTop(StoreContext storeContext) throws AMQException @@ -318,11 +353,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (msg != null) { // mark this message as taken and get it removed - msg.taken(); + msg.taken(null); _queue.dequeue(storeContext, msg); getNextMessage(); } - + _lock.unlock(); } @@ -335,7 +370,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager while (msg != null) { //mark this message as taken and get it removed - msg.taken(); + msg.taken(null); _queue.dequeue(storeContext, msg); msg = getNextMessage(); count++; @@ -347,20 +382,15 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public synchronized AMQMessage getNextMessage() throws AMQException { - return getNextMessage(_messages); + return getNextMessage(_messages, null); } - - private AMQMessage getNextMessage(Queue<AMQMessage> messages) - { - return getNextMessage(messages, false); - } - - private AMQMessage getNextMessage(Queue<AMQMessage> messages, boolean browsing) + private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub) { AMQMessage message = messages.peek(); - while (message != null && (browsing || message.taken())) + + while (message != null && ((sub == null || sub.isBrowser()) || message.taken(sub))) { //remove the already taken message messages.poll(); @@ -371,27 +401,76 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return message; } - public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue) + public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue) { + + Queue<AMQMessage> messageQueue = sub.getNextQueue(_messages); + + if (_log.isTraceEnabled()) + { + _log.trace("Async sendNextMessage for sub (" + System.identityHashCode(sub) + + ") from queue (" + System.identityHashCode(messageQueue) + + ") AMQQueue (" + System.identityHashCode(queue) + ")"); + } + + if (messageQueue == null) + { + // There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector + if (_log.isDebugEnabled()) + { + _log.debug(sub + ": asked to send messages but has none on given queue:" + queue); + } + return; + } + AMQMessage message = null; try { - message = getNextMessage(messageQueue, sub.isBrowser()); + message = getNextMessage(messageQueue, sub); // message will be null if we have no messages in the messageQueue. if (message == null) { + if (_log.isTraceEnabled()) + { + _log.trace("No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")"); + } return; } if (_log.isDebugEnabled()) { - _log.debug("Async Delivery Message:" + message + " to :" + sub); + _log.debug("Async Delivery Message (" + System.identityHashCode(message) + + ") by :" + System.identityHashCode(this) + + ") to :" + System.identityHashCode(sub)); } sub.send(message, _queue); //remove sent message from our queue. messageQueue.poll(); + //If we don't remove the message from _messages + // Otherwise the Async send will never end + + if (messageQueue == sub.getResendQueue()) + { + if (_log.isTraceEnabled()) + { + _log.trace("All messages sent from resendQueue for " + sub); + } + if (messageQueue.isEmpty()) + { + subscriberHasPendingResend(false, sub, null); + //better to use the above method as this keeps all the tracking in one location. +// _hasContent.remove(sub); + } + + _extraMessages.decrementAndGet(); + } + else if (messageQueue == sub.getPreDeliveryQueue()) + { + _log.info("We could do clean up of the main _message queue here"); + } + _totalMessageSize.addAndGet(-message.getSize()); } catch (AMQException e) @@ -403,6 +482,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager /** * enqueues the messages in the list on the queue and all required predelivery queues + * * @param storeContext * @param movedMessageList */ @@ -411,7 +491,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _lock.lock(); for (AMQMessage msg : movedMessageList) { - addMessageToQueue(msg); + addMessageToQueue(msg, true); } // enqueue on the pre delivery queues @@ -422,7 +502,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager // Only give the message to those that want them. if (sub.hasInterest(msg)) { - sub.enqueueForPreDelivery(msg); + sub.enqueueForPreDelivery(msg, true); } } } @@ -430,8 +510,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } /** - * Only one thread should ever execute this method concurrently, but - * it can do so while other threads invoke deliver(). + * Only one thread should ever execute this method concurrently, but it can do so while other threads invoke + * deliver(). */ private void processQueue() { @@ -444,40 +524,43 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager for (Subscription sub : _subscriptions.getSubscriptions()) { - if (!sub.isSuspended()) + synchronized (sub.getSendLock()) { - sendNextMessage(sub); - - hasSubscribers = true; - } - } - } - } + if (!sub.isSuspended()) + { + sendNextMessage(sub, _queue); - private void sendNextMessage(Subscription sub) - { - if (sub.hasFilters()) - { - sendNextMessage(sub, sub.getPreDeliveryQueue()); - if (sub.isAutoClose()) - { - if (sub.getPreDeliveryQueue().isEmpty()) - { - sub.close(); + hasSubscribers = true; + } } } } - else - { - sendNextMessage(sub, _messages); - } } - public void deliver(StoreContext context, AMQShortString name, AMQMessage msg) throws AMQException +// private void sendNextMessage(Subscription sub) +// { +// if (sub.hasFilters()) +// { +// sendNextMessage(sub, sub.getPreDeliveryQueue()); +// if (sub.isAutoClose()) +// { +// if (sub.getPreDeliveryQueue().isEmpty()) +// { +// sub.close(); +// } +// } +// } +// else +// { +// sendNextMessage(sub, _messages); +// } +// } + + public void deliver(StoreContext context, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws AMQException { if (_log.isDebugEnabled()) { - _log.debug(id() + "deliver :" + msg); + _log.debug(id() + "deliver :first(" + deliverFirst + ") :" + msg); } msg.release(); @@ -491,11 +574,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery"); + _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus()); } if (!msg.getMessagePublishInfo().isImmediate()) { - addMessageToQueue(msg); + addMessageToQueue(msg, deliverFirst); //release lock now message is on queue. _lock.unlock(); @@ -504,7 +587,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (_log.isDebugEnabled()) { _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() + - " subscribers to give the message to."); + " subscribers to give the message to:" + currentStatus()); } for (Subscription sub : _subscriptions.getSubscriptions()) { @@ -528,7 +611,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")"); } - sub.enqueueForPreDelivery(msg); + sub.enqueueForPreDelivery(msg, deliverFirst); } } } @@ -537,14 +620,47 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { //release lock now _lock.unlock(); - - if (_log.isDebugEnabled()) + synchronized (s.getSendLock()) { - _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + - System.identityHashCode(s) + ") :" + s); + if (!s.isSuspended()) + { + if (_log.isDebugEnabled()) + { + _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + + System.identityHashCode(s) + ") :" + s); + } + msg.taken(s); + //Deliver the message + s.send(msg, _queue); + } + else + { + if (_log.isDebugEnabled()) + { + _log.debug(id() + " Subscription(" + System.identityHashCode(s) + ") became suspended between nextSubscriber and send"); + } + } + + if (!msg.isTaken()) + { + if (_log.isDebugEnabled()) + { + _log.debug(id() + " Message(" + System.identityHashCode(msg) + ") has not been taken so recursing!:" + + " Subscriber:" + System.identityHashCode(s)); + } + + deliver(context, name, msg, deliverFirst); + } + else + { + if (_log.isDebugEnabled()) + { + _log.debug(id() + " Message(" + System.identityHashCode(msg) + + ") has been taken so disregarding deliver request to Subscriber:" + + System.identityHashCode(s)); + } + } } - //Deliver the message - s.send(msg, _queue); } } finally @@ -593,9 +709,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" + - " Active:" + _subscriptions.hasActiveSubscribers() + - " Processing:" + _processing.get()); + _log.debug("Processing Async." + currentStatus()); } if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) @@ -608,4 +722,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } + private String currentStatus() + { + return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") + + "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") " + + " Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") + + "(" + _hasContent.size() + ":" + _extraMessages.get() + ") " + + " Active:" + _subscriptions.hasActiveSubscribers() + + " Processing:" + _processing.get() + + " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") + + "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") "; + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index 27abca012b..5b77951dfd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -32,8 +32,8 @@ import org.apache.qpid.server.store.StoreContext; interface DeliveryManager { /** - * Determines whether there are queued messages. Sets _queueing to false if - * there are no queued messages. This needs to be atomic. + * Determines whether there are queued messages. Sets _queueing to false if there are no queued messages. This needs + * to be atomic. * * @return true if there are queued messages */ @@ -43,34 +43,34 @@ interface DeliveryManager * This method should not be used to determin if there are messages in the queue. * * @return int The number of messages in the queue + * * @use hasQueuedMessages() for all controls relating to having messages on the queue. */ int getQueueMessageCount(); /** - * Requests that the delivery manager start processing the queue asynchronously - * if there is work that can be done (i.e. there are messages queued up and - * subscribers that can receive them. - * <p/> - * This should be called when subscribers are added, but only after the consume-ok - * message has been returned as message delivery may start immediately. It should also - * be called after unsuspending a client. - * <p/> + * Requests that the delivery manager start processing the queue asynchronously if there is work that can be done + * (i.e. there are messages queued up and subscribers that can receive them. <p/> This should be called when + * subscribers are added, but only after the consume-ok message has been returned as message delivery may start + * immediately. It should also be called after unsuspending a client. <p/> * * @param executor the executor on which the delivery should take place */ void processAsync(Executor executor); /** - * Handles message delivery. The delivery manager is always in one of two modes; - * it is either queueing messages for asynchronous delivery or delivering - * directly. + * Handles message delivery. The delivery manager is always in one of two modes; it is either queueing messages for + * asynchronous delivery or delivering directly. + * + * @param storeContext + * @param name the name of the entity on whose behalf we are delivering the message + * @param msg the message to deliver + * @param deliverFirst * - * @param name the name of the entity on whose behalf we are delivering the message - * @param msg the message to deliver - * @throws org.apache.qpid.server.queue.FailedDequeueException if the message could not be dequeued + * @throws org.apache.qpid.server.queue.FailedDequeueException + * if the message could not be dequeued */ - void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg) throws FailedDequeueException, AMQException; + void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws FailedDequeueException, AMQException; void removeAMessageFromTop(StoreContext storeContext) throws AMQException; @@ -93,4 +93,6 @@ interface DeliveryManager long getTotalMessageSize(); long getOldestMessageArrival(); + + void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java index fa70c6dbac..e9f209839a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java @@ -38,13 +38,23 @@ public interface Subscription Queue<AMQMessage> getPreDeliveryQueue(); - void enqueueForPreDelivery(AMQMessage msg); + Queue<AMQMessage> getResendQueue(); + + Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages); + + void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst); boolean isAutoClose(); void close(); + boolean isClosed(); + boolean isBrowser(); boolean wouldSuspend(AMQMessage msg); + + void addToResendQueue(AMQMessage msg); + + Object getSendLock(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java index 6902788fc8..917f7c4e97 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java @@ -26,16 +26,16 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.protocol.AMQProtocolSession; /** - * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This - * factory primarily assists testing although in future more sophisticated subscribers may need a different - * subscription implementation. + * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This factory + * primarily assists testing although in future more sophisticated subscribers may need a different subscription + * implementation. * * @see org.apache.qpid.server.queue.AMQQueue */ public interface SubscriptionFactory { Subscription createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, boolean acks, - FieldTable filters, boolean noLocal) throws AMQException; + FieldTable filters, boolean noLocal, AMQQueue queue) throws AMQException; Subscription createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 6bdfeccc0f..ede7731a06 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -21,10 +21,10 @@ package org.apache.qpid.server.queue; import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQChannelException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.common.ClientProperties; @@ -37,6 +37,8 @@ import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; +import org.apache.qpid.util.MessageQueue; +import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize; /** * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag @@ -52,9 +54,11 @@ public class SubscriptionImpl implements Subscription public final AMQShortString consumerTag; - private final Object sessionKey; + private final Object _sessionKey; - private Queue<AMQMessage> _messages; + private MessageQueue<AMQMessage> _messages; + + private Queue<AMQMessage> _resendQueue; private final boolean _noLocal; @@ -63,20 +67,27 @@ public class SubscriptionImpl implements Subscription private FilterManager _filters; private final boolean _isBrowser; private final Boolean _autoClose; - private boolean _closed = false; + private boolean _sentClose = false; + private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); + private AMQQueue _queue; + private final AtomicBoolean _sendLock = new AtomicBoolean(false); + + public static class Factory implements SubscriptionFactory { - public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException + public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, + AMQShortString consumerTag, boolean acks, FieldTable filters, + boolean noLocal, AMQQueue queue) throws AMQException { - return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal); + return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal, queue); } public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag) throws AMQException { - return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false); + return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false, null); } } @@ -84,25 +95,27 @@ public class SubscriptionImpl implements Subscription AMQShortString consumerTag, boolean acks) throws AMQException { - this(channelId, protocolSession, consumerTag, acks, null, false); + this(channelId, protocolSession, consumerTag, acks, null, false, null); } public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession, - AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal) + AMQShortString consumerTag, boolean acks, FieldTable filters, + boolean noLocal, AMQQueue queue) throws AMQException { AMQChannel channel = protocolSession.getChannel(channelId); if (channel == null) - { + { throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session"); } this.channel = channel; this.protocolSession = protocolSession; this.consumerTag = consumerTag; - sessionKey = protocolSession.getKey(); + _sessionKey = protocolSession.getKey(); _acks = acks; _noLocal = noLocal; + _queue = queue; _filters = FilterManagerFactory.createManager(filters); @@ -145,9 +158,7 @@ public class SubscriptionImpl implements Subscription if (_filters != null) { - _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); - - + _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>(); } else { @@ -169,30 +180,47 @@ public class SubscriptionImpl implements Subscription return (o instanceof SubscriptionImpl) && equals((SubscriptionImpl) o); } - /** Equality holds if the session matches and the channel and consumer tag are the same. */ + /** + * Equality holds if the session matches and the channel and consumer tag are the same. + * + * @param psc The subscriptionImpl to compare + * + * @return equality + */ private boolean equals(SubscriptionImpl psc) { - return sessionKey.equals(psc.sessionKey) + return _sessionKey.equals(psc._sessionKey) && psc.channel == channel && psc.consumerTag.equals(consumerTag); } public int hashCode() { - return sessionKey.hashCode(); + return _sessionKey.hashCode(); } public String toString() { - return "[channel=" + channel + ", consumerTag=" + consumerTag + ", session=" + protocolSession.getKey() + "]"; + String subscriber = "[channel=" + channel + + ", consumerTag=" + consumerTag + + ", session=" + protocolSession.getKey() + + ", resendQueue=" + (_resendQueue != null); + + if (_resendQueue != null) + { + subscriber += ", resendSize=" + _resendQueue.size(); + } + + + return subscriber + "]"; } /** * This method can be called by each of the publisher threads. As a result all changes to the channel object must be * thread safe. * - * @param msg - * @param queue + * @param msg The message to send + * @param queue the Queue it has been sent from * * @throws AMQException */ @@ -278,7 +306,18 @@ public class SubscriptionImpl implements Subscription public boolean isSuspended() { - return channel.isSuspended(); + if (_logger.isTraceEnabled()) + { + if (channel.isSuspended()) + { + _logger.trace("Subscription(" + System.identityHashCode(this) + ") channel's is susupended"); + } + if (_sendLock.get()) + { + _logger.trace("Subscription(" + System.identityHashCode(this) + ") has sendLock set so closing."); + } + } + return channel.isSuspended() || _sendLock.get(); } /** @@ -376,11 +415,18 @@ public class SubscriptionImpl implements Subscription return _messages; } - public void enqueueForPreDelivery(AMQMessage msg) + public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst) { if (_messages != null) { - _messages.offer(msg); + if (deliverFirst) + { + _messages.pushHead(msg); + } + else + { + _messages.offer(msg); + } } } @@ -391,19 +437,95 @@ public class SubscriptionImpl implements Subscription public void close() { - if (!_closed) + synchronized (_sendLock) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Setting SendLock true"); + } + + _sendLock.set(true); + + } + if (_logger.isInfoEnabled()) + { + _logger.info("Closing subscription (" + System.identityHashCode(this) + "):" + this); + } + + if (_resendQueue != null && !_resendQueue.isEmpty()) + { + requeue(); + } + + //remove references in PDQ + if (_messages != null) + { + _messages.clear(); + } + + if (_autoClose && !_sentClose) { _logger.info("Closing autoclose subscription:" + this); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), - protocolSession.getProtocolMajorVersion(), - protocolSession.getProtocolMinorVersion(), + (byte) 8, (byte) 0, // AMQP version (major, minor) consumerTag // consumerTag )); - _closed = true; + _sentClose = true; + } + } + + private void requeue() + { + if (_queue != null) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Requeuing :" + _resendQueue.size() + " messages"); + } + + while (!_resendQueue.isEmpty()) + { + AMQMessage resent = _resendQueue.poll(); + + resent.release(); + _queue.subscriberHasPendingResend(false, this, resent); + + try + { + channel.getTransactionalContext().deliver(resent, _queue, true); + } + catch (AMQException e) + { + _logger.error("Unable to re-deliver messages", e); + } + } + + if (!_resendQueue.isEmpty()) + { + _logger.error("[MESSAGES LOST]Unable to re-deliver messages as queue is null."); + } + + _queue.subscriberHasPendingResend(false, this, null); } + else + { + if (!_resendQueue.isEmpty()) + { + _logger.error("Unable to re-deliver messages as queue is null."); + } + } + + // Clear the messages + _resendQueue = null; + } + + + public boolean isClosed() + { + return _sendLock.get(); // This rather than _close is used to signify the subscriber is now closed. } public boolean isBrowser() @@ -416,5 +538,61 @@ public class SubscriptionImpl implements Subscription return channel.wouldSuspend(msg); } + public Queue<AMQMessage> getResendQueue() + { + if (_resendQueue == null) + { + _resendQueue = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); + } + return _resendQueue; + } + + + public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages) + { + if (_resendQueue != null && !_resendQueue.isEmpty()) + { + return _resendQueue; + } + + if (_filters != null) + { + if (isAutoClose()) + { + if (_messages.isEmpty()) + { + close(); + return null; + } + } + return _messages; + } + else // we want the DM queue + { + return messages; + } + } + + public void addToResendQueue(AMQMessage msg) + { + // add to our resend queue + getResendQueue().add(msg); + + // Mark Queue has having content. + if (_queue == null) + { + _logger.error("Queue is null won't be able to resend messages"); + } + else + { + _queue.subscriberHasPendingResend(true, this, msg); + } + } + + public Object getSendLock() + { + return _sendLock; + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index 871f063725..26b040aae0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -27,27 +27,20 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -/** - * Holds a set of subscriptions for a queue and manages the round - * robin-ing of deliver etc. - */ +/** Holds a set of subscriptions for a queue and manages the round robin-ing of deliver etc. */ class SubscriptionSet implements WeightedSubscriptionManager { private static final Logger _log = Logger.getLogger(SubscriptionSet.class); - /** - * List of registered subscribers - */ + /** List of registered subscribers */ private List<Subscription> _subscriptions = new CopyOnWriteArrayList<Subscription>(); - /** - * Used to control the round robin delivery of content - */ + /** Used to control the round robin delivery of content */ private int _currentSubscriber; + private final Object _subscriptionsChange = new Object(); - /** - * Accessor for unit tests. - */ + + /** Accessor for unit tests. */ int getCurrentSubscriber() { return _currentSubscriber; @@ -55,21 +48,43 @@ class SubscriptionSet implements WeightedSubscriptionManager public void addSubscriber(Subscription subscription) { - _subscriptions.add(subscription); + synchronized (_subscriptionsChange) + { + _subscriptions.add(subscription); + } } /** * Remove the subscription, returning it if it was found * * @param subscription + * * @return null if no match was found */ public Subscription removeSubscriber(Subscription subscription) { - boolean isRemoved = _subscriptions.remove(subscription); // TODO: possibly need O(1) operation here. - if (isRemoved) + // TODO: possibly need O(1) operation here. + + Subscription sub = null; + synchronized (_subscriptionsChange) { - return subscription; + int subIndex = _subscriptions.indexOf(subscription); + + if (subIndex != -1) + { + //we can't just return the passed in subscription as it is a new object + // and doesn't contain the stored state we need. + //NOTE while this may be removed now anyone with an iterator will still have it in the list!! + sub = _subscriptions.remove(subIndex); + } + else + { + _log.error("Unable to remove from index(" + subIndex + ")subscription:" + subscription); + } + } + if (sub != null) + { + return sub; } else { @@ -92,14 +107,11 @@ class SubscriptionSet implements WeightedSubscriptionManager } /** - * Return the next unsuspended subscription or null if not found. - * <p/> - * Performance note: - * This method can scan all items twice when looking for a subscription that is not - * suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this - * without synchronisation and subscriptions may be added and removed concurrently. Also note that because of - * race conditions and when subscriptions are removed between calls to nextSubscriber, the - * IndexOutOfBoundsException also causes the scan to start at the beginning. + * Return the next unsuspended subscription or null if not found. <p/> Performance note: This method can scan all + * items twice when looking for a subscription that is not suspended. The worst case occcurs when all subscriptions + * are suspended. However, it is does this without synchronisation and subscriptions may be added and removed + * concurrently. Also note that because of race conditions and when subscriptions are removed between calls to + * nextSubscriber, the IndexOutOfBoundsException also causes the scan to start at the beginning. */ public Subscription nextSubscriber(AMQMessage msg) { @@ -156,9 +168,7 @@ class SubscriptionSet implements WeightedSubscriptionManager return null; } - /** - * Overridden in test classes. - */ + /** Overridden in test classes. */ protected void subscriberScanned() { } @@ -199,8 +209,8 @@ class SubscriptionSet implements WeightedSubscriptionManager } /** - * Notification that a queue has been deleted. This is called so that the subscription can inform the - * channel, which in turn can update its list of unacknowledged messages. + * Notification that a queue has been deleted. This is called so that the subscription can inform the channel, which + * in turn can update its list of unacknowledged messages. * * @param queue */ diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index 29efdd9513..d12f5cd084 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -55,6 +55,7 @@ import org.apache.qpid.framing.QueuePurgeBody; import org.apache.qpid.framing.TxCommitBody; import org.apache.qpid.framing.TxRollbackBody; import org.apache.qpid.framing.TxSelectBody; +import org.apache.qpid.framing.BasicRejectBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.handler.BasicAckMethodHandler; @@ -82,8 +83,9 @@ import org.apache.qpid.server.handler.QueueDeclareHandler; import org.apache.qpid.server.handler.QueueDeleteHandler; import org.apache.qpid.server.handler.QueuePurgeHandler; import org.apache.qpid.server.handler.TxCommitHandler; -import org.apache.qpid.server.handler.TxRollbackHandler; +import org.apache.qpid.server.handler.BasicRejectMethodHandler; import org.apache.qpid.server.handler.TxSelectHandler; +import org.apache.qpid.server.handler.TxRollbackHandler; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; @@ -173,6 +175,7 @@ public class AMQStateManager implements AMQMethodListener frame2handlerMap.put(TxSelectBody.class, TxSelectHandler.getInstance()); frame2handlerMap.put(TxCommitBody.class, TxCommitHandler.getInstance()); frame2handlerMap.put(TxRollbackBody.class, TxRollbackHandler.getInstance()); + frame2handlerMap.put(BasicRejectBody.class, BasicRejectMethodHandler.getInstance()); _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap); diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java deleted file mode 100644 index 4dff514ff4..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.txn; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.store.StoreContext; - -/** - * @author Robert Greig (robert.j.greig@jpmorgan.com) - */ -public class DeliverMessageOperation implements TxnOp -{ - private static final Logger _logger = Logger.getLogger(DeliverMessageOperation.class); - - private final AMQMessage _msg; - - private final AMQQueue _queue; - - public DeliverMessageOperation(AMQMessage msg, AMQQueue queue) - { - _msg = msg; - _queue = queue; - _msg.incrementReference(); - } - - public void prepare(StoreContext context) throws AMQException - { - } - - public void undoPrepare() - { - } - - public void commit(StoreContext context) throws AMQException - { - //do the memeory part of the record() - _msg.incrementReference(); - //then process the message - try - { - _queue.process(context, _msg); - } - catch (AMQException e) - { - //TODO: is there anything else we can do here? I think not... - _logger.error("Error during commit of a queue delivery: " + e, e); - } - } - - public void rollback(StoreContext storeContext) - { - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 5c915b5c84..e5cce672f6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -31,9 +31,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; -/** - * A transactional context that only supports local transactions. - */ +/** A transactional context that only supports local transactions. */ public class LocalTransactionalContext implements TransactionalContext { private static final Logger _log = Logger.getLogger(LocalTransactionalContext.class); @@ -62,12 +60,14 @@ public class LocalTransactionalContext implements TransactionalContext { public AMQMessage message; public AMQQueue queue; + private boolean deliverFirst; - public DeliveryDetails(AMQMessage message, AMQQueue queue) + public DeliveryDetails(AMQMessage message, AMQQueue queue, boolean deliverFirst) { this.message = message; this.queue = queue; + this.deliverFirst = deliverFirst; } } @@ -89,9 +89,10 @@ public class LocalTransactionalContext implements TransactionalContext public void rollback() throws AMQException { _txnBuffer.rollback(_storeContext); + _postCommitDeliveryList.clear(); } - public void deliver(AMQMessage message, AMQQueue queue) throws AMQException + public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException { // A publication will result in the enlisting of several // TxnOps. The first is an op that will store the message. @@ -100,7 +101,7 @@ public class LocalTransactionalContext implements TransactionalContext // enqueued. Finally a cleanup op will be added to decrement // the reference associated with the routing. message.incrementReference(); - _postCommitDeliveryList.add(new DeliveryDetails(message, queue)); + _postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst)); _messageDelivered = true; /*_txnBuffer.enlist(new DeliverMessageOperation(message, queue)); if (_log.isDebugEnabled()) @@ -225,7 +226,7 @@ public class LocalTransactionalContext implements TransactionalContext { for (DeliveryDetails dd : _postCommitDeliveryList) { - dd.queue.process(_storeContext, dd.message); + dd.queue.process(_storeContext, dd.message, dd.deliverFirst); } } finally diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index c7f3a0f0f1..19146da22e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -34,21 +34,15 @@ import org.apache.qpid.server.queue.NoConsumersException; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; -/** - * @author Apache Software Foundation - */ +/** @author Apache Software Foundation */ public class NonTransactionalContext implements TransactionalContext { private static final Logger _log = Logger.getLogger(NonTransactionalContext.class); - /** - * Channel is useful for logging - */ + /** Channel is useful for logging */ private final AMQChannel _channel; - /** - * Where to put undeliverable messages - */ + /** Where to put undeliverable messages */ private final List<RequiredDeliveryException> _returnMessages; private Set<Long> _browsedAcks; @@ -57,9 +51,7 @@ public class NonTransactionalContext implements TransactionalContext private StoreContext _storeContext; - /** - * Whether we are in a transaction - */ + /** Whether we are in a transaction */ private boolean _inTran; public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel, @@ -97,12 +89,12 @@ public class NonTransactionalContext implements TransactionalContext // Does not apply to this context } - public void deliver(AMQMessage message, AMQQueue queue) throws AMQException + public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException { try { message.incrementReference(); - queue.process(_storeContext, message); + queue.process(_storeContext, message, deliverFirst); //following check implements the functionality //required by the 'immediate' flag: message.checkDeliveredToConsumer(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java index 59d9117fda..88451e2fca 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java @@ -38,7 +38,7 @@ public interface TransactionalContext void rollback() throws AMQException; - void deliver(AMQMessage message, AMQQueue queue) throws AMQException; + void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException; void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple, UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index ad600ddb40..89f596e541 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -255,13 +255,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return _connectionStopped; } - void setConnectionStopped(boolean connectionStopped) + boolean setConnectionStopped(boolean connectionStopped) { + boolean currently; synchronized (_lock) { + currently = _connectionStopped; _connectionStopped = connectionStopped; _lock.notify(); } + return currently; } private void dispatchMessage(UnprocessedMessage message) @@ -543,7 +546,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (!isSuspended) { -// suspendChannel(true); + suspendChannel(true); } _connection.getProtocolHandler().syncWrite( @@ -556,7 +559,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (!isSuspended) { -// suspendChannel(false); + suspendChannel(false); } } catch (AMQException e) @@ -822,10 +825,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean isSuspended = isSuspended(); -// if (!isSuspended) -// { -// suspendChannel(true); -// } + if (!isSuspended) + { + suspendChannel(true); + } for (BasicMessageConsumer consumer : _consumers.values()) { @@ -841,15 +844,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false) // requeue , BasicRecoverOkBody.class); -// if (_dispatcher != null) -// { -// _dispatcher.rollback(); -// } -// -// if (!isSuspended) -// { -// suspendChannel(false); -// } + if (_dispatcher != null) + { + _dispatcher.rollback(); + } + + if (!isSuspended) + { + suspendChannel(false); + } } catch (AMQException e) { @@ -1952,7 +1955,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_dispatcher == null) { rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); - } + }// if the dispatcher is running we have to do the clean up in the Ok Handler. } } @@ -2171,8 +2174,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi rejectMessagesForConsumerTag(null, requeue); } - /** @param consumerTag The consumerTag to prune from queue or all if null - * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) + /** + * @param consumerTag The consumerTag to prune from queue or all if null + * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) */ private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue) @@ -2192,7 +2196,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi messages.remove(); -// rejectMessage(message.getDeliverBody().deliveryTag, requeue); + rejectMessage(message.getDeliverBody().deliveryTag, requeue); _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 496e377435..e9b914425a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -745,28 +745,32 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _logger.debug("Rejecting the messages for consumer with tag:" + _consumerTag); } - for (Object o : _synchronousQueue) + Iterator iterator = _synchronousQueue.iterator(); + while (iterator.hasNext()) { + Object o = iterator.next(); + if (o instanceof AbstractJMSMessage) { -// _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true); + _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true); if (_logger.isTraceEnabled()) { _logger.trace("Rejected message" + o); + iterator.remove(); } } else { _logger.error("Queue contained a :" + o.getClass() + - " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); } } if (_synchronousQueue.size() != 0) { - _logger.warn("Queue was not empty after rejecting all messages"); + _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size()); } _synchronousQueue.clear(); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 67d74055c6..36dd4d400c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -87,17 +87,17 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach switch (contentType) { - case AMQDestination.QUEUE_TYPE: - dest = new AMQQueue(exchange, routingKey, routingKey); - break; + case AMQDestination.QUEUE_TYPE: + dest = new AMQQueue(exchange, routingKey, routingKey); + break; - case AMQDestination.TOPIC_TYPE: - dest = new AMQTopic(exchange, routingKey, null); - break; + case AMQDestination.TOPIC_TYPE: + dest = new AMQTopic(exchange, routingKey, null); + break; - default: - dest = new AMQUndefinedDestination(exchange, routingKey, null); - break; + default: + dest = new AMQUndefinedDestination(exchange, routingKey, null); + break; } //Destination dest = AMQDestination.createDestination(url); setJMSDestination(dest); @@ -203,7 +203,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach if (!(destination instanceof AMQDestination)) { throw new IllegalArgumentException( - "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass()); + "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass()); } final AMQDestination amqd = (AMQDestination) destination; @@ -495,8 +495,8 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public abstract void clearBodyImpl() throws JMSException; /** - * Get a String representation of the body of the message. Used in the - * toString() method which outputs this before message properties. + * Get a String representation of the body of the message. Used in the toString() method which outputs this before + * message properties. */ public abstract String toBodyString() throws JMSException; @@ -519,7 +519,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach buf.append("\nJMS priority: ").append(getJMSPriority()); buf.append("\nJMS delivery mode: ").append(getJMSDeliveryMode()); buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo())); + buf.append("\nJMS Redelivered: ").append(_redelivered); + buf.append("\nJMS Destination: ").append(getJMSDestination()); + buf.append("\nJMS Type: ").append(getJMSType()); + buf.append("\nJMS MessageID: ").append(getJMSMessageID()); buf.append("\nAMQ message number: ").append(_deliveryTag); + buf.append("\nProperties:"); if (getJmsHeaders().isEmpty()) { diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 988a12ee78..d0cc52271a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -65,15 +65,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class); /** - * The connection that this protocol handler is associated with. There is a 1-1 - * mapping between connection instances and protocol handler instances. + * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances + * and protocol handler instances. */ private AMQConnection _connection; - /** - * Our wrapper for a protocol session that provides access to session values - * in a typesafe manner. - */ + /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */ private volatile AMQProtocolSession _protocolSession; private AMQStateManager _stateManager = new AMQStateManager(); @@ -120,8 +117,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter // we only add the SSL filter where we have an SSL connection if (_connection.getSSLConfiguration() != null) { - SSLConfiguration sslConfig = _connection.getSSLConfiguration(); - SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); + SSLConfiguration sslConfig = _connection.getSSLConfiguration(); + SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext()); sslFilter.setUseClientMode(true); session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter); @@ -139,7 +136,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter { e.printStackTrace(); } - + _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager()); _protocolSession.init(); } @@ -154,6 +151,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter * sessionClosed() depending on whether we were trying to send data at the time of failure. * * @param session + * * @throws Exception */ public void sessionClosed(IoSession session) throws Exception @@ -208,9 +206,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter _logger.info("Protocol Session [" + this + "] closed"); } - /** - * See {@link FailoverHandler} to see rationale for separate thread. - */ + /** See {@link FailoverHandler} to see rationale for separate thread. */ private void startFailoverThread() { Thread failoverThread = new Thread(_failoverHandler); @@ -267,10 +263,9 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** - * There are two cases where we have other threads potentially blocking for events to be handled by this - * class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a - * particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can - * react appropriately. + * There are two cases where we have other threads potentially blocking for events to be handled by this class. + * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type + * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately. * * @param e the exception to propagate */ @@ -306,13 +301,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - switch(bodyFrame.getFrameType()) + switch (bodyFrame.getFrameType()) { case AMQMethodBody.TYPE: if (debug) { - _logger.debug("Method frame received: " + frame); + _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); } final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame); @@ -362,10 +357,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame); break; - + case HeartbeatBody.TYPE: - if(debug) + if (debug) { _logger.debug("Received heartbeat"); } @@ -413,8 +408,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** - * Convenience method that writes a frame to the protocol session. Equivalent - * to calling getProtocolSession().write(). + * Convenience method that writes a frame to the protocol session. Equivalent to calling + * getProtocolSession().write(). * * @param frame the frame to write */ @@ -429,30 +424,28 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** - * Convenience method that writes a frame to the protocol session and waits for - * a particular response. Equivalent to calling getProtocolSession().write() then - * waiting for the response. + * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to + * calling getProtocolSession().write() then waiting for the response. * * @param frame * @param listener the blocking listener. Note the calling thread will block. */ public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, - BlockingMethodFrameListener listener) + BlockingMethodFrameListener listener) throws AMQException { return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT); } /** - * Convenience method that writes a frame to the protocol session and waits for - * a particular response. Equivalent to calling getProtocolSession().write() then - * waiting for the response. + * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to + * calling getProtocolSession().write() then waiting for the response. * * @param frame * @param listener the blocking listener. Note the calling thread will block. */ public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, - BlockingMethodFrameListener listener, long timeout) + BlockingMethodFrameListener listener, long timeout) throws AMQException { try @@ -477,17 +470,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter } - /** - * More convenient method to write a frame and wait for it's response. - */ + /** More convenient method to write a frame and wait for it's response. */ public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException { return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT); } - /** - * More convenient method to write a frame and wait for it's response. - */ + /** More convenient method to write a frame and wait for it's response. */ public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException { return writeCommandFrameAndWaitForReply(frame, @@ -495,9 +484,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** - * Convenience method to register an AMQSession with the protocol handler. Registering - * a session with the protocol handler will ensure that messages are delivered to the - * consumer(s) on that session. + * Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol + * handler will ensure that messages are delivered to the consumer(s) on that session. * * @param channelId the channel id of the session * @param session the session instance. @@ -555,17 +543,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter } - /** - * @return the number of bytes read from this protocol session - */ + /** @return the number of bytes read from this protocol session */ public long getReadBytes() { return _protocolSession.getIoSession().getReadBytes(); } - /** - * @return the number of bytes written to this protocol session - */ + /** @return the number of bytes written to this protocol session */ public long getWrittenBytes() { return _protocolSession.getIoSession().getWrittenBytes(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java new file mode 100644 index 0000000000..0d75a6b968 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.unit.transacted; + +import junit.framework.TestCase; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.AMQException; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.log4j.Logger; + +import javax.jms.Session; +import javax.jms.MessageProducer; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.TextMessage; + +/** + * This class tests a number of commits and roll back scenarios + * + * Assumptions; - Assumes empty Queue + */ +public class CommitRollbackTest extends TestCase +{ + protected AMQConnection conn; + protected final String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue"; + protected String payload = "xyzzy"; + private Session _session; + private MessageProducer _publisher; + private Session _pubSession; + private MessageConsumer _consumer; + Queue _jmsQueue; + + private static final Logger _logger = Logger.getLogger(CommitRollbackTest.class); + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + newConnection(); + } + + private void newConnection() throws AMQException, URLSyntaxException, JMSException + { + conn = new AMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'"); + + _session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE); + + _jmsQueue = _session.createQueue(queue); + _consumer = _session.createConsumer(_jmsQueue); + + _pubSession = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE); + + _publisher = _pubSession.createProducer(_pubSession.createQueue(queue)); + + conn.start(); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + + conn.close(); + TransportConnection.killVMBroker(1); + } + + /** PUT a text message, disconnect before commit, confirm it is gone. */ + public void testPutThenDisconnect() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testPutThenDisconnect"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _logger.info("reconnecting without commit"); + conn.close(); + + newConnection(); + + _logger.info("receiving result"); + Message result = _consumer.receive(1000); + + //commit to ensure message is removed from queue + _session.commit(); + + assertNull("test message was put and disconnected before commit, but is still present", result); + } + + /** PUT a text message, disconnect before commit, confirm it is gone. */ + public void testPutThenCloseDisconnect() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testPutThenDisconnect"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _logger.info("closing publisher without commit"); + _publisher.close(); + + _logger.info("reconnecting without commit"); + conn.close(); + + newConnection(); + + _logger.info("receiving result"); + Message result = _consumer.receive(1000); + + //commit to ensure message is removed from queue + _session.commit(); + + assertNull("test message was put and disconnected before commit, but is still present", result); + } + + /** + * PUT a text message, rollback, confirm message is gone. The consumer is on the same connection but different + * session as producer + */ + public void testPutThenRollback() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testPutThenRollback"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _logger.info("rolling back"); + _pubSession.rollback(); + + _logger.info("receiving result"); + Message result = _consumer.receive(1000); + + assertNull("test message was put and rolled back, but is still present", result); + } + + /** GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection */ + public void testGetThenDisconnect() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testGetThenDisconnect"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _pubSession.commit(); + + _logger.info("getting test message"); + + Message msg = _consumer.receive(1000); + assertNotNull("retrieved message is null", msg); + + _logger.info("closing connection"); + conn.close(); + + newConnection(); + + _logger.info("receiving result"); + Message result = _consumer.receive(1000); + + _session.commit(); + + assertNotNull("test message was consumed and disconnected before commit, but is gone", result); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText()); + } + + /** + * GET a text message, close consumer, disconnect before commit, confirm it is still there. The consumer is on the + * same connection but different session as producer + */ + public void testGetThenCloseDisconnect() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testGetThenCloseDisconnect"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _pubSession.commit(); + + _logger.info("getting test message"); + + Message msg = _consumer.receive(1000); + assertNotNull("retrieved message is null", msg); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText()); + + _logger.info("reconnecting without commit"); + _consumer.close(); + conn.close(); + + newConnection(); + + _logger.info("receiving result"); + Message result = _consumer.receive(1000); + + _session.commit(); + + assertNotNull("test message was consumed and disconnected before commit, but is gone", result); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText()); + } + + /** + * GET a text message, rollback, confirm it is still there. The consumer is on the same connection but differnt + * session to the producer + */ + public void testGetThenRollback() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testGetThenDisconnect"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _pubSession.commit(); + + _logger.info("getting test message"); + + Message msg = _consumer.receive(1000); + + assertNotNull("retrieved message is null", msg); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText()); + + _logger.info("rolling back"); + + _session.rollback(); + + _logger.info("receiving result"); + + Message result = _consumer.receive(1000); + + _session.commit(); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + } + + /** + * GET a text message, close message producer, rollback, confirm it is still there. The consumer is on the same + * connection but different session as producer + */ + public void testGetThenCloseRollback() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testGetThenCloseRollback"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _pubSession.commit(); + + _logger.info("getting test message"); + + Message msg = _consumer.receive(1000); + + assertNotNull("retrieved message is null", msg); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText()); + + _logger.info("Closing consumer"); + _consumer.close(); + + _logger.info("rolling back"); + _session.rollback(); + + _logger.info("receiving result"); + + _consumer = _session.createConsumer(_jmsQueue); + + Message result = _consumer.receive(1000); + + _session.commit(); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + } + + + /** Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order */ + public void testSend2ThenRollback() throws Exception + { + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending two test messages"); + _publisher.send(_pubSession.createTextMessage("1")); + _publisher.send(_pubSession.createTextMessage("2")); + _pubSession.commit(); + + _logger.info("getting test message"); + assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText()); + + _logger.info("rolling back"); + _session.rollback(); + + _logger.info("receiving result"); + Message result = _consumer.receive(1000); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("1", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + + result = _consumer.receive(1000); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("2", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); + + result = _consumer.receive(1000); + + assertNull("test message should be null", result); + } + + public void testSend2ThenCloseAfter1andTryAgain() throws Exception + { +// assertTrue("session is not transacted", _session.getTransacted()); +// assertTrue("session is not transacted", _pubSession.getTransacted()); +// +// _logger.info("sending two test messages"); +// _publisher.send(_pubSession.createTextMessage("1")); +// _publisher.send(_pubSession.createTextMessage("2")); +// _pubSession.commit(); +// +// _logger.info("getting test message"); +// assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText()); +// +// _consumer.close(); +// +// _consumer = _session.createConsumer(_jmsQueue); +// +// _logger.info("receiving result"); +// Message result = _consumer.receive(1000); +// _logger.error("1:" + result); +//// assertNotNull("test message was consumed and rolled back, but is gone", result); +//// assertEquals("1" , ((TextMessage) result).getText()); +//// assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); +// +// result = _consumer.receive(1000); +// _logger.error("2" + result); +//// assertNotNull("test message was consumed and rolled back, but is gone", result); +//// assertEquals("2", ((TextMessage) result).getText()); +//// assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered()); +// +// result = _consumer.receive(1000); +// _logger.error("3" + result); +// assertNull("test message should be null:" + result, result); + } + +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java index 4296e43f88..94cbb426e5 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java @@ -63,12 +63,11 @@ public class TransactedTest extends TestCase super.setUp(); TransportConnection.createVMBroker(1); con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test"); - session = con.createSession(true, 0); + session = con.createSession(true, Session.SESSION_TRANSACTED); queue1 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), false, true); queue2 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q2"), false); - consumer1 = session.createConsumer(queue1); //Dummy just to create the queue. MessageConsumer consumer2 = session.createConsumer(queue2); @@ -81,7 +80,6 @@ public class TransactedTest extends TestCase prepProducer1 = prepSession.createProducer(queue1); prepCon.start(); - //add some messages prepProducer1.send(prepSession.createTextMessage("A")); prepProducer1.send(prepSession.createTextMessage("B")); @@ -127,24 +125,33 @@ public class TransactedTest extends TestCase public void testRollback() throws Exception { + _logger.info("Sending X Y Z"); producer2.send(session.createTextMessage("X")); producer2.send(session.createTextMessage("Y")); producer2.send(session.createTextMessage("Z")); + _logger.info("Receiving A B"); expect("A", consumer1.receive(1000)); expect("B", consumer1.receive(1000)); - expect("C", consumer1.receive(1000)); + //Don't consume 'C' leave it in the prefetch cache to ensure rollback removes it. //rollback + _logger.info("rollback"); session.rollback(); + _logger.info("Receiving A B C"); //ensure sent messages are not visible and received messages are requeued expect("A", consumer1.receive(1000)); expect("B", consumer1.receive(1000)); expect("C", consumer1.receive(1000)); + + _logger.info("Starting new connection"); testCon.start(); testConsumer1 = testSession.createConsumer(queue1); + _logger.info("Testing we have no messages left"); assertTrue(null == testConsumer1.receive(1000)); assertTrue(null == testConsumer2.receive(1000)); + + session.commit(); } public void testResendsMsgsAfterSessionClose() throws Exception @@ -152,7 +159,7 @@ public class TransactedTest extends TestCase AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE); - AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(),new AMQShortString("Q3"), false); + AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false); MessageConsumer consumer = consumerSession.createConsumer(queue3); //force synch to ensure the consumer has resulted in a bound queue ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); @@ -225,8 +232,9 @@ public class TransactedTest extends TestCase private void expect(String text, Message msg) throws JMSException { - assertTrue(msg instanceof TextMessage); - assertEquals(text, ((TextMessage) msg).getText()); + assertNotNull("Message should not be null", msg); + assertTrue("Message should be a text message", msg instanceof TextMessage); + assertEquals("Message content does not match expected", text, ((TextMessage) msg).getText()); } public static junit.framework.Test suite() diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java index ecabd9320a..9fa96ece1e 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java @@ -32,7 +32,6 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; /** * Represents a shared queue in a cluster. The key difference is that as well as any @@ -56,10 +55,10 @@ public class ClusteredQueue extends AMQQueue } - public void process(StoreContext storeContext, AMQMessage msg) throws AMQException + public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException { _logger.info(new LogMessage("{0} delivered to clustered queue {1}", msg, this)); - super.process(storeContext, msg); + super.process(storeContext, msg, deliverFirst); } protected void autodelete() throws AMQException diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java index 364aea81c0..a5ace41752 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java @@ -117,7 +117,17 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage return null; } - public void enqueueForPreDelivery(AMQMessage msg) + public Queue<AMQMessage> getResendQueue() + { + return null; + } + + public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages) + { + return messages; + } + + public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst) { //no-op -- if selectors are implemented on RemoteSubscriptions then look at SubscriptionImpl } @@ -132,6 +142,11 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage //no-op } + public boolean isClosed() + { + return false; + } + public boolean isBrowser() { return false; @@ -142,4 +157,14 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage return _suspended; } + public void addToResendQueue(AMQMessage msg) + { + //no-op + } + + public Object getSendLock() + { + return new Object(); + } + } diff --git a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java new file mode 100644 index 0000000000..cdf686b4cb --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.util; + +import org.apache.log4j.Logger; + +import java.util.Queue; +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicInteger; + +public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQueueAtomicSize<E> implements MessageQueue<E> +{ + private static final Logger _logger = Logger.getLogger(ConcurrentLinkedMessageQueueAtomicSize.class); + + protected Queue<E> _messageHead = new ConcurrentLinkedQueueAtomicSize<E>(); + + protected AtomicInteger _messageHeadSize = new AtomicInteger(0); + + @Override + public int size() + { + return super.size() + _messageHeadSize.get(); + } + + @Override + public E poll() + { + if (_messageHead.isEmpty()) + { + return super.poll(); + } + else + { + _logger.debug("Providing item from message head"); + + E e = _messageHead.poll(); + + if (e != null) + { + _messageHeadSize.decrementAndGet(); + } + + return e; + } + } + + @Override + public boolean remove(Object o) + { + + if (_messageHead.isEmpty()) + { + return super.remove(o); + } + else + { + if (_messageHead.remove(o)) + { + _messageHeadSize.decrementAndGet(); + return true; + } + + return super.remove(o); + } + } + + @Override + public boolean removeAll(Collection<?> c) + { + if (_messageHead.isEmpty()) + { + return super.removeAll(c); + } + else + { + //fixme this is super.removeAll but iterator here doesn't work + // we need to be able to correctly decrement _messageHeadSize +// boolean modified = false; +// Iterator<?> e = iterator(); +// while (e.hasNext()) +// { +// if (c.contains(e.next())) +// { +// e.remove(); +// modified = true; +// _size.decrementAndGet(); +// } +// } +// return modified; + + throw new RuntimeException("Not implemented"); + } + } + + + @Override + public boolean isEmpty() + { + return (_messageHead.isEmpty() && super.isEmpty()); + } + + @Override + public void clear() + { + super.clear(); + _messageHead.clear(); + } + + @Override + public boolean contains(Object o) + { + return _messageHead.contains(o) || super.contains(o); + } + + @Override + public boolean containsAll(Collection<?> o) + { + return _messageHead.containsAll(o) || super.containsAll(o); + } + + @Override + public E element() + { + if (_messageHead.isEmpty()) + { + return super.element(); + } + else + { + return _messageHead.element(); + } + } + + @Override + public E peek() + { + if (_messageHead.isEmpty()) + { + return super.peek(); + } + else + { + _logger.debug("Providing item from message head"); + return _messageHead.peek(); + } + + } + + @Override + public Iterator<E> iterator() + { + throw new RuntimeException("Not Implemented"); + + } + + @Override + public boolean retainAll(Collection<?> c) + { + throw new RuntimeException("Not Implemented"); + } + + @Override + public Object[] toArray() + { + throw new RuntimeException("Not Implemented"); + } + + public boolean pushHead(E o) + { + _logger.debug("Adding item to head of queue"); + if (_messageHead.offer(o)) + { + _messageHeadSize.incrementAndGet(); + return true; + } + return false; + } +}
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/util/MessageQueue.java b/java/common/src/main/java/org/apache/qpid/util/MessageQueue.java new file mode 100644 index 0000000000..9cf3319374 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/MessageQueue.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.util; + +import java.util.Queue; + +public interface MessageQueue<E> extends Queue<E> +{ + + boolean pushHead(E o); + +} diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 6beeb92053..ccd23bc0bc 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -272,9 +272,10 @@ public class AbstractHeadersExchangeTestBase extends TestCase * not invoked. It is unnecessary since for this test we only care to know whether the message was * sent to the queue; the queue processing logic is not being tested. * @param msg + * @param deliverFirst * @throws AMQException */ - public void process(StoreContext context, AMQMessage msg) throws AMQException + public void process(StoreContext context, AMQMessage msg, boolean deliverFirst) throws AMQException { messages.add(new HeadersExchangeTest.Message(msg)); } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 2d0315d7f5..26332579cb 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -19,7 +19,6 @@ package org.apache.qpid.server.queue; import junit.framework.TestCase; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.abstraction.MessagePublishInfo; @@ -151,7 +150,7 @@ public class AMQQueueMBeanTest extends TestCase AMQMessage msg = message(false); long id = msg.getMessageId(); _queue.clearQueue(_storeContext); - _queue.process(_storeContext, msg); + _queue.process(_storeContext, msg, false); msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); _queueMBean.viewMessageContent(id); try @@ -216,7 +215,7 @@ public class AMQQueueMBeanTest extends TestCase } for (int i = 0; i < messageCount; i++) { - _queue.process(_storeContext, messages[i]); + _queue.process(_storeContext, messages[i], false); } for (int i = 0; i < messages.length; i++) diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java index 6f3d42d090..4971db2d28 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java @@ -194,7 +194,7 @@ public class ConcurrencyTest extends MessageTestHelper AMQMessage msg = nextMessage(); if (msg != null) { - _deliveryMgr.deliver(null, new AMQShortString(toString()), msg); + _deliveryMgr.deliver(null, new AMQShortString(toString()), msg, false); } } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java index e1be640c8e..dc5a6d3cf6 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java @@ -49,7 +49,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper for (int i = 0; i < batch; i++) { - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false); } SubscriptionTestHelper s1 = new SubscriptionTestHelper("1"); @@ -59,7 +59,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper for (int i = batch; i < messages.length; i++) { - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false); } assertTrue(s1.getMessages().isEmpty()); @@ -97,7 +97,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper for (int i = 0; i < batch; i++) { - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false); } assertEquals(batch, s1.getMessages().size()); @@ -111,7 +111,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper s1.setSuspended(true); for (int i = batch; i < messages.length; i++) { - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false); } _mgr.processAsync(new OnCurrentThreadExecutor()); @@ -133,7 +133,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper try { AMQMessage msg = message(true); - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false); msg.checkDeliveredToConsumer(); fail("expected exception did not occur"); } @@ -155,7 +155,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper _subscriptions.addSubscriber(s); s.setSuspended(true); AMQMessage msg = message(true); - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false); msg.checkDeliveredToConsumer(); fail("expected exception did not occur"); } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index b3574ecba4..01eb2ba6a2 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -67,12 +67,22 @@ public class SubscriptionTestHelper implements Subscription return isSuspended; } - public boolean wouldSuspend(AMQMessage msg) + public boolean wouldSuspend(AMQMessage msg) { return isSuspended; } - + public void addToResendQueue(AMQMessage msg) + { + //no-op + } + + public Object getSendLock() + { + return new Object(); + } + + public void queueDeleted(AMQQueue queue) { } @@ -92,7 +102,17 @@ public class SubscriptionTestHelper implements Subscription return null; } - public void enqueueForPreDelivery(AMQMessage msg) + public Queue<AMQMessage> getResendQueue() + { + return null; + } + + public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages) + { + return messages; + } + + public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst) { //no-op } @@ -107,9 +127,14 @@ public class SubscriptionTestHelper implements Subscription //no-op } + public boolean isClosed() + { + return false; + } + public boolean isBrowser() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public int hashCode() |