diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-02-07 12:12:19 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-07 12:12:19 +0000 |
commit | aff14ea772443111050ca56bb908bf3c6d1c33c6 (patch) | |
tree | 421904e3034f61aab56f8539aa01847bfaeb8162 | |
parent | ed5989c275e801fa49561118b01763d7fe462490 (diff) | |
download | qpid-python-aff14ea772443111050ca56bb908bf3c6d1c33c6.tar.gz |
Qpid-346 & QPID-347 messages remaining taken after closure..
+added release() to Channel.requeue()
QPID-346 message loss after roll back.
+client now flushes local pre-receive queues. To ensure message order is preserved.
Test cases to follow
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@504521 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 258 insertions, 154 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 2fb3a0511f..7fb446a579 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -71,28 +71,20 @@ public class AMQChannel */ private AtomicLong _deliveryTag = new AtomicLong(0); - /** - * A channel has a default queue (the last declared) that is used when no queue name is - * explictily set - */ + /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */ private AMQQueue _defaultQueue; - /** - * This tag is unique per subscription to a queue. The server returns this in response to a - * basic.consume request. - */ + /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */ private int _consumerTag; /** - * The current message - which may be partial in the sense that not all frames have been received yet - - * which has been received by this channel. As the frames are received the message gets updated and once all - * frames have been received the message can then be routed. + * The current message - which may be partial in the sense that not all frames have been received yet - which has + * been received by this channel. As the frames are received the message gets updated and once all frames have been + * received the message can then be routed. */ private AMQMessage _currentMessage; - /** - * Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. - */ + /** Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. */ private final Map<String, AMQQueue> _consumerTag2QueueMap = new TreeMap<String, AMQQueue>(); private final MessageStore _messageStore; @@ -282,16 +274,16 @@ public class AMQChannel } /** - * Subscribe to a queue. We register all subscriptions in the channel so that - * if the channel is closed we can clean up all subscriptions, even if the - * client does not explicitly unsubscribe from all queues. + * Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean + * up all subscriptions, even if the client does not explicitly unsubscribe from all queues. * * @param tag the tag chosen by the client (if null, server will generate one) * @param queue the queue to subscribe to * @param session the protocol session of the subscriber * @param noLocal - * @return the consumer tag. This is returned to the subscriber and used in - * subsequent unsubscribe requests + * + * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests + * * @throws ConsumerTagNotUniqueException if the tag is not unique * @throws AMQException if something goes wrong */ @@ -331,13 +323,13 @@ public class AMQChannel { if (_transactional) { - synchronized(_txnBuffer) + synchronized (_txnBuffer) { _txnBuffer.rollback();//releases messages } } unsubscribeAllConsumers(session); - requeue(); + requeue(); _txnBuffer.commit(); } @@ -360,7 +352,7 @@ public class AMQChannel */ public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue) { - synchronized(_unacknowledgedMessageMapLock) + synchronized (_unacknowledgedMessageMapLock) { _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag)); _lastDeliveryTag = deliveryTag; @@ -369,14 +361,14 @@ public class AMQChannel } /** - * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel. - * May result in delivery to this same channel or to other subscribers. + * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel. May result in delivery to + * this same channel or to other subscribers. */ public void requeue() throws AMQException { // we must create a new map since all the messages will get a new delivery tag when they are redelivered Map<Long, UnacknowledgedMessage> currentList; - synchronized(_unacknowledgedMessageMapLock) + synchronized (_unacknowledgedMessageMapLock) { currentList = _unacknowledgedMessageMap; _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH); @@ -388,41 +380,64 @@ public class AMQChannel { unacked.message.setTxnBuffer(null); + unacked.message.release(); + unacked.queue.deliver(unacked.message); } } } - /** - * Called to resend all outstanding unacknowledged messages to this same channel. - */ - public void resend(AMQProtocolSession session) + /** Called to resend all outstanding unacknowledged messages to this same channel. */ + public void resend(AMQProtocolSession session) throws AMQException { //messages go to this channel - synchronized(_unacknowledgedMessageMapLock) + synchronized (_unacknowledgedMessageMapLock) { - for (Map.Entry<Long, UnacknowledgedMessage> entry : _unacknowledgedMessageMap.entrySet()) + Iterator<Map.Entry<Long, UnacknowledgedMessage>> messageSetIterator = + _unacknowledgedMessageMap.entrySet().iterator(); + + while (messageSetIterator.hasNext()) { + Map.Entry<Long, UnacknowledgedMessage> entry = messageSetIterator.next(); + long deliveryTag = entry.getKey(); String consumerTag = entry.getValue().consumerTag; - AMQMessage msg = entry.getValue().message; - msg.setRedelivered(true); - session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag)); + + if (_consumerTag2QueueMap.containsKey(consumerTag)) + { + AMQMessage msg = entry.getValue().message; + msg.setRedelivered(true); + session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag)); + } + else + { + UnacknowledgedMessage unacked = entry.getValue(); + + if (unacked.queue != null) + { + unacked.message.setTxnBuffer(null); + + unacked.message.release(); + + unacked.queue.deliver(unacked.message); + } + // delete the requeued message. + messageSetIterator.remove(); + } } } } /** - * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged - * messages to remove the queue reference and also decrement any message reference counts, without - * actually removing the item sine we may get an ack for a delivery tag that was generated from the - * deleted queue. + * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged messages to + * remove the queue reference and also decrement any message reference counts, without actually removing the item + * sine we may get an ack for a delivery tag that was generated from the deleted queue. * * @param queue */ public void queueDeleted(AMQQueue queue) { - synchronized(_unacknowledgedMessageMapLock) + synchronized (_unacknowledgedMessageMapLock) { for (Map.Entry<Long, UnacknowledgedMessage> unacked : _unacknowledgedMessageMap.entrySet()) { @@ -451,6 +466,7 @@ public class AMQChannel * @param deliveryTag the last delivery tag * @param multiple if true will acknowledge all messages up to an including the delivery tag. if false only * acknowledges the single message specified by the delivery tag + * * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel */ public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException @@ -473,7 +489,7 @@ public class AMQChannel //update the op to include this ack request if (multiple && deliveryTag == 0) { - synchronized(_unacknowledgedMessageMapLock) + synchronized (_unacknowledgedMessageMapLock) { //if have signalled to ack all, that refers only //to all at this time @@ -493,7 +509,7 @@ public class AMQChannel private void checkAck(long deliveryTag) throws AMQException { - synchronized(_unacknowledgedMessageMapLock) + synchronized (_unacknowledgedMessageMapLock) { if (!_unacknowledgedMessageMap.containsKey(deliveryTag)) { @@ -512,7 +528,7 @@ public class AMQChannel if (multiple) { LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>(); - synchronized(_unacknowledgedMessageMapLock) + synchronized (_unacknowledgedMessageMapLock) { if (deliveryTag == 0) { @@ -573,7 +589,7 @@ public class AMQChannel else { UnacknowledgedMessage msg; - synchronized(_unacknowledgedMessageMapLock) + synchronized (_unacknowledgedMessageMapLock) { msg = _unacknowledgedMessageMap.remove(deliveryTag); } @@ -616,7 +632,7 @@ public class AMQChannel { boolean suspend; //noinspection SynchronizeOnNonFinalField - synchronized(_unacknowledgedMessageMapLock) + synchronized (_unacknowledgedMessageMapLock) { suspend = _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark; } @@ -629,7 +645,7 @@ public class AMQChannel if (isSuspended && !suspended) { - synchronized(_unacknowledgedMessageMapLock) + synchronized (_unacknowledgedMessageMapLock) { // Continue being suspended if we are above the _prefetch_LowWaterMark suspended = _unacknowledgedMessageMap.size() > _prefetch_LowWaterMark; @@ -679,7 +695,7 @@ public class AMQChannel public void rollback() throws AMQException { //need to protect rollback and close from each other... - synchronized(_txnBuffer) + synchronized (_txnBuffer) { _txnBuffer.rollback(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 3379b092ae..994ed7944e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -429,7 +429,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (_started) { - session.start(); + try + { + session.start(); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } } return session; } @@ -581,7 +588,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect while (it.hasNext()) { final AMQSession s = (AMQSession) ((Map.Entry) it.next()).getValue(); - s.start(); + try + { + s.start(); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } } _started = true; } @@ -594,7 +608,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { for (Iterator i = _sessions.values().iterator(); i.hasNext();) { - ((AMQSession) i.next()).stop(); + try + { + ((AMQSession) i.next()).stop(); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } } _started = false; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index ff7bf4fd26..8e9382a9be 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -73,44 +73,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false); /** - * Used to reference durable subscribers so they requests for unsubscribe can be handled - * correctly. Note this only keeps a record of subscriptions which have been created - * in the current instance. It does not remember subscriptions between executions of the - * client + * Used to reference durable subscribers so they requests for unsubscribe can be handled correctly. Note this only + * keeps a record of subscriptions which have been created in the current instance. It does not remember + * subscriptions between executions of the client */ private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions = new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap = new ConcurrentHashMap<BasicMessageConsumer, String>(); - /** - * Used in the consume method. We generate the consume tag on the client so that we can use the nowait - * feature. - */ + /** Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. */ private int _nextTag = 1; - /** - * This queue is bounded and is used to store messages before being dispatched to the consumer - */ + /** This queue is bounded and is used to store messages before being dispatched to the consumer */ private final FlowControllingBlockingQueue _queue; private Dispatcher _dispatcher; private MessageFactoryRegistry _messageFactoryRegistry; - /** - * Set of all producers created by this session - */ + /** Set of all producers created by this session */ private Map _producers = new ConcurrentHashMap(); - /** - * Maps from consumer tag (String) to JMSMessageConsumer instance - */ + /** Maps from consumer tag (String) to JMSMessageConsumer instance */ private Map<String, BasicMessageConsumer> _consumers = new ConcurrentHashMap<String, BasicMessageConsumer>(); - /** - * Maps from destination to count of JMSMessageConsumers - */ + /** Maps from destination to count of JMSMessageConsumers */ private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = new ConcurrentHashMap<Destination, AtomicInteger>(); @@ -127,33 +115,29 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi protected static final boolean DEFAULT_MANDATORY = true; /** - * The counter of the next producer id. This id is generated by the session and used only to allow the - * producer to identify itself to the session when deregistering itself. - * <p/> - * Access to this id does not require to be synchronized since according to the JMS specification only one - * thread of control is allowed to create producers for any given session instance. + * The counter of the next producer id. This id is generated by the session and used only to allow the producer to + * identify itself to the session when deregistering itself. <p/> Access to this id does not require to be + * synchronized since according to the JMS specification only one thread of control is allowed to create producers + * for any given session instance. */ private long _nextProducerId; /** - * Set when recover is called. This is to handle the case where recover() is called by application code - * during onMessage() processing. We need to make sure we do not send an auto ack if recover was called. + * Set when recover is called. This is to handle the case where recover() is called by application code during + * onMessage() processing. We need to make sure we do not send an auto ack if recover was called. */ private boolean _inRecovery; private boolean _connectionStopped; private boolean _hasMessageListeners; + private boolean _suspended; + private final Object _suspensionLock = new Object(); - - /** - * Responsible for decoding a message fragment and passing it to the appropriate message consumer. - */ + /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ private class Dispatcher extends Thread { - /** - * Track the 'stopped' state of the dispatcher, a session starts in the stopped state. - */ + /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */ private final AtomicBoolean _closed = new AtomicBoolean(false); private final Object _lock = new Object(); @@ -276,6 +260,30 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi //fixme awaitTermination } + + public void rollback() + { + + synchronized (_lock) + { + boolean isStopped = connectionStopped(); + + if (!isStopped) + { + setConnectionStopped(true); + } + + _queue.clear(); + + for (BasicMessageConsumer consumer : _consumers.values()) + { + consumer.rollback(); + } + + setConnectionStopped(isStopped); + } + + } } AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, @@ -318,7 +326,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_acknowledgeMode == NO_ACKNOWLEDGE) { _logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue); - suspendChannel(); + + new Thread(new SuspenderRunner(true)).start(); } } @@ -327,7 +336,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_acknowledgeMode == NO_ACKNOWLEDGE) { _logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue); - unsuspendChannel(); + + new Thread(new SuspenderRunner(false)).start(); } } }); @@ -533,18 +543,39 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void rollback() throws JMSException { - checkTransacted(); - try - { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - _connection.getProtocolHandler().syncWrite( - TxRollbackBody.createAMQFrame(_channelId, (byte) 8, (byte) 0), TxRollbackOkBody.class); - } - catch (AMQException e) + synchronized (_suspensionLock) { - throw(JMSException) (new JMSException("Failed to rollback: " + e).initCause(e)); + checkTransacted(); + try + { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + + boolean isSuspended = isSuspended(); + + if (!isSuspended) + { + suspendChannel(true); + } + + _connection.getProtocolHandler().syncWrite( + TxRollbackBody.createAMQFrame(_channelId, (byte) 8, (byte) 0), TxRollbackOkBody.class); + + if (_dispatcher != null) + { + _dispatcher.rollback(); + } + + if (!isSuspended) + { + suspendChannel(false); + } + } + catch (AMQException e) + { + throw(JMSException) (new JMSException("Failed to rollback: " + e).initCause(e)); + } } } @@ -616,9 +647,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + public boolean isSuspended() + { + return _suspended; + } + + /** - * Called when the server initiates the closure of the session - * unilaterally. + * Called when the server initiates the closure of the session unilaterally. * * @param e the exception that caused this session to be closed. Null causes the */ @@ -644,10 +680,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } /** - * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after - * failover when the client has veoted resubscription. - * <p/> - * The caller of this method must already hold the failover mutex. + * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover + * when the client has veoted resubscription. <p/> The caller of this method must already hold the failover mutex. */ void markClosed() { @@ -867,7 +901,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Creates a QueueReceiver * * @param destination + * * @return QueueReceiver - a wrapper around our MessageConsumer + * * @throws JMSException */ public QueueReceiver createQueueReceiver(Destination destination) throws JMSException @@ -883,7 +919,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @param destination * @param messageSelector + * * @return QueueReceiver - a wrapper around our MessageConsumer + * * @throws JMSException */ public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException @@ -1146,7 +1184,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @param amqd * @param protocolHandler + * * @return the queue name. This is useful where the broker is generating a queue name on behalf of the client. + * * @throws AMQException */ private String declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException @@ -1198,6 +1238,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Register to consume from the queue. * * @param queueName + * * @return the consumer tag generated by the broker */ private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler, @@ -1284,7 +1325,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Creates a QueueReceiver wrapping a MessageConsumer * * @param queue + * * @return QueueReceiver + * * @throws JMSException */ public QueueReceiver createReceiver(Queue queue) throws JMSException @@ -1300,7 +1343,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @param queue * @param messageSelector + * * @return QueueReceiver + * * @throws JMSException */ public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException @@ -1347,7 +1392,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Creates a non-durable subscriber * * @param topic + * * @return TopicSubscriber - a wrapper round our MessageConsumer + * * @throws JMSException */ public TopicSubscriber createSubscriber(Topic topic) throws JMSException @@ -1364,7 +1411,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param topic * @param messageSelector * @param noLocal + * * @return TopicSubscriber - a wrapper round our MessageConsumer + * * @throws JMSException */ public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException @@ -1434,9 +1483,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - /** - * Note, currently this does not handle reuse of the same name with different topics correctly. - */ + /** Note, currently this does not handle reuse of the same name with different topics correctly. */ public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { @@ -1549,8 +1596,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } /** - * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. - * Puts the message onto the queue read by the dispatcher. + * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto + * the queue read by the dispatcher. * * @param message the message that has been received */ @@ -1565,13 +1612,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } /** - * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from - * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is - * AUTO_ACK or similar. + * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from a + * BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is AUTO_ACK or similar. * * @param deliveryTag the tag of the last message to be acknowledged - * @param multiple if true will acknowledge all messages up to and including the one specified by the - * delivery tag + * @param multiple if true will acknowledge all messages up to and including the one specified by the delivery + * tag */ public void acknowledgeMessage(long deliveryTag, boolean multiple) { @@ -1609,7 +1655,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return _channelId; } - void start() + void start() throws AMQException { //fixme This should be controlled by _stopped as it pairs with the stop method //fixme or check the FlowControlledBlockingQueue _queue to see if we have flow controlled. @@ -1618,7 +1664,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_startedAtLeastOnce.getAndSet(true)) { //then we stopped this and are restarting, so signal server to resume delivery - unsuspendChannel(); + suspendChannel(false); } if (hasMessageListeners()) @@ -1651,10 +1697,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - void stop() + void stop() throws AMQException { //stop the server delivering messages to this session - suspendChannel(); + suspendChannel(true); if (_dispatcher != null) { @@ -1666,6 +1712,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Callers must hold the failover mutex before calling this method. * * @param consumer + * * @throws AMQException */ void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException @@ -1691,8 +1738,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } /** - * Called by the MessageConsumer when closing, to deregister the consumer from the - * map from consumerTag to consumer instance. + * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer + * instance. * * @param consumer the consum */ @@ -1764,28 +1811,23 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - private void suspendChannel() + private void suspendChannel(boolean suspend) throws AMQException { - _logger.warn("Suspending channel"); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, - (byte) 8, (byte) 0, // AMQP version (major, minor) - false); // active - _connection.getProtocolHandler().writeFrame(channelFlowFrame); - } + synchronized (_suspensionLock) + { + _logger.warn("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); - private void unsuspendChannel() - { - _logger.warn("Unsuspending channel"); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, - (byte) 8, (byte) 0, // AMQP version (major, minor) - true); // active - _connection.getProtocolHandler().writeFrame(channelFlowFrame); + _suspended = suspend; + + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, + (byte) 8, (byte) 0, // AMQP version (major, minor) + !suspend); // active + + _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class); + } } public void confirmConsumerCancelled(String consumerTag) @@ -1829,4 +1871,25 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + private class SuspenderRunner implements Runnable + { + private boolean _suspend; + + public SuspenderRunner(boolean suspend) + { + _suspend = suspend; + } + + public void run() + { + try + { + suspendChannel(_suspend); + } + catch (AMQException e) + { + _logger.warn("Unable to suspend channel"); + } + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index c5e57d2d1c..058afab605 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -762,4 +762,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } + + public void rollback() + { + _synchronousQueue.clear(); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index 5b7144b524..b7b47048c5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -20,23 +20,20 @@ */ package org.apache.qpid.client.util; +import org.apache.qpid.AMQException; + import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; /** - * A blocking queue that emits events above a user specified threshold allowing - * the caller to take action (e.g. flow control) to try to prevent the queue - * growing (much) further. The underlying queue itself is not bounded therefore - * the caller is not obliged to react to the events. - * <p/> - * This implementation is <b>only</b> safe where we have a single thread adding - * items and a single (different) thread removing items. + * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow + * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the + * caller is not obliged to react to the events. <p/> This implementation is <b>only</b> safe where we have a single + * thread adding items and a single (different) thread removing items. */ public class FlowControllingBlockingQueue { - /** - * This queue is bounded and is used to store messages before being dispatched to the consumer - */ + /** This queue is bounded and is used to store messages before being dispatched to the consumer */ private final BlockingQueue _queue = new LinkedBlockingQueue(); private final int _flowControlHighThreshold; @@ -44,12 +41,14 @@ public class FlowControllingBlockingQueue private final ThresholdListener _listener; - /** - * We require a separate count so we can track whether we have reached the - * threshold - */ + /** We require a separate count so we can track whether we have reached the threshold */ private int _count; + public void clear() + { + _queue.clear(); + } + public interface ThresholdListener { void aboveThreshold(int currentValue); @@ -74,7 +73,7 @@ public class FlowControllingBlockingQueue Object o = _queue.take(); if (_listener != null) { - synchronized(_listener) + synchronized (_listener) { if (_count-- == _flowControlLowThreshold) { @@ -90,7 +89,7 @@ public class FlowControllingBlockingQueue _queue.add(o); if (_listener != null) { - synchronized(_listener) + synchronized (_listener) { if (++_count == _flowControlHighThreshold) { |