diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-02-21 15:47:17 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-21 15:47:17 +0000 |
commit | 5679739e02af18e04fb66ce531356c051373646b (patch) | |
tree | 584df7d32b2918954d3d5c315e8534db7d51292c /java/client/src | |
parent | 033902ad87784d29855cb49f03fbe279fd84100b (diff) | |
download | qpid-python-5679739e02af18e04fb66ce531356c051373646b.tar.gz |
QPID-348 Problems related to prefetching of messages
Client caches are now cleared.
Partially commented out code in AMQSession and BasicMessageConsumer pending broker fixes to ensure channel suspension is respected. Tests fail otherwise. Tests pass just now as they are not correct, JIRA raised for fix (QPID-386).
Spec Changes
Added recover-ok method to recover. But to maintain compatibility added a nowait bit to request the response.
Java Changes
AMQConnection added wrapping of AMQExceptions that can be thrown by the waiting suspend calls.
AMQSession Added clean up code for rollback/recover to clean up Session._queue and BMC._syncQueue
BasicMessageConsumer - added rollback method to clean up _syncQueue
ChannelCloseMethodHandler - reduced logging level from error to debug for received methods.
FlowControllingBlockingQueue - added code to return iterator so messages can be purged cleanly.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@510060 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
5 files changed, 368 insertions, 122 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 03a70d7f39..413524b6d8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -532,7 +532,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (_started) { - session.start(); + try + { + session.start(); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } } return session; } @@ -690,7 +697,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect while (it.hasNext()) { final AMQSession s = (AMQSession) ((Map.Entry) it.next()).getValue(); - s.start(); + try + { + s.start(); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } } _started = true; } @@ -703,7 +717,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { for (Iterator i = _sessions.values().iterator(); i.hasNext();) { - ((AMQSession) i.next()).stop(); + try + { + ((AMQSession) i.next()).stop(); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } } _started = false; } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index c1e5c8b555..81f5bbfcc2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -100,6 +100,9 @@ import org.apache.qpid.framing.TxRollbackBody; import org.apache.qpid.framing.TxRollbackOkBody; import org.apache.qpid.framing.QueueBindOkBody; import org.apache.qpid.framing.QueueDeclareOkBody; +import org.apache.qpid.framing.ChannelFlowOkBody; +import org.apache.qpid.framing.BasicRecoverOkBody; +import org.apache.qpid.framing.BasicRejectBody; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; @@ -191,6 +194,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private boolean _hasMessageListeners; + private boolean _suspended; + + private final Object _suspensionLock = new Object(); + + /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ private class Dispatcher extends Thread @@ -285,8 +293,50 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi //fixme awaitTermination } - } + public void rollback() + { + + synchronized (_lock) + { + boolean isStopped = connectionStopped(); + + if (!isStopped) + { + setConnectionStopped(true); + } + + rejectAllMessages(true); + + _logger.debug("Session Pre Dispatch Queue cleared"); + + for (BasicMessageConsumer consumer : _consumers.values()) + { + consumer.rollback(); + } + + setConnectionStopped(isStopped); + } + + } + + public void rejectPending(AMQShortString consumerTag) + { + synchronized (_lock) + { + boolean stopped = connectionStopped(); + + _dispatcher.setConnectionStopped(false); + + rejectMessagesForConsumerTag(consumerTag, true); + + if (stopped) + { + _dispatcher.setConnectionStopped(stopped); + } + } + } + } AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry) @@ -328,7 +378,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_acknowledgeMode == NO_ACKNOWLEDGE) { _logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue); - suspendChannel(); + new Thread(new SuspenderRunner(true)).start(); } } @@ -337,7 +387,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_acknowledgeMode == NO_ACKNOWLEDGE) { _logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue); - unsuspendChannel(); + new Thread(new SuspenderRunner(false)).start(); } } }); @@ -480,16 +530,39 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void rollback() throws JMSException { - checkTransacted(); - try - { - // TODO: Be aware of possible changes to parameter order as versions change. - getProtocolHandler().syncWrite( - TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); - } - catch (AMQException e) + synchronized (_suspensionLock) { - throw(JMSException) (new JMSException("Failed to rollback: " + e).initCause(e)); + checkTransacted(); + try + { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + + boolean isSuspended = isSuspended(); + + if (!isSuspended) + { +// suspendChannel(true); + } + + _connection.getProtocolHandler().syncWrite( + TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); + + if (_dispatcher != null) + { + _dispatcher.rollback(); + } + + if (!isSuspended) + { +// suspendChannel(false); + } + } + catch (AMQException e) + { + throw(JMSException) (new JMSException("Failed to rollback: " + e).initCause(e)); + } } } @@ -597,6 +670,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } + + public boolean isSuspended() + { + return _suspended; + } + + /** * Called when the server initiates the closure of the session unilaterally. * @@ -737,14 +817,45 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi checkNotTransacted(); // throws IllegalStateException if a transacted session // this is set only here, and the before the consumer's onMessage is called it is set to false _inRecovery = true; - for (BasicMessageConsumer consumer : _consumers.values()) + try { - consumer.clearUnackedMessages(); + + boolean isSuspended = isSuspended(); + +// if (!isSuspended) +// { +// suspendChannel(true); +// } + + for (BasicMessageConsumer consumer : _consumers.values()) + { + consumer.clearUnackedMessages(); + } + + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), + getProtocolMinorVersion(), + false, // nowait + false) // requeue + , BasicRecoverOkBody.class); + +// if (_dispatcher != null) +// { +// _dispatcher.rollback(); +// } +// +// if (!isSuspended) +// { +// suspendChannel(false); +// } + } + catch (AMQException e) + { + throw new JMSAMQException(e); } - // TODO: Be aware of possible changes to parameter order as versions change. - getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - false)); // requeue } boolean isInRecovery() @@ -1057,7 +1168,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } catch (AMQInvalidRoutingKeyException e) { - JMSException ide = new InvalidDestinationException("Invalid routing key:"+amqd.getRoutingKey().toString()); + JMSException ide = new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString()); ide.setLinkedException(e); throw ide; } @@ -1731,7 +1842,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return _channelId; } - void start() + void start() throws AMQException { //fixme This should be controlled by _stopped as it pairs with the stop method //fixme or check the FlowControlledBlockingQueue _queue to see if we have flow controlled. @@ -1740,7 +1851,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_startedAtLeastOnce.getAndSet(true)) { //then we stopped this and are restarting, so signal server to resume delivery - unsuspendChannel(); + suspendChannel(false); } if (hasMessageListeners()) @@ -1773,10 +1884,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - void stop() + void stop() throws AMQException { //stop the server delivering messages to this session - suspendChannel(); + suspendChannel(true); if (_dispatcher != null) { @@ -1837,6 +1948,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _destinationConsumerCount.remove(dest); } } + + //ensure we remove the messages from the consumer even if the dispatcher hasn't started + if (_dispatcher == null) + { + rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); + } } } @@ -1889,35 +2006,54 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - private void suspendChannel() + private void suspendChannel(boolean suspend) throws AMQException { - _logger.warn("Suspending channel"); - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - false); // active - getProtocolHandler().writeFrame(channelFlowFrame); - } + synchronized (_suspensionLock) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); + } - private void unsuspendChannel() - { - _logger.warn("Unsuspending channel"); - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - true); // active - getProtocolHandler().writeFrame(channelFlowFrame); + _suspended = suspend; + + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), + getProtocolMinorVersion(), + !suspend); // active + + _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class); + } } + public void confirmConsumerCancelled(AMQShortString consumerTag) { BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); - if ((consumer != null) && (consumer.isAutoClose())) + if (consumer != null) { - consumer.closeWhenNoMessages(true); + if (consumer.isAutoClose()) + { + consumer.closeWhenNoMessages(true); + } + else + { + consumer.rollback(); + } } - } + //Flush any pending messages for this consumerTag + if (_dispatcher != null) + { + _dispatcher.rejectPending(consumerTag); + } + else + { + rejectMessagesForConsumerTag(consumerTag, true); + } + } /* * I could have combined the last 3 methods, but this way it improves readability @@ -2008,5 +2144,75 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } + private class SuspenderRunner implements Runnable + { + private boolean _suspend; + + public SuspenderRunner(boolean suspend) + { + _suspend = suspend; + } + + public void run() + { + try + { + suspendChannel(_suspend); + } + catch (AMQException e) + { + _logger.warn("Unable to suspend channel"); + } + } + } + + + private void rejectAllMessages(boolean requeue) + { + rejectMessagesForConsumerTag(null, requeue); + } + + /** @param consumerTag The consumerTag to prune from queue or all if null + * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) + */ + + private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue) + { + Iterator messages = _queue.iterator(); + + while (messages.hasNext()) + { + UnprocessedMessage message = (UnprocessedMessage) messages.next(); + + if (consumerTag == null || message.getDeliverBody().consumerTag.equals(consumerTag)) + { + if (_logger.isTraceEnabled()) + { + _logger.trace("Removing message from _queue:" + message); + } + + messages.remove(); + +// rejectMessage(message.getDeliverBody().deliveryTag, requeue); + + _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag); + } + else + { + _logger.error("Pruned pending message for consumer:" + consumerTag); + } + } + } + + public void rejectMessage(long deliveryTag, boolean requeue) + { + AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), + getProtocolMinorVersion(), + deliveryTag, + requeue); + + _connection.getProtocolHandler().writeFrame(basicRejectBody); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 33fd64f368..496e377435 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -49,9 +49,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { private static final Logger _logger = Logger.getLogger(BasicMessageConsumer.class); - /** - * The connection being used by this consumer - */ + /** The connection being used by this consumer */ private AMQConnection _connection; private String _messageSelector; @@ -60,33 +58,20 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private AMQDestination _destination; - /** - * When true indicates that a blocking receive call is in progress - */ + /** When true indicates that a blocking receive call is in progress */ private final AtomicBoolean _receiving = new AtomicBoolean(false); - /** - * Holds an atomic reference to the listener installed. - */ + /** Holds an atomic reference to the listener installed. */ private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>(); - /** - * The consumer tag allows us to close the consumer by sending a jmsCancel method to the - * broker - */ + /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */ private AMQShortString _consumerTag; - /** - * We need to know the channel id when constructing frames - */ + /** We need to know the channel id when constructing frames */ private int _channelId; /** - * Used in the blocking receive methods to receive a message from - * the Session thread. - * <p/> - * Or to notify of errors - * <p/> - * Argument true indicates we want strict FIFO semantics + * Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors + * <p/> Argument true indicates we want strict FIFO semantics */ private final ArrayBlockingQueue _synchronousQueue; @@ -96,55 +81,48 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private AMQProtocolHandler _protocolHandler; - /** - * We need to store the "raw" field table so that we can resubscribe in the event of failover being required - */ + /** We need to store the "raw" field table so that we can resubscribe in the event of failover being required */ private FieldTable _rawSelectorFieldTable; /** - * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of failover + * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of + * failover */ private int _prefetchHigh; /** - * We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of failover + * We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of + * failover */ private int _prefetchLow; - /** - * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover - */ + /** We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover */ private boolean _exclusive; /** - * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes - * per consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our + * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per + * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our * implementation. */ private int _acknowledgeMode; - /** - * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode - */ + /** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */ private int _outstanding; - /** - * Tag of last message delievered, whoch should be acknowledged on commit in - * transaction mode. - */ + /** Tag of last message delievered, whoch should be acknowledged on commit in transaction mode. */ private long _lastDeliveryTag; /** - * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. - * Enabled when _outstannding number of msgs >= _prefetchHigh and disabled at < _prefetchLow + * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding + * number of msgs >= _prefetchHigh and disabled at < _prefetchLow */ private boolean _dups_ok_acknowledge_send; private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>(); /** - * The thread that was used to call receive(). This is important for being able to interrupt that thread if - * a receive() is in progress. + * The thread that was used to call receive(). This is important for being able to interrupt that thread if a + * receive() is in progress. */ private Thread _receivingThread; @@ -417,13 +395,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } /** - * We can get back either a Message or an exception from the queue. This method examines the argument and deals - * with it by throwing it (if an exception) or returning it (in any other case). + * We can get back either a Message or an exception from the queue. This method examines the argument and deals with + * it by throwing it (if an exception) or returning it (in any other case). * * @param o + * * @return a message only if o is a Message - * @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not - * a JMSException is created with the linked exception set appropriately + * + * @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not a + * JMSException is created with the linked exception set appropriately */ private AbstractJMSMessage returnMessageOrThrow(Object o) throws JMSException @@ -488,9 +468,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } /** - * Called when you need to invalidate a consumer. Used for example when failover has occurred and the - * client has vetoed automatic resubscription. - * The caller must hold the failover mutex. + * Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has + * vetoed automatic resubscription. The caller must hold the failover mutex. */ void markClosed() { @@ -499,8 +478,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } /** - * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case - * of a message listener or a synchronous receive() caller. + * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case of a + * message listener or a synchronous receive() caller. * * @param messageFrame the raw unprocessed mesage * @param channelId channel on which this message was sent @@ -643,9 +622,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - /** - * Acknowledge up to last message delivered (if any). Used when commiting. - */ + /** Acknowledge up to last message delivered (if any). Used when commiting. */ void acknowledgeLastDelivered() { if (_lastDeliveryTag > 0) @@ -676,8 +653,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** - * Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean - * case and in the case of an error occurring. + * Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean case and in + * the case of an error occurring. */ private void deregisterConsumer() { @@ -728,9 +705,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - /** - * Called on recovery to reset the list of delivery tags - */ + /** Called on recovery to reset the list of delivery tags */ public void clearUnackedMessages() { _unacknowledgedDeliveryTags.clear(); @@ -760,4 +735,42 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } + + public void rollback() + { + + if (_synchronousQueue.size() > 0) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting the messages for consumer with tag:" + _consumerTag); + } + for (Object o : _synchronousQueue) + { + if (o instanceof AbstractJMSMessage) + { +// _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true); + + if (_logger.isTraceEnabled()) + { + _logger.trace("Rejected message" + o); + } + + } + else + { + _logger.error("Queue contained a :" + o.getClass() + + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); + } + } + + if (_synchronousQueue.size() != 0) + { + _logger.warn("Queue was not empty after rejecting all messages"); + } + + _synchronousQueue.clear(); + } + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java index 7cadbd409a..e2b101ab79 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java @@ -64,7 +64,10 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener protocolSession.writeFrame(frame); if (errorCode != AMQConstant.REPLY_SUCCESS) { - _logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason); + if (_logger.isDebugEnabled()) + { + _logger.debug("Channel close received with errorCode " + errorCode + ", and reason " + reason); + } if (errorCode == AMQConstant.NO_CONSUMERS) { throw new AMQNoConsumersException("Error: " + reason, null); diff --git a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index 5b7144b524..03e7d399ce 100644 --- a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -20,23 +20,23 @@ */ package org.apache.qpid.client.util; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.log4j.Logger; + import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.Iterator; /** - * A blocking queue that emits events above a user specified threshold allowing - * the caller to take action (e.g. flow control) to try to prevent the queue - * growing (much) further. The underlying queue itself is not bounded therefore - * the caller is not obliged to react to the events. - * <p/> - * This implementation is <b>only</b> safe where we have a single thread adding - * items and a single (different) thread removing items. + * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow + * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the + * caller is not obliged to react to the events. <p/> This implementation is <b>only</b> safe where we have a single + * thread adding items and a single (different) thread removing items. */ public class FlowControllingBlockingQueue { - /** - * This queue is bounded and is used to store messages before being dispatched to the consumer - */ + /** This queue is bounded and is used to store messages before being dispatched to the consumer */ private final BlockingQueue _queue = new LinkedBlockingQueue(); private final int _flowControlHighThreshold; @@ -44,12 +44,10 @@ public class FlowControllingBlockingQueue private final ThresholdListener _listener; - /** - * We require a separate count so we can track whether we have reached the - * threshold - */ + /** We require a separate count so we can track whether we have reached the threshold */ private int _count; + public interface ThresholdListener { void aboveThreshold(int currentValue); @@ -74,7 +72,7 @@ public class FlowControllingBlockingQueue Object o = _queue.take(); if (_listener != null) { - synchronized(_listener) + synchronized (_listener) { if (_count-- == _flowControlLowThreshold) { @@ -90,7 +88,7 @@ public class FlowControllingBlockingQueue _queue.add(o); if (_listener != null) { - synchronized(_listener) + synchronized (_listener) { if (++_count == _flowControlHighThreshold) { @@ -99,5 +97,10 @@ public class FlowControllingBlockingQueue } } } + + public Iterator iterator() + { + return _queue.iterator(); + } } |