diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-02-08 13:38:37 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-08 13:38:37 +0000 |
commit | 208e8a556d64a717c9fec9e49354d6e5c69bbaac (patch) | |
tree | 26854ab71db1188b5dad8965520f4c35130e25bd | |
parent | 3b6459da09f50e039ba79fabd5da19c3b2cab1f5 (diff) | |
download | qpid-python-208e8a556d64a717c9fec9e49354d6e5c69bbaac.tar.gz |
QPID-
Broker
AMQChannel - Resend modified to add messages to Subscription resendQueue.
BasicRecoverMethodHandler - Now makes use of the Requeue boolean (needs test case, but is same logic as TxRollback)
TxRollbackHandler - Removed protocol Session from AMQChannel.resend()
AMQMessage - Changes comments, updated taken() to record the subscription that took the message
AMQQueue - Added DeliveryManager to Subscription constructors.
ConcurrentSelectorDeliveryManager - updated to get queue from Subscription and to know when the Subscriptions have content that needs Async delivery.
DeliveryManager - added update method to allow a subscription to tell DM it has content to send.
Subscription - new methods to handle resendQueue
SubscriptionFactory - changes to pass in the DeliveryManager
SubscriptionImpl - Comment changes, Constructor changes, implmentations of interface
Client
Recover and TxRollback now perform their broker methods while suspended.
RecoverTest - Added addition asserts to prevent NPEs
CommitRollbackTest - word change
RemoteSubscriptionImpl/SubscriptionTestHelper - Subscription implementation
AckTest - Update for new SubscriptionImpl constructor
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@504887 13f79535-47bb-0310-9956-ffa450edef68
17 files changed, 357 insertions, 196 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 7fb446a579..a85db9f26b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -36,6 +36,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.NoConsumersException; +import org.apache.qpid.server.queue.Subscription; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.TxnBuffer; import org.apache.qpid.server.txn.TxnOp; @@ -388,7 +389,7 @@ public class AMQChannel } /** Called to resend all outstanding unacknowledged messages to this same channel. */ - public void resend(AMQProtocolSession session) throws AMQException + public void resend() throws AMQException { //messages go to this channel synchronized (_unacknowledgedMessageMapLock) @@ -400,17 +401,35 @@ public class AMQChannel { Map.Entry<Long, UnacknowledgedMessage> entry = messageSetIterator.next(); - long deliveryTag = entry.getKey(); + //long deliveryTag = entry.getKey(); String consumerTag = entry.getValue().consumerTag; if (_consumerTag2QueueMap.containsKey(consumerTag)) { AMQMessage msg = entry.getValue().message; msg.setRedelivered(true); - session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag)); + Subscription sub = msg.getDeliveredSubscription(); + + if (sub != null) + { + if (_log.isDebugEnabled()) + { + _log.debug("Requeuing " + msg + " for resend"); + } + + sub.addToResendQueue(msg); + } + else + { + _log.error("DeliveredSubscription not recorded"); + } + + // Don't write the frame as the DeliveryManager can now deal with it + //session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag)); } else - { + { // The current consumer has gone so we need to requeue + UnacknowledgedMessage unacked = entry.getValue(); if (unacked.queue != null) @@ -426,6 +445,8 @@ public class AMQChannel } } } + + //fixme need to start the async delivery here. } /** diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java index 85e802d10d..f83d38ad47 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java @@ -46,12 +46,22 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, AMQMethodEvent<BasicRecoverBody> evt) throws AMQException { - _logger.debug("Recover received on protocol session " + protocolSession + " and channel " + evt.getChannelId()); + _logger.debug("Recover received on protocol session " + protocolSession + " and channel " + evt.getChannelId()); AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); + if (channel == null) { throw new AMQException("Unknown channel " + evt.getChannelId()); } - channel.resend(protocolSession); + + if (evt.getMethod().getRequeue()) + { + //fixme need tests to exercise + channel.requeue(); + } + else + { + channel.resend(); + } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java index 588dc026d4..1d11f6297b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java @@ -48,17 +48,23 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, AMQMethodEvent<TxRollbackBody> evt) throws AMQException { - try{ + try + { AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); + channel.rollback(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); + protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); + //Now resend all the unacknowledged messages back to the original subscribers. //(Must be done after the TxnRollback-ok response). - channel.resend(protocolSession); - }catch(AMQException e){ + channel.resend(); + } + catch (AMQException e) + { throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage()); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 8603113c11..8ba3cc5686 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -39,9 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.ConcurrentHashMap; -/** - * Combines the information that make up a deliverable message into a more manageable form. - */ +/** Combines the information that make up a deliverable message into a more manageable form. */ public class AMQMessage { private static final Logger _log = Logger.getLogger(AMQMessage.class); @@ -66,36 +64,29 @@ public class AMQMessage private long _arrivalTime; - /** - * Keeps a track of how many bytes we have received in body frames - */ + /** Keeps a track of how many bytes we have received in body frames */ private long _bodyLengthReceived = 0; - /** - * The message store in which this message is contained. - */ + /** The message store in which this message is contained. */ private transient final MessageStore _store; /** - * For non transactional publishes, a message can be stored as - * soon as it is complete. For transactional messages it doesnt - * need to be stored until the transaction is committed. + * For non transactional publishes, a message can be stored as soon as it is complete. For transactional messages it + * doesnt need to be stored until the transaction is committed. */ private boolean _storeWhenComplete; - /** - * TxnBuffer for transactionally published messages - */ + /** TxnBuffer for transactionally published messages */ private TxnBuffer _txnBuffer; /** - * Flag to indicate whether message has been delivered to a - * consumer. Used in implementing return functionality for + * Flag to indicate whether message has been delivered to a consumer. Used in implementing return functionality for * messages published with the 'immediate' flag. */ private boolean _deliveredToConsumer; private ConcurrentHashMap<String, MessageDecorator> _decodedMessages; private AtomicBoolean _taken; + private Subscription _takenBySubcription; public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody) @@ -282,9 +273,7 @@ public class AMQMessage return _messageId; } - /** - * Threadsafe. Increment the reference count on the message. - */ + /** Threadsafe. Increment the reference count on the message. */ public void incrementReference() { _referenceCount.incrementAndGet(); @@ -390,9 +379,8 @@ public class AMQMessage /** * Called to enforce the 'immediate' flag. * - * @throws NoConsumersException if the message is marked for - * immediate delivery but has not been marked as delivered to a - * consumer + * @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered + * to a consumer */ public void checkDeliveredToConsumer() throws NoConsumersException { @@ -403,9 +391,8 @@ public class AMQMessage } /** - * Called when this message is delivered to a consumer. (used to - * implement the 'immediate' flag functionality). - * And by selectors to determin if the message has already been sent + * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). And + * by selectors to determin if the message has already been sent */ public void setDeliveredToConsumer() { @@ -457,13 +444,31 @@ public class AMQMessage return msgdec; } - public boolean taken() + public boolean taken(Subscription sub) { - return _taken.getAndSet(true); + if (_taken.getAndSet(true)) + { + if (sub == _takenBySubcription) + { + return false; + } + return true; + } + else + { + _takenBySubcription = sub; + return false; + } } public void release() { + _takenBySubcription = null; _taken.set(false); } + + public Subscription getDeliveredSubscription() + { + return _takenBySubcription; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index d8bacc8c7d..bcad8e7d14 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -37,8 +37,8 @@ import java.util.List; import java.util.concurrent.Executor; /** - * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like - * that. It is described fully in RFC 006. + * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described + * fully in RFC 006. */ public class AMQQueue implements Managable, Comparable { @@ -46,62 +46,41 @@ public class AMQQueue implements Managable, Comparable private final String _name; - /** - * null means shared - */ + /** null means shared */ private final String _owner; private final boolean _durable; - /** - * If true, this queue is deleted when the last subscriber is removed - */ + /** If true, this queue is deleted when the last subscriber is removed */ private final boolean _autoDelete; - /** - * Holds subscribers to the queue. - */ + /** Holds subscribers to the queue. */ private final SubscriptionSet _subscribers; private final SubscriptionFactory _subscriptionFactory; - /** - * Manages message delivery. - */ + /** Manages message delivery. */ private final DeliveryManager _deliveryMgr; - /** - * The queue registry with which this queue is registered. - */ + /** The queue registry with which this queue is registered. */ private final QueueRegistry _queueRegistry; - /** - * Used to track bindings to exchanges so that on deletion they can easily - * be cancelled. - */ + /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */ private final ExchangeBindings _bindings = new ExchangeBindings(this); - /** - * Executor on which asynchronous delivery will be carriedout where required - */ + /** Executor on which asynchronous delivery will be carriedout where required */ private final Executor _asyncDelivery; private final AMQQueueMBean _managedObject; - /** - * max allowed size(KB) of a single message - */ + /** max allowed size(KB) of a single message */ private long _maximumMessageSize = 10000; - /** - * max allowed number of messages on a queue. - */ + /** max allowed number of messages on a queue. */ @Configured(path = "maximumMessageCount", defaultValue = "0") public int _maximumMessageCount; - /** - * max queue depth for the queue - */ + /** max queue depth for the queue */ @Configured(path = "maximumQueueDepth", defaultValue = "0") public long _maximumQueueDepth = 10000000; @@ -117,9 +96,7 @@ public class AMQQueue implements Managable, Comparable @Configured(path = "minimumAlertRepeatGap", defaultValue = "0") public long _minimumAlertRepeatGap = 30000; - /** - * total messages received by the queue since startup. - */ + /** total messages received by the queue since startup. */ public long _totalMessagesReceived = 0; public int compareTo(Object o) @@ -198,7 +175,7 @@ public class AMQQueue implements Managable, Comparable _autoDelete = autoDelete; _queueRegistry = queueRegistry; _asyncDelivery = asyncDelivery; - + _managedObject = createMBean(); _managedObject.register(); @@ -244,17 +221,13 @@ public class AMQQueue implements Managable, Comparable return _autoDelete; } - /** - * @return no of messages(undelivered) on the queue. - */ + /** @return no of messages(undelivered) on the queue. */ public int getMessageCount() { return _deliveryMgr.getQueueMessageCount(); } - /** - * @return List of messages(undelivered) on the queue. - */ + /** @return List of messages(undelivered) on the queue. */ public List<AMQMessage> getMessagesOnTheQueue() { return _deliveryMgr.getMessages(); @@ -263,10 +236,11 @@ public class AMQQueue implements Managable, Comparable public long getQueueDepth() { return _deliveryMgr.getTotalMessageSize(); - } + } /** * @param messageId + * * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist. */ public AMQMessage getMessageOnTheQueue(long messageId) @@ -285,9 +259,7 @@ public class AMQQueue implements Managable, Comparable return msg; } - /** - * @return MBean object associated with this Queue - */ + /** @return MBean object associated with this Queue */ public ManagedObject getManagedObject() { return _managedObject; @@ -344,17 +316,13 @@ public class AMQQueue implements Managable, Comparable return _deliveryMgr.getOldestMessageArrival(); } - /** - * Removes the AMQMessage from the top of the queue. - */ + /** Removes the AMQMessage from the top of the queue. */ public void deleteMessageFromTop() throws AMQException { _deliveryMgr.removeAMessageFromTop(); } - /** - * removes all the messages from the queue. - */ + /** removes all the messages from the queue. */ public void clearQueue() throws AMQException { _deliveryMgr.clearAllMessages(); @@ -375,7 +343,7 @@ public class AMQQueue implements Managable, Comparable { debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this); - Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal); + Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, _deliveryMgr); if (subscription.hasFilters()) { @@ -396,7 +364,8 @@ public class AMQQueue implements Managable, Comparable Subscription removedSubscription; if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps, - consumerTag))) + consumerTag, + _deliveryMgr))) == null) { throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 2b12b8e14c..f66604a5c1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -31,10 +31,17 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Queue; +import java.util.HashMap; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicInteger; /** Manages delivery of messages on behalf of a queue */ @@ -68,6 +75,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager */ private ReentrantLock _lock = new ReentrantLock(); private AtomicLong _totalMessageSize = new AtomicLong(); + private Set<Subscription> _hasContent = Collections.synchronizedSet(new HashSet<Subscription>()); ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) { @@ -111,7 +119,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _lock.lock(); try { - return !_messages.isEmpty(); + return !_messages.isEmpty() || !_hasContent.isEmpty(); } finally { @@ -146,6 +154,20 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return msg == null ? Long.MAX_VALUE : msg.getArrivalTime(); } + public void setQueueHasContent(Subscription subscription) + { + _lock.lock(); + try + { + + _log.debug("Queue has content Set"); + _hasContent.add(subscription); + } + finally + { + _lock.unlock(); + } + } public synchronized List<AMQMessage> getMessages() { @@ -197,7 +219,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { AMQMessage message = messages.peek(); - while (message != null && (sub.isBrowser() || message.taken())) + while (message != null && (sub.isBrowser() || message.taken(sub))) { //remove the already taken message messages.poll(); @@ -207,8 +229,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return message; } - public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue) + public void sendNextMessage(Subscription sub, AMQQueue queue) { + + Queue<AMQMessage> messageQueue = sub.getNextQueue(_messages); + + if (messageQueue == null) + { + // There is no queue with messages currently + _log.warn(sub + ": asked to send messages but has none on given queue:" + queue); + return; + } AMQMessage message = null; try { @@ -221,14 +252,30 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } if (_log.isDebugEnabled()) { - _log.debug("Async Delivery Message:" + message + " to :" + sub); + _log.debug("Async Delivery Message:" + message + " to :" + this); } - sub.send(message, _queue); + sub.send(message, queue); //remove sent message from our queue. messageQueue.poll(); - _totalMessageSize.addAndGet(-message.getSize()); + + //If we don't remove the message from _messages + // Otherwise the Async send will never end + if (messageQueue.isEmpty()) + { + if (messageQueue == sub.getResendQueue()) + { + _hasContent.remove(sub); + } + else if (messageQueue == sub.getPreDeliveryQueue()) + { + //fixme + _log.error("MEMORY LEAK: message from PreDeliveryQueue not removed from _messages"); + //_messages.remove(message); + } + } + } catch (FailedDequeueException e) { @@ -254,7 +301,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (!sub.isSuspended()) { - sendNextMessage(sub); + sendNextMessage(sub, _queue); hasSubscribers = true; } @@ -262,25 +309,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - private void sendNextMessage(Subscription sub) - { - if (sub.hasFilters()) - { - sendNextMessage(sub, sub.getPreDeliveryQueue()); - if (sub.isAutoClose()) - { - if (sub.getPreDeliveryQueue().isEmpty()) - { - sub.close(); - } - } - } - else - { - sendNextMessage(sub, _messages); - } - } - private AMQMessage poll() { return _messages.poll(); @@ -355,6 +383,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + System.identityHashCode(s) + ") :" + s); } + //Mark message as taken + msg.taken(s); //Deliver the message s.send(msg, _queue); } @@ -405,8 +435,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" + - " Active:" + _subscriptions.hasActiveSubscribers() + + _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ") hasContent:" + + _hasContent.isEmpty() + " Active:" + _subscriptions.hasActiveSubscribers() + " Processing:" + _processing.get()); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index 28386dfa45..e4242f497a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -80,4 +80,6 @@ interface DeliveryManager long getTotalMessageSize(); long getOldestMessageArrival(); + + void setQueueHasContent(Subscription subscription); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java index a5672f2b19..30b446c309 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java @@ -44,5 +44,11 @@ public interface Subscription void close(); - boolean isBrowser(); + boolean isBrowser(); + + Queue<AMQMessage> getResendQueue(); + + Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages); + + void addToResendQueue(AMQMessage msg); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java index 2bb77dc649..ba31ca19b5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java @@ -25,18 +25,18 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.FieldTable; /** - * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This - * factory primarily assists testing although in future more sophisticated subscribers may need a different - * subscription implementation. + * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This factory + * primarily assists testing although in future more sophisticated subscribers may need a different subscription + * implementation. * * @see org.apache.qpid.server.queue.AMQQueue */ public interface SubscriptionFactory { Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, - FieldTable filters, boolean noLocal) throws AMQException; + FieldTable filters, boolean noLocal, DeliveryManager deliveryManager) throws AMQException; - Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag) - throws AMQException; + Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, + DeliveryManager deliveryManager) throws AMQException; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 78310e8eb3..d43e20eb89 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -39,11 +39,8 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import java.util.Queue; /** - * Encapsulation of a supscription to a queue. - * <p/> - * Ties together the protocol session of a subscriber, the consumer tag that - * was given out by the broker and the channel id. - * <p/> + * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag + * that was given out by the broker and the channel id. <p/> */ public class SubscriptionImpl implements Subscription { @@ -59,40 +56,46 @@ public class SubscriptionImpl implements Subscription private Queue<AMQMessage> _messages; + private Queue<AMQMessage> _resendQueue; + private final boolean _noLocal; - /** - * True if messages need to be acknowledged - */ + /** True if messages need to be acknowledged */ private final boolean _acks; private FilterManager _filters; private final boolean _isBrowser; private final Boolean _autoClose; private boolean _closed = false; + private DeliveryManager _deliveryManager; + public static class Factory implements SubscriptionFactory { - public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException + public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, + boolean acks, FieldTable filters, boolean noLocal, + DeliveryManager deliveryManager) throws AMQException { - return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal); + return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal, deliveryManager); } - public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag) + public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, + DeliveryManager deliveryManager) throws AMQException { - return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false); + return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false, deliveryManager); } } public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession, - String consumerTag, boolean acks) + String consumerTag, boolean acks, DeliveryManager deliveryManager) throws AMQException { - this(channelId, protocolSession, consumerTag, acks, null, false); + this(channelId, protocolSession, consumerTag, acks, null, false, deliveryManager); } public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession, - String consumerTag, boolean acks, FieldTable filters, boolean noLocal) + String consumerTag, boolean acks, FieldTable filters, boolean noLocal, + DeliveryManager deliveryManager) throws AMQException { AMQChannel channel = protocolSession.getChannel(channelId); @@ -107,6 +110,7 @@ public class SubscriptionImpl implements Subscription sessionKey = protocolSession.getKey(); _acks = acks; _noLocal = noLocal; + _deliveryManager = deliveryManager; _filters = FilterManagerFactory.createManager(filters); @@ -165,7 +169,7 @@ public class SubscriptionImpl implements Subscription String consumerTag) throws AMQException { - this(channel, protocolSession, consumerTag, false); + this(channel, protocolSession, consumerTag, false, null); } public boolean equals(Object o) @@ -173,9 +177,7 @@ public class SubscriptionImpl implements Subscription return (o instanceof SubscriptionImpl) && equals((SubscriptionImpl) o); } - /** - * Equality holds if the session matches and the channel and consumer tag are the same. - */ + /** Equality holds if the session matches and the channel and consumer tag are the same. */ private boolean equals(SubscriptionImpl psc) { return sessionKey.equals(psc.sessionKey) @@ -194,11 +196,12 @@ public class SubscriptionImpl implements Subscription } /** - * This method can be called by each of the publisher threads. - * As a result all changes to the channel object must be thread safe. + * This method can be called by each of the publisher threads. As a result all changes to the channel object must be + * thread safe. * * @param msg * @param queue + * * @throws AMQException */ public void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException @@ -225,7 +228,7 @@ public class SubscriptionImpl implements Subscription // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. - synchronized(channel) + synchronized (channel) { long deliveryTag = channel.getNextDeliveryTag(); @@ -259,7 +262,7 @@ public class SubscriptionImpl implements Subscription { queue.dequeue(msg); } - synchronized(channel) + synchronized (channel) { long deliveryTag = channel.getNextDeliveryTag(); @@ -376,6 +379,11 @@ public class SubscriptionImpl implements Subscription public void close() { + if (_resendQueue != null && !_resendQueue.isEmpty()) + { + requeue(); + } + if (!_closed) { _logger.info("Closing autoclose subscription:" + this); @@ -383,18 +391,74 @@ public class SubscriptionImpl implements Subscription // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), - (byte)8, (byte)0, // AMQP version (major, minor) - consumerTag // consumerTag - )); + (byte) 8, (byte) 0, // AMQP version (major, minor) + consumerTag // consumerTag + )); _closed = true; } } + private void requeue() + { + //fixme + _logger.error("MESSAGES LOST as subscription hasn't yet resent all its requeued messages"); + } + public boolean isBrowser() { return _isBrowser; } + public Queue<AMQMessage> getResendQueue() + { + if (_resendQueue == null) + { + _resendQueue = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); + } + return _resendQueue; + } + + + public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages) + { + if (_resendQueue != null && !_resendQueue.isEmpty()) + { + return _resendQueue; + } + + if (_filters != null) + { + if (isAutoClose()) + { + if (_messages.isEmpty()) + { + close(); + return null; + } + } + return _messages; + } + else // we want the DM queue + { + return messages; + } + } + + public void addToResendQueue(AMQMessage msg) + { + // add to our resend queue + getResendQueue().add(msg); + + // Mark Queue has having content. + if (_deliveryManager == null) + { + _logger.error("Delivery Manager is null won't be able to resend messages"); + } + else + { + _deliveryManager.setQueueHasContent(this); + } + } private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange) { @@ -402,13 +466,13 @@ public class SubscriptionImpl implements Subscription // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), - (byte)8, (byte)0, // AMQP version (major, minor) - consumerTag, // consumerTag - deliveryTag, // deliveryTag - exchange, // exchange - false, // redelivered - routingKey // routingKey - ); + (byte) 8, (byte) 0, // AMQP version (major, minor) + consumerTag, // consumerTag + deliveryTag, // deliveryTag + exchange, // exchange + false, // redelivered + routingKey // routingKey + ); ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem? deliverFrame.writePayload(buf); buf.flip(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 755f2f271b..eb29d9d805 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -86,7 +86,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private int _nextTag = 1; /** This queue is bounded and is used to store messages before being dispatched to the consumer */ - private final FlowControllingBlockingQueue _queue; + public final FlowControllingBlockingQueue _queue; private Dispatcher _dispatcher; @@ -804,16 +804,44 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi checkNotTransacted(); // throws IllegalStateException if a transacted session // this is set only here, and the before the consumer's onMessage is called it is set to false _inRecovery = true; + + boolean isSuspended = isSuspended(); + + if (!isSuspended) + { + try + { + suspendChannel(true); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + } for (BasicMessageConsumer consumer : _consumers.values()) { consumer.clearUnackedMessages(); } + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, (byte) 8, (byte) 0, // AMQP version (major, minor) false)); // requeue + + if (!isSuspended) + { + try + { + suspendChannel(false); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + } + } boolean isInRecovery() @@ -836,8 +864,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { consumer.acknowledge(); } - - } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index 4087db6562..847454e43e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -100,7 +100,7 @@ public class FlowControllingBlockingQueue { _logger.trace("Object added to queue:" + o); } - + if (_listener != null) { synchronized (_listener) @@ -112,5 +112,10 @@ public class FlowControllingBlockingQueue } } } + + public int size() + { + return _count; + } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index d12ab01bdc..d80d3ad87d 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -79,12 +79,15 @@ public class RecoverTest extends TestCase // no ack for last three messages so when I call recover I expect to get three messages back consumerSession.recover(); tm = (TextMessage) consumer.receive(3000); + assertNotNull("Message was null", tm); assertEquals("msg2", tm.getText()); tm = (TextMessage) consumer.receive(3000); + assertNotNull("Message was null", tm); assertEquals("msg3", tm.getText()); tm = (TextMessage) consumer.receive(3000); + assertNotNull("Message was null", tm); assertEquals("msg4", tm.getText()); _logger.info("Received redelivery of three messages. Acknowledging last message"); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index ce3ea01a09..c5ac530297 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -269,7 +269,7 @@ public class CommitRollbackTest extends TestCase _session.commit(); assertNotNull("test message was consumed and rolled back, but is gone", result); - assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText()); + assertEquals("test message was incorrect message", MESSAGE_TEXT, ((TextMessage) result).getText()); } @@ -297,4 +297,5 @@ public class CommitRollbackTest extends TestCase assertNull("test message should be null", result); } + } diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java index c751e4a011..f5d51b9826 100644 --- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java +++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java @@ -134,11 +134,21 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage public boolean isBrowser() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } - public void sendNextMessage(AMQQueue queue) + public Queue<AMQMessage> getResendQueue() { + return null; + } + public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages) + { + return null; + } + + public void addToResendQueue(AMQMessage msg) + { + //no-op } } diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java index 11bae0d9f6..b66ec70d2c 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -36,9 +36,7 @@ import java.util.Map; import junit.framework.TestCase; -/** - * Tests that acknowledgements are handled correctly. - */ +/** Tests that acknowledgements are handled correctly. */ public class AckTest extends TestCase { private static final Logger _log = Logger.getLogger(AckTest.class); @@ -82,7 +80,7 @@ public class AckTest extends TestCase { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Establish some way to determine the version for the test. - BasicPublishBody publishBody = new BasicPublishBody((byte)8, (byte)0); + BasicPublishBody publishBody = new BasicPublishBody((byte) 8, (byte) 0); publishBody.routingKey = "rk"; publishBody.exchange = "someExchange"; AMQMessage msg = new AMQMessage(_messageStore, publishBody); @@ -104,12 +102,12 @@ public class AckTest extends TestCase } /** - * Tests that the acknowledgements are correctly associated with a channel and - * order is preserved when acks are enabled + * Tests that the acknowledgements are correctly associated with a channel and order is preserved when acks are + * enabled */ public void testAckChannelAssociationTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); + _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true, null); final int msgCount = 10; publishMessages(msgCount, true); @@ -130,13 +128,11 @@ public class AckTest extends TestCase assertTrue(_messageStore.getMessageMap().size() == msgCount); } - /** - * Tests that in no-ack mode no messages are retained - */ + /** Tests that in no-ack mode no messages are retained */ public void testNoAckMode() throws AMQException { // false arg means no acks expected - _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", false); + _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", false, null); final int msgCount = 10; publishMessages(msgCount); @@ -145,13 +141,10 @@ public class AckTest extends TestCase assertTrue(_messageStore.getMessageMap().size() == 0); } - /** - * Tests that a single acknowledgement is handled correctly (i.e multiple flag not - * set case) - */ + /** Tests that a single acknowledgement is handled correctly (i.e multiple flag not set case) */ public void testSingleAckReceivedTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); + _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true, null); final int msgCount = 10; publishMessages(msgCount); @@ -175,13 +168,10 @@ public class AckTest extends TestCase } } - /** - * Tests that a single acknowledgement is handled correctly (i.e multiple flag not - * set case) - */ + /** Tests that a single acknowledgement is handled correctly (i.e multiple flag not set case) */ public void testMultiAckReceivedTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); + _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true, null); final int msgCount = 10; publishMessages(msgCount); @@ -201,12 +191,10 @@ public class AckTest extends TestCase } } - /** - * Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs. - */ + /** Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs. */ public void testMultiAckAllReceivedTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); + _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true, null); final int msgCount = 10; publishMessages(msgCount); @@ -231,7 +219,7 @@ public class AckTest extends TestCase int lowMark = 5; int highMark = 10; - _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); + _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true, null); _channel.setPrefetchLowMarkCount(lowMark); _channel.setPrefetchHighMarkCount(highMark); @@ -282,7 +270,7 @@ public class AckTest extends TestCase public void testPrefetch() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); + _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true, null); _channel.setPrefetchCount(5); assertTrue(_channel.getPrefetchCount() == 5); diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index fea3c93280..27f9802fb5 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -103,7 +103,22 @@ public class SubscriptionTestHelper implements Subscription public boolean isBrowser() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; + } + + public Queue<AMQMessage> getResendQueue() + { + return null; + } + + public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages) + { + return null; + } + + public void addToResendQueue(AMQMessage msg) + { + //no-op } public int hashCode() |