diff options
Diffstat (limited to 'java')
6 files changed, 379 insertions, 124 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java index 5f5b7ccad1..5a9b9b54af 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicRecoverBody; +import org.apache.qpid.framing.BasicRecoverOkBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -43,16 +44,24 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicRecoverBody> evt) throws AMQException { AMQProtocolSession session = stateManager.getProtocolSession(); - + _logger.debug("Recover received on protocol session " + session + " and channel " + evt.getChannelId()); AMQChannel channel = session.getChannel(evt.getChannelId()); BasicRecoverBody body = evt.getMethod(); - + if (channel == null) { throw body.getChannelNotFoundException(evt.getChannelId()); } channel.resend(session, body.requeue); + + if (!body.nowait) + { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(BasicRecoverOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); + } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 03a70d7f39..413524b6d8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -532,7 +532,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (_started) { - session.start(); + try + { + session.start(); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } } return session; } @@ -690,7 +697,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect while (it.hasNext()) { final AMQSession s = (AMQSession) ((Map.Entry) it.next()).getValue(); - s.start(); + try + { + s.start(); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } } _started = true; } @@ -703,7 +717,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { for (Iterator i = _sessions.values().iterator(); i.hasNext();) { - ((AMQSession) i.next()).stop(); + try + { + ((AMQSession) i.next()).stop(); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } } _started = false; } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index c1e5c8b555..81f5bbfcc2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -100,6 +100,9 @@ import org.apache.qpid.framing.TxRollbackBody; import org.apache.qpid.framing.TxRollbackOkBody; import org.apache.qpid.framing.QueueBindOkBody; import org.apache.qpid.framing.QueueDeclareOkBody; +import org.apache.qpid.framing.ChannelFlowOkBody; +import org.apache.qpid.framing.BasicRecoverOkBody; +import org.apache.qpid.framing.BasicRejectBody; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; @@ -191,6 +194,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private boolean _hasMessageListeners; + private boolean _suspended; + + private final Object _suspensionLock = new Object(); + + /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ private class Dispatcher extends Thread @@ -285,8 +293,50 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi //fixme awaitTermination } - } + public void rollback() + { + + synchronized (_lock) + { + boolean isStopped = connectionStopped(); + + if (!isStopped) + { + setConnectionStopped(true); + } + + rejectAllMessages(true); + + _logger.debug("Session Pre Dispatch Queue cleared"); + + for (BasicMessageConsumer consumer : _consumers.values()) + { + consumer.rollback(); + } + + setConnectionStopped(isStopped); + } + + } + + public void rejectPending(AMQShortString consumerTag) + { + synchronized (_lock) + { + boolean stopped = connectionStopped(); + + _dispatcher.setConnectionStopped(false); + + rejectMessagesForConsumerTag(consumerTag, true); + + if (stopped) + { + _dispatcher.setConnectionStopped(stopped); + } + } + } + } AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry) @@ -328,7 +378,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_acknowledgeMode == NO_ACKNOWLEDGE) { _logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue); - suspendChannel(); + new Thread(new SuspenderRunner(true)).start(); } } @@ -337,7 +387,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_acknowledgeMode == NO_ACKNOWLEDGE) { _logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue); - unsuspendChannel(); + new Thread(new SuspenderRunner(false)).start(); } } }); @@ -480,16 +530,39 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void rollback() throws JMSException { - checkTransacted(); - try - { - // TODO: Be aware of possible changes to parameter order as versions change. - getProtocolHandler().syncWrite( - TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); - } - catch (AMQException e) + synchronized (_suspensionLock) { - throw(JMSException) (new JMSException("Failed to rollback: " + e).initCause(e)); + checkTransacted(); + try + { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + + boolean isSuspended = isSuspended(); + + if (!isSuspended) + { +// suspendChannel(true); + } + + _connection.getProtocolHandler().syncWrite( + TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); + + if (_dispatcher != null) + { + _dispatcher.rollback(); + } + + if (!isSuspended) + { +// suspendChannel(false); + } + } + catch (AMQException e) + { + throw(JMSException) (new JMSException("Failed to rollback: " + e).initCause(e)); + } } } @@ -597,6 +670,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } + + public boolean isSuspended() + { + return _suspended; + } + + /** * Called when the server initiates the closure of the session unilaterally. * @@ -737,14 +817,45 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi checkNotTransacted(); // throws IllegalStateException if a transacted session // this is set only here, and the before the consumer's onMessage is called it is set to false _inRecovery = true; - for (BasicMessageConsumer consumer : _consumers.values()) + try { - consumer.clearUnackedMessages(); + + boolean isSuspended = isSuspended(); + +// if (!isSuspended) +// { +// suspendChannel(true); +// } + + for (BasicMessageConsumer consumer : _consumers.values()) + { + consumer.clearUnackedMessages(); + } + + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), + getProtocolMinorVersion(), + false, // nowait + false) // requeue + , BasicRecoverOkBody.class); + +// if (_dispatcher != null) +// { +// _dispatcher.rollback(); +// } +// +// if (!isSuspended) +// { +// suspendChannel(false); +// } + } + catch (AMQException e) + { + throw new JMSAMQException(e); } - // TODO: Be aware of possible changes to parameter order as versions change. - getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - false)); // requeue } boolean isInRecovery() @@ -1057,7 +1168,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } catch (AMQInvalidRoutingKeyException e) { - JMSException ide = new InvalidDestinationException("Invalid routing key:"+amqd.getRoutingKey().toString()); + JMSException ide = new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString()); ide.setLinkedException(e); throw ide; } @@ -1731,7 +1842,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return _channelId; } - void start() + void start() throws AMQException { //fixme This should be controlled by _stopped as it pairs with the stop method //fixme or check the FlowControlledBlockingQueue _queue to see if we have flow controlled. @@ -1740,7 +1851,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_startedAtLeastOnce.getAndSet(true)) { //then we stopped this and are restarting, so signal server to resume delivery - unsuspendChannel(); + suspendChannel(false); } if (hasMessageListeners()) @@ -1773,10 +1884,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - void stop() + void stop() throws AMQException { //stop the server delivering messages to this session - suspendChannel(); + suspendChannel(true); if (_dispatcher != null) { @@ -1837,6 +1948,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _destinationConsumerCount.remove(dest); } } + + //ensure we remove the messages from the consumer even if the dispatcher hasn't started + if (_dispatcher == null) + { + rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); + } } } @@ -1889,35 +2006,54 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - private void suspendChannel() + private void suspendChannel(boolean suspend) throws AMQException { - _logger.warn("Suspending channel"); - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - false); // active - getProtocolHandler().writeFrame(channelFlowFrame); - } + synchronized (_suspensionLock) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); + } - private void unsuspendChannel() - { - _logger.warn("Unsuspending channel"); - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - true); // active - getProtocolHandler().writeFrame(channelFlowFrame); + _suspended = suspend; + + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), + getProtocolMinorVersion(), + !suspend); // active + + _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class); + } } + public void confirmConsumerCancelled(AMQShortString consumerTag) { BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); - if ((consumer != null) && (consumer.isAutoClose())) + if (consumer != null) { - consumer.closeWhenNoMessages(true); + if (consumer.isAutoClose()) + { + consumer.closeWhenNoMessages(true); + } + else + { + consumer.rollback(); + } } - } + //Flush any pending messages for this consumerTag + if (_dispatcher != null) + { + _dispatcher.rejectPending(consumerTag); + } + else + { + rejectMessagesForConsumerTag(consumerTag, true); + } + } /* * I could have combined the last 3 methods, but this way it improves readability @@ -2008,5 +2144,75 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } + private class SuspenderRunner implements Runnable + { + private boolean _suspend; + + public SuspenderRunner(boolean suspend) + { + _suspend = suspend; + } + + public void run() + { + try + { + suspendChannel(_suspend); + } + catch (AMQException e) + { + _logger.warn("Unable to suspend channel"); + } + } + } + + + private void rejectAllMessages(boolean requeue) + { + rejectMessagesForConsumerTag(null, requeue); + } + + /** @param consumerTag The consumerTag to prune from queue or all if null + * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) + */ + + private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue) + { + Iterator messages = _queue.iterator(); + + while (messages.hasNext()) + { + UnprocessedMessage message = (UnprocessedMessage) messages.next(); + + if (consumerTag == null || message.getDeliverBody().consumerTag.equals(consumerTag)) + { + if (_logger.isTraceEnabled()) + { + _logger.trace("Removing message from _queue:" + message); + } + + messages.remove(); + +// rejectMessage(message.getDeliverBody().deliveryTag, requeue); + + _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag); + } + else + { + _logger.error("Pruned pending message for consumer:" + consumerTag); + } + } + } + + public void rejectMessage(long deliveryTag, boolean requeue) + { + AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), + getProtocolMinorVersion(), + deliveryTag, + requeue); + + _connection.getProtocolHandler().writeFrame(basicRejectBody); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 33fd64f368..496e377435 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -49,9 +49,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { private static final Logger _logger = Logger.getLogger(BasicMessageConsumer.class); - /** - * The connection being used by this consumer - */ + /** The connection being used by this consumer */ private AMQConnection _connection; private String _messageSelector; @@ -60,33 +58,20 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private AMQDestination _destination; - /** - * When true indicates that a blocking receive call is in progress - */ + /** When true indicates that a blocking receive call is in progress */ private final AtomicBoolean _receiving = new AtomicBoolean(false); - /** - * Holds an atomic reference to the listener installed. - */ + /** Holds an atomic reference to the listener installed. */ private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>(); - /** - * The consumer tag allows us to close the consumer by sending a jmsCancel method to the - * broker - */ + /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */ private AMQShortString _consumerTag; - /** - * We need to know the channel id when constructing frames - */ + /** We need to know the channel id when constructing frames */ private int _channelId; /** - * Used in the blocking receive methods to receive a message from - * the Session thread. - * <p/> - * Or to notify of errors - * <p/> - * Argument true indicates we want strict FIFO semantics + * Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors + * <p/> Argument true indicates we want strict FIFO semantics */ private final ArrayBlockingQueue _synchronousQueue; @@ -96,55 +81,48 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private AMQProtocolHandler _protocolHandler; - /** - * We need to store the "raw" field table so that we can resubscribe in the event of failover being required - */ + /** We need to store the "raw" field table so that we can resubscribe in the event of failover being required */ private FieldTable _rawSelectorFieldTable; /** - * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of failover + * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of + * failover */ private int _prefetchHigh; /** - * We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of failover + * We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of + * failover */ private int _prefetchLow; - /** - * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover - */ + /** We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover */ private boolean _exclusive; /** - * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes - * per consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our + * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per + * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our * implementation. */ private int _acknowledgeMode; - /** - * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode - */ + /** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */ private int _outstanding; - /** - * Tag of last message delievered, whoch should be acknowledged on commit in - * transaction mode. - */ + /** Tag of last message delievered, whoch should be acknowledged on commit in transaction mode. */ private long _lastDeliveryTag; /** - * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. - * Enabled when _outstannding number of msgs >= _prefetchHigh and disabled at < _prefetchLow + * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding + * number of msgs >= _prefetchHigh and disabled at < _prefetchLow */ private boolean _dups_ok_acknowledge_send; private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>(); /** - * The thread that was used to call receive(). This is important for being able to interrupt that thread if - * a receive() is in progress. + * The thread that was used to call receive(). This is important for being able to interrupt that thread if a + * receive() is in progress. */ private Thread _receivingThread; @@ -417,13 +395,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } /** - * We can get back either a Message or an exception from the queue. This method examines the argument and deals - * with it by throwing it (if an exception) or returning it (in any other case). + * We can get back either a Message or an exception from the queue. This method examines the argument and deals with + * it by throwing it (if an exception) or returning it (in any other case). * * @param o + * * @return a message only if o is a Message - * @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not - * a JMSException is created with the linked exception set appropriately + * + * @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not a + * JMSException is created with the linked exception set appropriately */ private AbstractJMSMessage returnMessageOrThrow(Object o) throws JMSException @@ -488,9 +468,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } /** - * Called when you need to invalidate a consumer. Used for example when failover has occurred and the - * client has vetoed automatic resubscription. - * The caller must hold the failover mutex. + * Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has + * vetoed automatic resubscription. The caller must hold the failover mutex. */ void markClosed() { @@ -499,8 +478,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } /** - * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case - * of a message listener or a synchronous receive() caller. + * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case of a + * message listener or a synchronous receive() caller. * * @param messageFrame the raw unprocessed mesage * @param channelId channel on which this message was sent @@ -643,9 +622,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - /** - * Acknowledge up to last message delivered (if any). Used when commiting. - */ + /** Acknowledge up to last message delivered (if any). Used when commiting. */ void acknowledgeLastDelivered() { if (_lastDeliveryTag > 0) @@ -676,8 +653,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** - * Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean - * case and in the case of an error occurring. + * Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean case and in + * the case of an error occurring. */ private void deregisterConsumer() { @@ -728,9 +705,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - /** - * Called on recovery to reset the list of delivery tags - */ + /** Called on recovery to reset the list of delivery tags */ public void clearUnackedMessages() { _unacknowledgedDeliveryTags.clear(); @@ -760,4 +735,42 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } + + public void rollback() + { + + if (_synchronousQueue.size() > 0) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting the messages for consumer with tag:" + _consumerTag); + } + for (Object o : _synchronousQueue) + { + if (o instanceof AbstractJMSMessage) + { +// _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true); + + if (_logger.isTraceEnabled()) + { + _logger.trace("Rejected message" + o); + } + + } + else + { + _logger.error("Queue contained a :" + o.getClass() + + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); + } + } + + if (_synchronousQueue.size() != 0) + { + _logger.warn("Queue was not empty after rejecting all messages"); + } + + _synchronousQueue.clear(); + } + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java index 7cadbd409a..e2b101ab79 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java @@ -64,7 +64,10 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener protocolSession.writeFrame(frame); if (errorCode != AMQConstant.REPLY_SUCCESS) { - _logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason); + if (_logger.isDebugEnabled()) + { + _logger.debug("Channel close received with errorCode " + errorCode + ", and reason " + reason); + } if (errorCode == AMQConstant.NO_CONSUMERS) { throw new AMQNoConsumersException("Error: " + reason, null); diff --git a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index 5b7144b524..03e7d399ce 100644 --- a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -20,23 +20,23 @@ */ package org.apache.qpid.client.util; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.log4j.Logger; + import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.Iterator; /** - * A blocking queue that emits events above a user specified threshold allowing - * the caller to take action (e.g. flow control) to try to prevent the queue - * growing (much) further. The underlying queue itself is not bounded therefore - * the caller is not obliged to react to the events. - * <p/> - * This implementation is <b>only</b> safe where we have a single thread adding - * items and a single (different) thread removing items. + * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow + * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the + * caller is not obliged to react to the events. <p/> This implementation is <b>only</b> safe where we have a single + * thread adding items and a single (different) thread removing items. */ public class FlowControllingBlockingQueue { - /** - * This queue is bounded and is used to store messages before being dispatched to the consumer - */ + /** This queue is bounded and is used to store messages before being dispatched to the consumer */ private final BlockingQueue _queue = new LinkedBlockingQueue(); private final int _flowControlHighThreshold; @@ -44,12 +44,10 @@ public class FlowControllingBlockingQueue private final ThresholdListener _listener; - /** - * We require a separate count so we can track whether we have reached the - * threshold - */ + /** We require a separate count so we can track whether we have reached the threshold */ private int _count; + public interface ThresholdListener { void aboveThreshold(int currentValue); @@ -74,7 +72,7 @@ public class FlowControllingBlockingQueue Object o = _queue.take(); if (_listener != null) { - synchronized(_listener) + synchronized (_listener) { if (_count-- == _flowControlLowThreshold) { @@ -90,7 +88,7 @@ public class FlowControllingBlockingQueue _queue.add(o); if (_listener != null) { - synchronized(_listener) + synchronized (_listener) { if (++_count == _flowControlHighThreshold) { @@ -99,5 +97,10 @@ public class FlowControllingBlockingQueue } } } + + public Iterator iterator() + { + return _queue.iterator(); + } } |