summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-07 12:12:19 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-07 12:12:19 +0000
commitaff14ea772443111050ca56bb908bf3c6d1c33c6 (patch)
tree421904e3034f61aab56f8539aa01847bfaeb8162
parented5989c275e801fa49561118b01763d7fe462490 (diff)
downloadqpid-python-aff14ea772443111050ca56bb908bf3c6d1c33c6.tar.gz
Qpid-346 & QPID-347 messages remaining taken after closure..
+added release() to Channel.requeue() QPID-346 message loss after roll back. +client now flushes local pre-receive queues. To ensure message order is preserved. Test cases to follow git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@504521 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java108
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java27
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java241
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java31
5 files changed, 258 insertions, 154 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 2fb3a0511f..7fb446a579 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -71,28 +71,20 @@ public class AMQChannel
*/
private AtomicLong _deliveryTag = new AtomicLong(0);
- /**
- * A channel has a default queue (the last declared) that is used when no queue name is
- * explictily set
- */
+ /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */
private AMQQueue _defaultQueue;
- /**
- * This tag is unique per subscription to a queue. The server returns this in response to a
- * basic.consume request.
- */
+ /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */
private int _consumerTag;
/**
- * The current message - which may be partial in the sense that not all frames have been received yet -
- * which has been received by this channel. As the frames are received the message gets updated and once all
- * frames have been received the message can then be routed.
+ * The current message - which may be partial in the sense that not all frames have been received yet - which has
+ * been received by this channel. As the frames are received the message gets updated and once all frames have been
+ * received the message can then be routed.
*/
private AMQMessage _currentMessage;
- /**
- * Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue.
- */
+ /** Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. */
private final Map<String, AMQQueue> _consumerTag2QueueMap = new TreeMap<String, AMQQueue>();
private final MessageStore _messageStore;
@@ -282,16 +274,16 @@ public class AMQChannel
}
/**
- * Subscribe to a queue. We register all subscriptions in the channel so that
- * if the channel is closed we can clean up all subscriptions, even if the
- * client does not explicitly unsubscribe from all queues.
+ * Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean
+ * up all subscriptions, even if the client does not explicitly unsubscribe from all queues.
*
* @param tag the tag chosen by the client (if null, server will generate one)
* @param queue the queue to subscribe to
* @param session the protocol session of the subscriber
* @param noLocal
- * @return the consumer tag. This is returned to the subscriber and used in
- * subsequent unsubscribe requests
+ *
+ * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
+ *
* @throws ConsumerTagNotUniqueException if the tag is not unique
* @throws AMQException if something goes wrong
*/
@@ -331,13 +323,13 @@ public class AMQChannel
{
if (_transactional)
{
- synchronized(_txnBuffer)
+ synchronized (_txnBuffer)
{
_txnBuffer.rollback();//releases messages
}
}
unsubscribeAllConsumers(session);
- requeue();
+ requeue();
_txnBuffer.commit();
}
@@ -360,7 +352,7 @@ public class AMQChannel
*/
public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue)
{
- synchronized(_unacknowledgedMessageMapLock)
+ synchronized (_unacknowledgedMessageMapLock)
{
_unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
_lastDeliveryTag = deliveryTag;
@@ -369,14 +361,14 @@ public class AMQChannel
}
/**
- * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel.
- * May result in delivery to this same channel or to other subscribers.
+ * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel. May result in delivery to
+ * this same channel or to other subscribers.
*/
public void requeue() throws AMQException
{
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
Map<Long, UnacknowledgedMessage> currentList;
- synchronized(_unacknowledgedMessageMapLock)
+ synchronized (_unacknowledgedMessageMapLock)
{
currentList = _unacknowledgedMessageMap;
_unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
@@ -388,41 +380,64 @@ public class AMQChannel
{
unacked.message.setTxnBuffer(null);
+ unacked.message.release();
+
unacked.queue.deliver(unacked.message);
}
}
}
- /**
- * Called to resend all outstanding unacknowledged messages to this same channel.
- */
- public void resend(AMQProtocolSession session)
+ /** Called to resend all outstanding unacknowledged messages to this same channel. */
+ public void resend(AMQProtocolSession session) throws AMQException
{
//messages go to this channel
- synchronized(_unacknowledgedMessageMapLock)
+ synchronized (_unacknowledgedMessageMapLock)
{
- for (Map.Entry<Long, UnacknowledgedMessage> entry : _unacknowledgedMessageMap.entrySet())
+ Iterator<Map.Entry<Long, UnacknowledgedMessage>> messageSetIterator =
+ _unacknowledgedMessageMap.entrySet().iterator();
+
+ while (messageSetIterator.hasNext())
{
+ Map.Entry<Long, UnacknowledgedMessage> entry = messageSetIterator.next();
+
long deliveryTag = entry.getKey();
String consumerTag = entry.getValue().consumerTag;
- AMQMessage msg = entry.getValue().message;
- msg.setRedelivered(true);
- session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag));
+
+ if (_consumerTag2QueueMap.containsKey(consumerTag))
+ {
+ AMQMessage msg = entry.getValue().message;
+ msg.setRedelivered(true);
+ session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag));
+ }
+ else
+ {
+ UnacknowledgedMessage unacked = entry.getValue();
+
+ if (unacked.queue != null)
+ {
+ unacked.message.setTxnBuffer(null);
+
+ unacked.message.release();
+
+ unacked.queue.deliver(unacked.message);
+ }
+ // delete the requeued message.
+ messageSetIterator.remove();
+ }
}
}
}
/**
- * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged
- * messages to remove the queue reference and also decrement any message reference counts, without
- * actually removing the item sine we may get an ack for a delivery tag that was generated from the
- * deleted queue.
+ * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged messages to
+ * remove the queue reference and also decrement any message reference counts, without actually removing the item
+ * sine we may get an ack for a delivery tag that was generated from the deleted queue.
*
* @param queue
*/
public void queueDeleted(AMQQueue queue)
{
- synchronized(_unacknowledgedMessageMapLock)
+ synchronized (_unacknowledgedMessageMapLock)
{
for (Map.Entry<Long, UnacknowledgedMessage> unacked : _unacknowledgedMessageMap.entrySet())
{
@@ -451,6 +466,7 @@ public class AMQChannel
* @param deliveryTag the last delivery tag
* @param multiple if true will acknowledge all messages up to an including the delivery tag. if false only
* acknowledges the single message specified by the delivery tag
+ *
* @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
@@ -473,7 +489,7 @@ public class AMQChannel
//update the op to include this ack request
if (multiple && deliveryTag == 0)
{
- synchronized(_unacknowledgedMessageMapLock)
+ synchronized (_unacknowledgedMessageMapLock)
{
//if have signalled to ack all, that refers only
//to all at this time
@@ -493,7 +509,7 @@ public class AMQChannel
private void checkAck(long deliveryTag) throws AMQException
{
- synchronized(_unacknowledgedMessageMapLock)
+ synchronized (_unacknowledgedMessageMapLock)
{
if (!_unacknowledgedMessageMap.containsKey(deliveryTag))
{
@@ -512,7 +528,7 @@ public class AMQChannel
if (multiple)
{
LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
- synchronized(_unacknowledgedMessageMapLock)
+ synchronized (_unacknowledgedMessageMapLock)
{
if (deliveryTag == 0)
{
@@ -573,7 +589,7 @@ public class AMQChannel
else
{
UnacknowledgedMessage msg;
- synchronized(_unacknowledgedMessageMapLock)
+ synchronized (_unacknowledgedMessageMapLock)
{
msg = _unacknowledgedMessageMap.remove(deliveryTag);
}
@@ -616,7 +632,7 @@ public class AMQChannel
{
boolean suspend;
//noinspection SynchronizeOnNonFinalField
- synchronized(_unacknowledgedMessageMapLock)
+ synchronized (_unacknowledgedMessageMapLock)
{
suspend = _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark;
}
@@ -629,7 +645,7 @@ public class AMQChannel
if (isSuspended && !suspended)
{
- synchronized(_unacknowledgedMessageMapLock)
+ synchronized (_unacknowledgedMessageMapLock)
{
// Continue being suspended if we are above the _prefetch_LowWaterMark
suspended = _unacknowledgedMessageMap.size() > _prefetch_LowWaterMark;
@@ -679,7 +695,7 @@ public class AMQChannel
public void rollback() throws AMQException
{
//need to protect rollback and close from each other...
- synchronized(_txnBuffer)
+ synchronized (_txnBuffer)
{
_txnBuffer.rollback();
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 3379b092ae..994ed7944e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -429,7 +429,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if (_started)
{
- session.start();
+ try
+ {
+ session.start();
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
}
return session;
}
@@ -581,7 +588,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
while (it.hasNext())
{
final AMQSession s = (AMQSession) ((Map.Entry) it.next()).getValue();
- s.start();
+ try
+ {
+ s.start();
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
}
_started = true;
}
@@ -594,7 +608,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
for (Iterator i = _sessions.values().iterator(); i.hasNext();)
{
- ((AMQSession) i.next()).stop();
+ try
+ {
+ ((AMQSession) i.next()).stop();
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
}
_started = false;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index ff7bf4fd26..8e9382a9be 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -73,44 +73,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
/**
- * Used to reference durable subscribers so they requests for unsubscribe can be handled
- * correctly. Note this only keeps a record of subscriptions which have been created
- * in the current instance. It does not remember subscriptions between executions of the
- * client
+ * Used to reference durable subscribers so they requests for unsubscribe can be handled correctly. Note this only
+ * keeps a record of subscriptions which have been created in the current instance. It does not remember
+ * subscriptions between executions of the client
*/
private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
new ConcurrentHashMap<BasicMessageConsumer, String>();
- /**
- * Used in the consume method. We generate the consume tag on the client so that we can use the nowait
- * feature.
- */
+ /** Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. */
private int _nextTag = 1;
- /**
- * This queue is bounded and is used to store messages before being dispatched to the consumer
- */
+ /** This queue is bounded and is used to store messages before being dispatched to the consumer */
private final FlowControllingBlockingQueue _queue;
private Dispatcher _dispatcher;
private MessageFactoryRegistry _messageFactoryRegistry;
- /**
- * Set of all producers created by this session
- */
+ /** Set of all producers created by this session */
private Map _producers = new ConcurrentHashMap();
- /**
- * Maps from consumer tag (String) to JMSMessageConsumer instance
- */
+ /** Maps from consumer tag (String) to JMSMessageConsumer instance */
private Map<String, BasicMessageConsumer> _consumers = new ConcurrentHashMap<String, BasicMessageConsumer>();
- /**
- * Maps from destination to count of JMSMessageConsumers
- */
+ /** Maps from destination to count of JMSMessageConsumers */
private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
new ConcurrentHashMap<Destination, AtomicInteger>();
@@ -127,33 +115,29 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
protected static final boolean DEFAULT_MANDATORY = true;
/**
- * The counter of the next producer id. This id is generated by the session and used only to allow the
- * producer to identify itself to the session when deregistering itself.
- * <p/>
- * Access to this id does not require to be synchronized since according to the JMS specification only one
- * thread of control is allowed to create producers for any given session instance.
+ * The counter of the next producer id. This id is generated by the session and used only to allow the producer to
+ * identify itself to the session when deregistering itself. <p/> Access to this id does not require to be
+ * synchronized since according to the JMS specification only one thread of control is allowed to create producers
+ * for any given session instance.
*/
private long _nextProducerId;
/**
- * Set when recover is called. This is to handle the case where recover() is called by application code
- * during onMessage() processing. We need to make sure we do not send an auto ack if recover was called.
+ * Set when recover is called. This is to handle the case where recover() is called by application code during
+ * onMessage() processing. We need to make sure we do not send an auto ack if recover was called.
*/
private boolean _inRecovery;
private boolean _connectionStopped;
private boolean _hasMessageListeners;
+ private boolean _suspended;
+ private final Object _suspensionLock = new Object();
-
- /**
- * Responsible for decoding a message fragment and passing it to the appropriate message consumer.
- */
+ /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private class Dispatcher extends Thread
{
- /**
- * Track the 'stopped' state of the dispatcher, a session starts in the stopped state.
- */
+ /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final Object _lock = new Object();
@@ -276,6 +260,30 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
//fixme awaitTermination
}
+
+ public void rollback()
+ {
+
+ synchronized (_lock)
+ {
+ boolean isStopped = connectionStopped();
+
+ if (!isStopped)
+ {
+ setConnectionStopped(true);
+ }
+
+ _queue.clear();
+
+ for (BasicMessageConsumer consumer : _consumers.values())
+ {
+ consumer.rollback();
+ }
+
+ setConnectionStopped(isStopped);
+ }
+
+ }
}
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
@@ -318,7 +326,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue);
- suspendChannel();
+
+ new Thread(new SuspenderRunner(true)).start();
}
}
@@ -327,7 +336,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue);
- unsuspendChannel();
+
+ new Thread(new SuspenderRunner(false)).start();
}
}
});
@@ -533,18 +543,39 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void rollback() throws JMSException
{
- checkTransacted();
- try
- {
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- _connection.getProtocolHandler().syncWrite(
- TxRollbackBody.createAMQFrame(_channelId, (byte) 8, (byte) 0), TxRollbackOkBody.class);
- }
- catch (AMQException e)
+ synchronized (_suspensionLock)
{
- throw(JMSException) (new JMSException("Failed to rollback: " + e).initCause(e));
+ checkTransacted();
+ try
+ {
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+
+ boolean isSuspended = isSuspended();
+
+ if (!isSuspended)
+ {
+ suspendChannel(true);
+ }
+
+ _connection.getProtocolHandler().syncWrite(
+ TxRollbackBody.createAMQFrame(_channelId, (byte) 8, (byte) 0), TxRollbackOkBody.class);
+
+ if (_dispatcher != null)
+ {
+ _dispatcher.rollback();
+ }
+
+ if (!isSuspended)
+ {
+ suspendChannel(false);
+ }
+ }
+ catch (AMQException e)
+ {
+ throw(JMSException) (new JMSException("Failed to rollback: " + e).initCause(e));
+ }
}
}
@@ -616,9 +647,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
+ public boolean isSuspended()
+ {
+ return _suspended;
+ }
+
+
/**
- * Called when the server initiates the closure of the session
- * unilaterally.
+ * Called when the server initiates the closure of the session unilaterally.
*
* @param e the exception that caused this session to be closed. Null causes the
*/
@@ -644,10 +680,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
/**
- * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after
- * failover when the client has veoted resubscription.
- * <p/>
- * The caller of this method must already hold the failover mutex.
+ * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
+ * when the client has veoted resubscription. <p/> The caller of this method must already hold the failover mutex.
*/
void markClosed()
{
@@ -867,7 +901,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Creates a QueueReceiver
*
* @param destination
+ *
* @return QueueReceiver - a wrapper around our MessageConsumer
+ *
* @throws JMSException
*/
public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
@@ -883,7 +919,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*
* @param destination
* @param messageSelector
+ *
* @return QueueReceiver - a wrapper around our MessageConsumer
+ *
* @throws JMSException
*/
public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
@@ -1146,7 +1184,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*
* @param amqd
* @param protocolHandler
+ *
* @return the queue name. This is useful where the broker is generating a queue name on behalf of the client.
+ *
* @throws AMQException
*/
private String declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
@@ -1198,6 +1238,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Register to consume from the queue.
*
* @param queueName
+ *
* @return the consumer tag generated by the broker
*/
private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler,
@@ -1284,7 +1325,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Creates a QueueReceiver wrapping a MessageConsumer
*
* @param queue
+ *
* @return QueueReceiver
+ *
* @throws JMSException
*/
public QueueReceiver createReceiver(Queue queue) throws JMSException
@@ -1300,7 +1343,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*
* @param queue
* @param messageSelector
+ *
* @return QueueReceiver
+ *
* @throws JMSException
*/
public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
@@ -1347,7 +1392,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Creates a non-durable subscriber
*
* @param topic
+ *
* @return TopicSubscriber - a wrapper round our MessageConsumer
+ *
* @throws JMSException
*/
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
@@ -1364,7 +1411,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @param topic
* @param messageSelector
* @param noLocal
+ *
* @return TopicSubscriber - a wrapper round our MessageConsumer
+ *
* @throws JMSException
*/
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
@@ -1434,9 +1483,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- /**
- * Note, currently this does not handle reuse of the same name with different topics correctly.
- */
+ /** Note, currently this does not handle reuse of the same name with different topics correctly. */
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
throws JMSException
{
@@ -1549,8 +1596,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
/**
- * Invoked by the MINA IO thread (indirectly) when a message is received from the transport.
- * Puts the message onto the queue read by the dispatcher.
+ * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto
+ * the queue read by the dispatcher.
*
* @param message the message that has been received
*/
@@ -1565,13 +1612,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
/**
- * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from
- * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is
- * AUTO_ACK or similar.
+ * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from a
+ * BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is AUTO_ACK or similar.
*
* @param deliveryTag the tag of the last message to be acknowledged
- * @param multiple if true will acknowledge all messages up to and including the one specified by the
- * delivery tag
+ * @param multiple if true will acknowledge all messages up to and including the one specified by the delivery
+ * tag
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
@@ -1609,7 +1655,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
return _channelId;
}
- void start()
+ void start() throws AMQException
{
//fixme This should be controlled by _stopped as it pairs with the stop method
//fixme or check the FlowControlledBlockingQueue _queue to see if we have flow controlled.
@@ -1618,7 +1664,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (_startedAtLeastOnce.getAndSet(true))
{
//then we stopped this and are restarting, so signal server to resume delivery
- unsuspendChannel();
+ suspendChannel(false);
}
if (hasMessageListeners())
@@ -1651,10 +1697,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- void stop()
+ void stop() throws AMQException
{
//stop the server delivering messages to this session
- suspendChannel();
+ suspendChannel(true);
if (_dispatcher != null)
{
@@ -1666,6 +1712,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Callers must hold the failover mutex before calling this method.
*
* @param consumer
+ *
* @throws AMQException
*/
void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException
@@ -1691,8 +1738,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
/**
- * Called by the MessageConsumer when closing, to deregister the consumer from the
- * map from consumerTag to consumer instance.
+ * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer
+ * instance.
*
* @param consumer the consum
*/
@@ -1764,28 +1811,23 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- private void suspendChannel()
+ private void suspendChannel(boolean suspend) throws AMQException
{
- _logger.warn("Suspending channel");
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
- false); // active
- _connection.getProtocolHandler().writeFrame(channelFlowFrame);
- }
+ synchronized (_suspensionLock)
+ {
+ _logger.warn("Setting channel flow : " + (suspend ? "suspended" : "unsuspended"));
- private void unsuspendChannel()
- {
- _logger.warn("Unsuspending channel");
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
- true); // active
- _connection.getProtocolHandler().writeFrame(channelFlowFrame);
+ _suspended = suspend;
+
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ !suspend); // active
+
+ _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
+ }
}
public void confirmConsumerCancelled(String consumerTag)
@@ -1829,4 +1871,25 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
+ private class SuspenderRunner implements Runnable
+ {
+ private boolean _suspend;
+
+ public SuspenderRunner(boolean suspend)
+ {
+ _suspend = suspend;
+ }
+
+ public void run()
+ {
+ try
+ {
+ suspendChannel(_suspend);
+ }
+ catch (AMQException e)
+ {
+ _logger.warn("Unable to suspend channel");
+ }
+ }
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index c5e57d2d1c..058afab605 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -762,4 +762,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
+
+ public void rollback()
+ {
+ _synchronousQueue.clear();
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
index 5b7144b524..b7b47048c5 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
@@ -20,23 +20,20 @@
*/
package org.apache.qpid.client.util;
+import org.apache.qpid.AMQException;
+
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
- * A blocking queue that emits events above a user specified threshold allowing
- * the caller to take action (e.g. flow control) to try to prevent the queue
- * growing (much) further. The underlying queue itself is not bounded therefore
- * the caller is not obliged to react to the events.
- * <p/>
- * This implementation is <b>only</b> safe where we have a single thread adding
- * items and a single (different) thread removing items.
+ * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
+ * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the
+ * caller is not obliged to react to the events. <p/> This implementation is <b>only</b> safe where we have a single
+ * thread adding items and a single (different) thread removing items.
*/
public class FlowControllingBlockingQueue
{
- /**
- * This queue is bounded and is used to store messages before being dispatched to the consumer
- */
+ /** This queue is bounded and is used to store messages before being dispatched to the consumer */
private final BlockingQueue _queue = new LinkedBlockingQueue();
private final int _flowControlHighThreshold;
@@ -44,12 +41,14 @@ public class FlowControllingBlockingQueue
private final ThresholdListener _listener;
- /**
- * We require a separate count so we can track whether we have reached the
- * threshold
- */
+ /** We require a separate count so we can track whether we have reached the threshold */
private int _count;
+ public void clear()
+ {
+ _queue.clear();
+ }
+
public interface ThresholdListener
{
void aboveThreshold(int currentValue);
@@ -74,7 +73,7 @@ public class FlowControllingBlockingQueue
Object o = _queue.take();
if (_listener != null)
{
- synchronized(_listener)
+ synchronized (_listener)
{
if (_count-- == _flowControlLowThreshold)
{
@@ -90,7 +89,7 @@ public class FlowControllingBlockingQueue
_queue.add(o);
if (_listener != null)
{
- synchronized(_listener)
+ synchronized (_listener)
{
if (++_count == _flowControlHighThreshold)
{