summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-08 13:38:37 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-08 13:38:37 +0000
commit208e8a556d64a717c9fec9e49354d6e5c69bbaac (patch)
tree26854ab71db1188b5dad8965520f4c35130e25bd
parent3b6459da09f50e039ba79fabd5da19c3b2cab1f5 (diff)
downloadqpid-python-208e8a556d64a717c9fec9e49354d6e5c69bbaac.tar.gz
QPID-
Broker AMQChannel - Resend modified to add messages to Subscription resendQueue. BasicRecoverMethodHandler - Now makes use of the Requeue boolean (needs test case, but is same logic as TxRollback) TxRollbackHandler - Removed protocol Session from AMQChannel.resend() AMQMessage - Changes comments, updated taken() to record the subscription that took the message AMQQueue - Added DeliveryManager to Subscription constructors. ConcurrentSelectorDeliveryManager - updated to get queue from Subscription and to know when the Subscriptions have content that needs Async delivery. DeliveryManager - added update method to allow a subscription to tell DM it has content to send. Subscription - new methods to handle resendQueue SubscriptionFactory - changes to pass in the DeliveryManager SubscriptionImpl - Comment changes, Constructor changes, implmentations of interface Client Recover and TxRollback now perform their broker methods while suspended. RecoverTest - Added addition asserts to prevent NPEs CommitRollbackTest - word change RemoteSubscriptionImpl/SubscriptionTestHelper - Subscription implementation AckTest - Update for new SubscriptionImpl constructor git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@504887 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java29
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java61
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java79
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java86
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java130
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java32
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java7
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java3
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java3
-rw-r--r--qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java14
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java42
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java17
17 files changed, 357 insertions, 196 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 7fb446a579..a85db9f26b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -36,6 +36,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.queue.Subscription;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.TxnBuffer;
import org.apache.qpid.server.txn.TxnOp;
@@ -388,7 +389,7 @@ public class AMQChannel
}
/** Called to resend all outstanding unacknowledged messages to this same channel. */
- public void resend(AMQProtocolSession session) throws AMQException
+ public void resend() throws AMQException
{
//messages go to this channel
synchronized (_unacknowledgedMessageMapLock)
@@ -400,17 +401,35 @@ public class AMQChannel
{
Map.Entry<Long, UnacknowledgedMessage> entry = messageSetIterator.next();
- long deliveryTag = entry.getKey();
+ //long deliveryTag = entry.getKey();
String consumerTag = entry.getValue().consumerTag;
if (_consumerTag2QueueMap.containsKey(consumerTag))
{
AMQMessage msg = entry.getValue().message;
msg.setRedelivered(true);
- session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag));
+ Subscription sub = msg.getDeliveredSubscription();
+
+ if (sub != null)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Requeuing " + msg + " for resend");
+ }
+
+ sub.addToResendQueue(msg);
+ }
+ else
+ {
+ _log.error("DeliveredSubscription not recorded");
+ }
+
+ // Don't write the frame as the DeliveryManager can now deal with it
+ //session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag));
}
else
- {
+ { // The current consumer has gone so we need to requeue
+
UnacknowledgedMessage unacked = entry.getValue();
if (unacked.queue != null)
@@ -426,6 +445,8 @@ public class AMQChannel
}
}
}
+
+ //fixme need to start the async delivery here.
}
/**
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
index 85e802d10d..f83d38ad47 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
@@ -46,12 +46,22 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic
ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
AMQMethodEvent<BasicRecoverBody> evt) throws AMQException
{
- _logger.debug("Recover received on protocol session " + protocolSession + " and channel " + evt.getChannelId());
+ _logger.debug("Recover received on protocol session " + protocolSession + " and channel " + evt.getChannelId());
AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+
if (channel == null)
{
throw new AMQException("Unknown channel " + evt.getChannelId());
}
- channel.resend(protocolSession);
+
+ if (evt.getMethod().getRequeue())
+ {
+ //fixme need tests to exercise
+ channel.requeue();
+ }
+ else
+ {
+ channel.resend();
+ }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
index 588dc026d4..1d11f6297b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
@@ -48,17 +48,23 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod
ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
AMQMethodEvent<TxRollbackBody> evt) throws AMQException
{
- try{
+ try
+ {
AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+
channel.rollback();
+
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
+ protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
+
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
- channel.resend(protocolSession);
- }catch(AMQException e){
+ channel.resend();
+ }
+ catch (AMQException e)
+ {
throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage());
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 8603113c11..8ba3cc5686 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -39,9 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentHashMap;
-/**
- * Combines the information that make up a deliverable message into a more manageable form.
- */
+/** Combines the information that make up a deliverable message into a more manageable form. */
public class AMQMessage
{
private static final Logger _log = Logger.getLogger(AMQMessage.class);
@@ -66,36 +64,29 @@ public class AMQMessage
private long _arrivalTime;
- /**
- * Keeps a track of how many bytes we have received in body frames
- */
+ /** Keeps a track of how many bytes we have received in body frames */
private long _bodyLengthReceived = 0;
- /**
- * The message store in which this message is contained.
- */
+ /** The message store in which this message is contained. */
private transient final MessageStore _store;
/**
- * For non transactional publishes, a message can be stored as
- * soon as it is complete. For transactional messages it doesnt
- * need to be stored until the transaction is committed.
+ * For non transactional publishes, a message can be stored as soon as it is complete. For transactional messages it
+ * doesnt need to be stored until the transaction is committed.
*/
private boolean _storeWhenComplete;
- /**
- * TxnBuffer for transactionally published messages
- */
+ /** TxnBuffer for transactionally published messages */
private TxnBuffer _txnBuffer;
/**
- * Flag to indicate whether message has been delivered to a
- * consumer. Used in implementing return functionality for
+ * Flag to indicate whether message has been delivered to a consumer. Used in implementing return functionality for
* messages published with the 'immediate' flag.
*/
private boolean _deliveredToConsumer;
private ConcurrentHashMap<String, MessageDecorator> _decodedMessages;
private AtomicBoolean _taken;
+ private Subscription _takenBySubcription;
public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody)
@@ -282,9 +273,7 @@ public class AMQMessage
return _messageId;
}
- /**
- * Threadsafe. Increment the reference count on the message.
- */
+ /** Threadsafe. Increment the reference count on the message. */
public void incrementReference()
{
_referenceCount.incrementAndGet();
@@ -390,9 +379,8 @@ public class AMQMessage
/**
* Called to enforce the 'immediate' flag.
*
- * @throws NoConsumersException if the message is marked for
- * immediate delivery but has not been marked as delivered to a
- * consumer
+ * @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered
+ * to a consumer
*/
public void checkDeliveredToConsumer() throws NoConsumersException
{
@@ -403,9 +391,8 @@ public class AMQMessage
}
/**
- * Called when this message is delivered to a consumer. (used to
- * implement the 'immediate' flag functionality).
- * And by selectors to determin if the message has already been sent
+ * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). And
+ * by selectors to determin if the message has already been sent
*/
public void setDeliveredToConsumer()
{
@@ -457,13 +444,31 @@ public class AMQMessage
return msgdec;
}
- public boolean taken()
+ public boolean taken(Subscription sub)
{
- return _taken.getAndSet(true);
+ if (_taken.getAndSet(true))
+ {
+ if (sub == _takenBySubcription)
+ {
+ return false;
+ }
+ return true;
+ }
+ else
+ {
+ _takenBySubcription = sub;
+ return false;
+ }
}
public void release()
{
+ _takenBySubcription = null;
_taken.set(false);
}
+
+ public Subscription getDeliveredSubscription()
+ {
+ return _takenBySubcription;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index d8bacc8c7d..bcad8e7d14 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -37,8 +37,8 @@ import java.util.List;
import java.util.concurrent.Executor;
/**
- * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like
- * that. It is described fully in RFC 006.
+ * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described
+ * fully in RFC 006.
*/
public class AMQQueue implements Managable, Comparable
{
@@ -46,62 +46,41 @@ public class AMQQueue implements Managable, Comparable
private final String _name;
- /**
- * null means shared
- */
+ /** null means shared */
private final String _owner;
private final boolean _durable;
- /**
- * If true, this queue is deleted when the last subscriber is removed
- */
+ /** If true, this queue is deleted when the last subscriber is removed */
private final boolean _autoDelete;
- /**
- * Holds subscribers to the queue.
- */
+ /** Holds subscribers to the queue. */
private final SubscriptionSet _subscribers;
private final SubscriptionFactory _subscriptionFactory;
- /**
- * Manages message delivery.
- */
+ /** Manages message delivery. */
private final DeliveryManager _deliveryMgr;
- /**
- * The queue registry with which this queue is registered.
- */
+ /** The queue registry with which this queue is registered. */
private final QueueRegistry _queueRegistry;
- /**
- * Used to track bindings to exchanges so that on deletion they can easily
- * be cancelled.
- */
+ /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
private final ExchangeBindings _bindings = new ExchangeBindings(this);
- /**
- * Executor on which asynchronous delivery will be carriedout where required
- */
+ /** Executor on which asynchronous delivery will be carriedout where required */
private final Executor _asyncDelivery;
private final AMQQueueMBean _managedObject;
- /**
- * max allowed size(KB) of a single message
- */
+ /** max allowed size(KB) of a single message */
private long _maximumMessageSize = 10000;
- /**
- * max allowed number of messages on a queue.
- */
+ /** max allowed number of messages on a queue. */
@Configured(path = "maximumMessageCount", defaultValue = "0")
public int _maximumMessageCount;
- /**
- * max queue depth for the queue
- */
+ /** max queue depth for the queue */
@Configured(path = "maximumQueueDepth", defaultValue = "0")
public long _maximumQueueDepth = 10000000;
@@ -117,9 +96,7 @@ public class AMQQueue implements Managable, Comparable
@Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
public long _minimumAlertRepeatGap = 30000;
- /**
- * total messages received by the queue since startup.
- */
+ /** total messages received by the queue since startup. */
public long _totalMessagesReceived = 0;
public int compareTo(Object o)
@@ -198,7 +175,7 @@ public class AMQQueue implements Managable, Comparable
_autoDelete = autoDelete;
_queueRegistry = queueRegistry;
_asyncDelivery = asyncDelivery;
-
+
_managedObject = createMBean();
_managedObject.register();
@@ -244,17 +221,13 @@ public class AMQQueue implements Managable, Comparable
return _autoDelete;
}
- /**
- * @return no of messages(undelivered) on the queue.
- */
+ /** @return no of messages(undelivered) on the queue. */
public int getMessageCount()
{
return _deliveryMgr.getQueueMessageCount();
}
- /**
- * @return List of messages(undelivered) on the queue.
- */
+ /** @return List of messages(undelivered) on the queue. */
public List<AMQMessage> getMessagesOnTheQueue()
{
return _deliveryMgr.getMessages();
@@ -263,10 +236,11 @@ public class AMQQueue implements Managable, Comparable
public long getQueueDepth()
{
return _deliveryMgr.getTotalMessageSize();
- }
+ }
/**
* @param messageId
+ *
* @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
*/
public AMQMessage getMessageOnTheQueue(long messageId)
@@ -285,9 +259,7 @@ public class AMQQueue implements Managable, Comparable
return msg;
}
- /**
- * @return MBean object associated with this Queue
- */
+ /** @return MBean object associated with this Queue */
public ManagedObject getManagedObject()
{
return _managedObject;
@@ -344,17 +316,13 @@ public class AMQQueue implements Managable, Comparable
return _deliveryMgr.getOldestMessageArrival();
}
- /**
- * Removes the AMQMessage from the top of the queue.
- */
+ /** Removes the AMQMessage from the top of the queue. */
public void deleteMessageFromTop() throws AMQException
{
_deliveryMgr.removeAMessageFromTop();
}
- /**
- * removes all the messages from the queue.
- */
+ /** removes all the messages from the queue. */
public void clearQueue() throws AMQException
{
_deliveryMgr.clearAllMessages();
@@ -375,7 +343,7 @@ public class AMQQueue implements Managable, Comparable
{
debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
- Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
+ Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, _deliveryMgr);
if (subscription.hasFilters())
{
@@ -396,7 +364,8 @@ public class AMQQueue implements Managable, Comparable
Subscription removedSubscription;
if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel,
ps,
- consumerTag)))
+ consumerTag,
+ _deliveryMgr)))
== null)
{
throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag +
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 2b12b8e14c..f66604a5c1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -31,10 +31,17 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
+import java.util.HashMap;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
/** Manages delivery of messages on behalf of a queue */
@@ -68,6 +75,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
*/
private ReentrantLock _lock = new ReentrantLock();
private AtomicLong _totalMessageSize = new AtomicLong();
+ private Set<Subscription> _hasContent = Collections.synchronizedSet(new HashSet<Subscription>());
ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
{
@@ -111,7 +119,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_lock.lock();
try
{
- return !_messages.isEmpty();
+ return !_messages.isEmpty() || !_hasContent.isEmpty();
}
finally
{
@@ -146,6 +154,20 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return msg == null ? Long.MAX_VALUE : msg.getArrivalTime();
}
+ public void setQueueHasContent(Subscription subscription)
+ {
+ _lock.lock();
+ try
+ {
+
+ _log.debug("Queue has content Set");
+ _hasContent.add(subscription);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
public synchronized List<AMQMessage> getMessages()
{
@@ -197,7 +219,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
AMQMessage message = messages.peek();
- while (message != null && (sub.isBrowser() || message.taken()))
+ while (message != null && (sub.isBrowser() || message.taken(sub)))
{
//remove the already taken message
messages.poll();
@@ -207,8 +229,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return message;
}
- public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue)
+ public void sendNextMessage(Subscription sub, AMQQueue queue)
{
+
+ Queue<AMQMessage> messageQueue = sub.getNextQueue(_messages);
+
+ if (messageQueue == null)
+ {
+ // There is no queue with messages currently
+ _log.warn(sub + ": asked to send messages but has none on given queue:" + queue);
+ return;
+ }
AMQMessage message = null;
try
{
@@ -221,14 +252,30 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
if (_log.isDebugEnabled())
{
- _log.debug("Async Delivery Message:" + message + " to :" + sub);
+ _log.debug("Async Delivery Message:" + message + " to :" + this);
}
- sub.send(message, _queue);
+ sub.send(message, queue);
//remove sent message from our queue.
messageQueue.poll();
- _totalMessageSize.addAndGet(-message.getSize());
+
+ //If we don't remove the message from _messages
+ // Otherwise the Async send will never end
+ if (messageQueue.isEmpty())
+ {
+ if (messageQueue == sub.getResendQueue())
+ {
+ _hasContent.remove(sub);
+ }
+ else if (messageQueue == sub.getPreDeliveryQueue())
+ {
+ //fixme
+ _log.error("MEMORY LEAK: message from PreDeliveryQueue not removed from _messages");
+ //_messages.remove(message);
+ }
+ }
+
}
catch (FailedDequeueException e)
{
@@ -254,7 +301,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (!sub.isSuspended())
{
- sendNextMessage(sub);
+ sendNextMessage(sub, _queue);
hasSubscribers = true;
}
@@ -262,25 +309,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
- private void sendNextMessage(Subscription sub)
- {
- if (sub.hasFilters())
- {
- sendNextMessage(sub, sub.getPreDeliveryQueue());
- if (sub.isAutoClose())
- {
- if (sub.getPreDeliveryQueue().isEmpty())
- {
- sub.close();
- }
- }
- }
- else
- {
- sendNextMessage(sub, _messages);
- }
- }
-
private AMQMessage poll()
{
return _messages.poll();
@@ -355,6 +383,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
System.identityHashCode(s) + ") :" + s);
}
+ //Mark message as taken
+ msg.taken(s);
//Deliver the message
s.send(msg, _queue);
}
@@ -405,8 +435,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
- " Active:" + _subscriptions.hasActiveSubscribers() +
+ _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ") hasContent:"
+ + _hasContent.isEmpty() + " Active:" + _subscriptions.hasActiveSubscribers() +
" Processing:" + _processing.get());
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
index 28386dfa45..e4242f497a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
@@ -80,4 +80,6 @@ interface DeliveryManager
long getTotalMessageSize();
long getOldestMessageArrival();
+
+ void setQueueHasContent(Subscription subscription);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
index a5672f2b19..30b446c309 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
@@ -44,5 +44,11 @@ public interface Subscription
void close();
- boolean isBrowser();
+ boolean isBrowser();
+
+ Queue<AMQMessage> getResendQueue();
+
+ Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages);
+
+ void addToResendQueue(AMQMessage msg);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
index 2bb77dc649..ba31ca19b5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
@@ -25,18 +25,18 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.FieldTable;
/**
- * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This
- * factory primarily assists testing although in future more sophisticated subscribers may need a different
- * subscription implementation.
+ * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This factory
+ * primarily assists testing although in future more sophisticated subscribers may need a different subscription
+ * implementation.
*
* @see org.apache.qpid.server.queue.AMQQueue
*/
public interface SubscriptionFactory
{
Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks,
- FieldTable filters, boolean noLocal) throws AMQException;
+ FieldTable filters, boolean noLocal, DeliveryManager deliveryManager) throws AMQException;
- Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
- throws AMQException;
+ Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag,
+ DeliveryManager deliveryManager) throws AMQException;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 78310e8eb3..d43e20eb89 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -39,11 +39,8 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import java.util.Queue;
/**
- * Encapsulation of a supscription to a queue.
- * <p/>
- * Ties together the protocol session of a subscriber, the consumer tag that
- * was given out by the broker and the channel id.
- * <p/>
+ * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
+ * that was given out by the broker and the channel id. <p/>
*/
public class SubscriptionImpl implements Subscription
{
@@ -59,40 +56,46 @@ public class SubscriptionImpl implements Subscription
private Queue<AMQMessage> _messages;
+ private Queue<AMQMessage> _resendQueue;
+
private final boolean _noLocal;
- /**
- * True if messages need to be acknowledged
- */
+ /** True if messages need to be acknowledged */
private final boolean _acks;
private FilterManager _filters;
private final boolean _isBrowser;
private final Boolean _autoClose;
private boolean _closed = false;
+ private DeliveryManager _deliveryManager;
+
public static class Factory implements SubscriptionFactory
{
- public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException
+ public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag,
+ boolean acks, FieldTable filters, boolean noLocal,
+ DeliveryManager deliveryManager) throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal, deliveryManager);
}
- public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
+ public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag,
+ DeliveryManager deliveryManager)
throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false, deliveryManager);
}
}
public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
- String consumerTag, boolean acks)
+ String consumerTag, boolean acks, DeliveryManager deliveryManager)
throws AMQException
{
- this(channelId, protocolSession, consumerTag, acks, null, false);
+ this(channelId, protocolSession, consumerTag, acks, null, false, deliveryManager);
}
public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
- String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
+ String consumerTag, boolean acks, FieldTable filters, boolean noLocal,
+ DeliveryManager deliveryManager)
throws AMQException
{
AMQChannel channel = protocolSession.getChannel(channelId);
@@ -107,6 +110,7 @@ public class SubscriptionImpl implements Subscription
sessionKey = protocolSession.getKey();
_acks = acks;
_noLocal = noLocal;
+ _deliveryManager = deliveryManager;
_filters = FilterManagerFactory.createManager(filters);
@@ -165,7 +169,7 @@ public class SubscriptionImpl implements Subscription
String consumerTag)
throws AMQException
{
- this(channel, protocolSession, consumerTag, false);
+ this(channel, protocolSession, consumerTag, false, null);
}
public boolean equals(Object o)
@@ -173,9 +177,7 @@ public class SubscriptionImpl implements Subscription
return (o instanceof SubscriptionImpl) && equals((SubscriptionImpl) o);
}
- /**
- * Equality holds if the session matches and the channel and consumer tag are the same.
- */
+ /** Equality holds if the session matches and the channel and consumer tag are the same. */
private boolean equals(SubscriptionImpl psc)
{
return sessionKey.equals(psc.sessionKey)
@@ -194,11 +196,12 @@ public class SubscriptionImpl implements Subscription
}
/**
- * This method can be called by each of the publisher threads.
- * As a result all changes to the channel object must be thread safe.
+ * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
+ * thread safe.
*
* @param msg
* @param queue
+ *
* @throws AMQException
*/
public void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
@@ -225,7 +228,7 @@ public class SubscriptionImpl implements Subscription
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
- synchronized(channel)
+ synchronized (channel)
{
long deliveryTag = channel.getNextDeliveryTag();
@@ -259,7 +262,7 @@ public class SubscriptionImpl implements Subscription
{
queue.dequeue(msg);
}
- synchronized(channel)
+ synchronized (channel)
{
long deliveryTag = channel.getNextDeliveryTag();
@@ -376,6 +379,11 @@ public class SubscriptionImpl implements Subscription
public void close()
{
+ if (_resendQueue != null && !_resendQueue.isEmpty())
+ {
+ requeue();
+ }
+
if (!_closed)
{
_logger.info("Closing autoclose subscription:" + this);
@@ -383,18 +391,74 @@ public class SubscriptionImpl implements Subscription
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(),
- (byte)8, (byte)0, // AMQP version (major, minor)
- consumerTag // consumerTag
- ));
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ consumerTag // consumerTag
+ ));
_closed = true;
}
}
+ private void requeue()
+ {
+ //fixme
+ _logger.error("MESSAGES LOST as subscription hasn't yet resent all its requeued messages");
+ }
+
public boolean isBrowser()
{
return _isBrowser;
}
+ public Queue<AMQMessage> getResendQueue()
+ {
+ if (_resendQueue == null)
+ {
+ _resendQueue = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+ }
+ return _resendQueue;
+ }
+
+
+ public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+ {
+ if (_resendQueue != null && !_resendQueue.isEmpty())
+ {
+ return _resendQueue;
+ }
+
+ if (_filters != null)
+ {
+ if (isAutoClose())
+ {
+ if (_messages.isEmpty())
+ {
+ close();
+ return null;
+ }
+ }
+ return _messages;
+ }
+ else // we want the DM queue
+ {
+ return messages;
+ }
+ }
+
+ public void addToResendQueue(AMQMessage msg)
+ {
+ // add to our resend queue
+ getResendQueue().add(msg);
+
+ // Mark Queue has having content.
+ if (_deliveryManager == null)
+ {
+ _logger.error("Delivery Manager is null won't be able to resend messages");
+ }
+ else
+ {
+ _deliveryManager.setQueueHasContent(this);
+ }
+ }
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
{
@@ -402,13 +466,13 @@ public class SubscriptionImpl implements Subscription
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(),
- (byte)8, (byte)0, // AMQP version (major, minor)
- consumerTag, // consumerTag
- deliveryTag, // deliveryTag
- exchange, // exchange
- false, // redelivered
- routingKey // routingKey
- );
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ consumerTag, // consumerTag
+ deliveryTag, // deliveryTag
+ exchange, // exchange
+ false, // redelivered
+ routingKey // routingKey
+ );
ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
deliverFrame.writePayload(buf);
buf.flip();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 755f2f271b..eb29d9d805 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -86,7 +86,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private int _nextTag = 1;
/** This queue is bounded and is used to store messages before being dispatched to the consumer */
- private final FlowControllingBlockingQueue _queue;
+ public final FlowControllingBlockingQueue _queue;
private Dispatcher _dispatcher;
@@ -804,16 +804,44 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
checkNotTransacted(); // throws IllegalStateException if a transacted session
// this is set only here, and the before the consumer's onMessage is called it is set to false
_inRecovery = true;
+
+ boolean isSuspended = isSuspended();
+
+ if (!isSuspended)
+ {
+ try
+ {
+ suspendChannel(true);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ }
for (BasicMessageConsumer consumer : _consumers.values())
{
consumer.clearUnackedMessages();
}
+
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
_connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
(byte) 8, (byte) 0, // AMQP version (major, minor)
false)); // requeue
+
+ if (!isSuspended)
+ {
+ try
+ {
+ suspendChannel(false);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ }
+
}
boolean isInRecovery()
@@ -836,8 +864,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
consumer.acknowledge();
}
-
-
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
index 4087db6562..847454e43e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
@@ -100,7 +100,7 @@ public class FlowControllingBlockingQueue
{
_logger.trace("Object added to queue:" + o);
}
-
+
if (_listener != null)
{
synchronized (_listener)
@@ -112,5 +112,10 @@ public class FlowControllingBlockingQueue
}
}
}
+
+ public int size()
+ {
+ return _count;
+ }
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index d12ab01bdc..d80d3ad87d 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -79,12 +79,15 @@ public class RecoverTest extends TestCase
// no ack for last three messages so when I call recover I expect to get three messages back
consumerSession.recover();
tm = (TextMessage) consumer.receive(3000);
+ assertNotNull("Message was null", tm);
assertEquals("msg2", tm.getText());
tm = (TextMessage) consumer.receive(3000);
+ assertNotNull("Message was null", tm);
assertEquals("msg3", tm.getText());
tm = (TextMessage) consumer.receive(3000);
+ assertNotNull("Message was null", tm);
assertEquals("msg4", tm.getText());
_logger.info("Received redelivery of three messages. Acknowledging last message");
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
index ce3ea01a09..c5ac530297 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -269,7 +269,7 @@ public class CommitRollbackTest extends TestCase
_session.commit();
assertNotNull("test message was consumed and rolled back, but is gone", result);
- assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText());
+ assertEquals("test message was incorrect message", MESSAGE_TEXT, ((TextMessage) result).getText());
}
@@ -297,4 +297,5 @@ public class CommitRollbackTest extends TestCase
assertNull("test message should be null", result);
}
+
}
diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
index c751e4a011..f5d51b9826 100644
--- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
+++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
@@ -134,11 +134,21 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
public boolean isBrowser()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
- public void sendNextMessage(AMQQueue queue)
+ public Queue<AMQMessage> getResendQueue()
{
+ return null;
+ }
+ public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+ {
+ return null;
+ }
+
+ public void addToResendQueue(AMQMessage msg)
+ {
+ //no-op
}
}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
index 11bae0d9f6..b66ec70d2c 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -36,9 +36,7 @@ import java.util.Map;
import junit.framework.TestCase;
-/**
- * Tests that acknowledgements are handled correctly.
- */
+/** Tests that acknowledgements are handled correctly. */
public class AckTest extends TestCase
{
private static final Logger _log = Logger.getLogger(AckTest.class);
@@ -82,7 +80,7 @@ public class AckTest extends TestCase
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Establish some way to determine the version for the test.
- BasicPublishBody publishBody = new BasicPublishBody((byte)8, (byte)0);
+ BasicPublishBody publishBody = new BasicPublishBody((byte) 8, (byte) 0);
publishBody.routingKey = "rk";
publishBody.exchange = "someExchange";
AMQMessage msg = new AMQMessage(_messageStore, publishBody);
@@ -104,12 +102,12 @@ public class AckTest extends TestCase
}
/**
- * Tests that the acknowledgements are correctly associated with a channel and
- * order is preserved when acks are enabled
+ * Tests that the acknowledgements are correctly associated with a channel and order is preserved when acks are
+ * enabled
*/
public void testAckChannelAssociationTest() throws AMQException
{
- _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true, null);
final int msgCount = 10;
publishMessages(msgCount, true);
@@ -130,13 +128,11 @@ public class AckTest extends TestCase
assertTrue(_messageStore.getMessageMap().size() == msgCount);
}
- /**
- * Tests that in no-ack mode no messages are retained
- */
+ /** Tests that in no-ack mode no messages are retained */
public void testNoAckMode() throws AMQException
{
// false arg means no acks expected
- _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", false);
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", false, null);
final int msgCount = 10;
publishMessages(msgCount);
@@ -145,13 +141,10 @@ public class AckTest extends TestCase
assertTrue(_messageStore.getMessageMap().size() == 0);
}
- /**
- * Tests that a single acknowledgement is handled correctly (i.e multiple flag not
- * set case)
- */
+ /** Tests that a single acknowledgement is handled correctly (i.e multiple flag not set case) */
public void testSingleAckReceivedTest() throws AMQException
{
- _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true, null);
final int msgCount = 10;
publishMessages(msgCount);
@@ -175,13 +168,10 @@ public class AckTest extends TestCase
}
}
- /**
- * Tests that a single acknowledgement is handled correctly (i.e multiple flag not
- * set case)
- */
+ /** Tests that a single acknowledgement is handled correctly (i.e multiple flag not set case) */
public void testMultiAckReceivedTest() throws AMQException
{
- _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true, null);
final int msgCount = 10;
publishMessages(msgCount);
@@ -201,12 +191,10 @@ public class AckTest extends TestCase
}
}
- /**
- * Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs.
- */
+ /** Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs. */
public void testMultiAckAllReceivedTest() throws AMQException
{
- _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true, null);
final int msgCount = 10;
publishMessages(msgCount);
@@ -231,7 +219,7 @@ public class AckTest extends TestCase
int lowMark = 5;
int highMark = 10;
- _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true, null);
_channel.setPrefetchLowMarkCount(lowMark);
_channel.setPrefetchHighMarkCount(highMark);
@@ -282,7 +270,7 @@ public class AckTest extends TestCase
public void testPrefetch() throws AMQException
{
- _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true, null);
_channel.setPrefetchCount(5);
assertTrue(_channel.getPrefetchCount() == 5);
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index fea3c93280..27f9802fb5 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -103,7 +103,22 @@ public class SubscriptionTestHelper implements Subscription
public boolean isBrowser()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
+ }
+
+ public Queue<AMQMessage> getResendQueue()
+ {
+ return null;
+ }
+
+ public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+ {
+ return null;
+ }
+
+ public void addToResendQueue(AMQMessage msg)
+ {
+ //no-op
}
public int hashCode()