summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java268
1 files changed, 116 insertions, 152 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 610e0109b1..efbce6033b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -26,11 +26,7 @@ import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.*;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
import org.slf4j.Logger;
@@ -42,6 +38,7 @@ import javax.jms.MessageListener;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -53,13 +50,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
/** The connection being used by this consumer */
- private AMQConnection _connection;
+ private final AMQConnection _connection;
- private String _messageSelector;
+ private final String _messageSelector;
- private boolean _noLocal;
+ private final boolean _noLocal;
- private AMQDestination _destination;
+ private final AMQDestination _destination;
/** When true indicates that a blocking receive call is in progress */
private final AtomicBoolean _receiving = new AtomicBoolean(false);
@@ -70,7 +67,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private AMQShortString _consumerTag;
/** We need to know the channel id when constructing frames */
- private int _channelId;
+ private final int _channelId;
/**
* Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors
@@ -78,45 +75,36 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
private final ArrayBlockingQueue _synchronousQueue;
- private MessageFactoryRegistry _messageFactory;
+ private final MessageFactoryRegistry _messageFactory;
private final AMQSession _session;
- private AMQProtocolHandler _protocolHandler;
+ private final AMQProtocolHandler _protocolHandler;
/** We need to store the "raw" field table so that we can resubscribe in the event of failover being required */
- private FieldTable _rawSelectorFieldTable;
+ private final FieldTable _rawSelectorFieldTable;
/**
* We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of
* failover
*/
- private int _prefetchHigh;
+ private final int _prefetchHigh;
/**
* We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of
* failover
*/
- private int _prefetchLow;
+ private final int _prefetchLow;
/** We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover */
- private boolean _exclusive;
+ private final boolean _exclusive;
/**
* The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per
* consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our
* implementation.
*/
- private int _acknowledgeMode;
-
- /** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */
- private int _outstanding;
-
- /**
- * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding
- * number of msgs >= _prefetchHigh and disabled at < _prefetchLow
- */
- private boolean _dups_ok_acknowledge_send;
+ private final int _acknowledgeMode;
private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>();
@@ -133,10 +121,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
* autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
* on the queue. This is used for queue browsing.
*/
- private boolean _autoClose;
- private boolean _closeWhenNoMessages;
+ private final boolean _autoClose;
- private boolean _noConsume;
+ private final boolean _noConsume;
private List<StackTraceElement> _closedStack = null;
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
@@ -156,7 +143,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_prefetchHigh = prefetchHigh;
_prefetchLow = prefetchLow;
_exclusive = exclusive;
- _acknowledgeMode = acknowledgeMode;
+
_synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true);
_autoClose = autoClose;
_noConsume = noConsume;
@@ -166,6 +153,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_acknowledgeMode = Session.NO_ACKNOWLEDGE;
}
+ else
+ {
+ _acknowledgeMode = acknowledgeMode;
+ }
}
public AMQDestination getDestination()
@@ -253,10 +244,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
switch (_acknowledgeMode)
{
- case Session.DUPS_OK_ACKNOWLEDGE:
- _logger.info("Recording tag for acking on close:" + msg.getDeliveryTag());
- _receivedDeliveryTags.add(msg.getDeliveryTag());
- break;
case Session.CLIENT_ACKNOWLEDGE:
_unacknowledgedDeliveryTags.add(msg.getDeliveryTag());
@@ -269,7 +256,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
else
{
- _logger.info("Recording tag for commit:" + msg.getDeliveryTag());
_receivedDeliveryTags.add(msg.getDeliveryTag());
}
@@ -284,8 +270,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*
* @return boolean if the acquisition was successful
*
- * @throws JMSException
- * @throws InterruptedException
+ * @throws JMSException if a listener has already been set or another thread is receiving
+ * @throws InterruptedException if interrupted
*/
private boolean acquireReceiving(boolean immediate) throws JMSException, InterruptedException
{
@@ -372,7 +358,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
catch (InterruptedException e)
{
- _logger.warn("Interrupted: " + e);
+ _logger.warn("Interrupted acquire: " + e);
if (isClosed())
{
return null;
@@ -383,11 +369,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
- if (closeOnAutoClose())
- {
- return null;
- }
-
Object o = null;
if (l > 0)
{
@@ -400,7 +381,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
catch (InterruptedException e)
{
- _logger.warn("Interrupted: " + e);
+ _logger.warn("Interrupted poll: " + e);
if (isClosed())
{
return null;
@@ -418,7 +399,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
catch (InterruptedException e)
{
- _logger.warn("Interrupted: " + e);
+ _logger.warn("Interrupted take: " + e);
if (isClosed())
{
return null;
@@ -440,20 +421,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- private boolean closeOnAutoClose() throws JMSException
- {
- if (isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty())
- {
- close(false);
-
- return true;
- }
- else
- {
- return false;
- }
- }
-
public Message receiveNoWait() throws JMSException
{
checkPreConditions();
@@ -482,11 +449,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
- if (closeOnAutoClose())
- {
- return null;
- }
-
Object o = _synchronousQueue.poll();
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
@@ -507,7 +469,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
* We can get back either a Message or an exception from the queue. This method examines the argument and deals with
* it by throwing it (if an exception) or returning it (in any other case).
*
- * @param o
+ * @param o the object to return or throw
*
* @return a message only if o is a Message
*
@@ -527,6 +489,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
throw e;
}
+ else if (o instanceof UnprocessedMessage.CloseConsumerMessage)
+ {
+ _closed.set(true);
+ deregisterConsumer();
+ return null;
+ }
else
{
return (AbstractJMSMessage) o;
@@ -540,31 +508,30 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public void close(boolean sendClose) throws JMSException
{
- // synchronized (_closed)
-
if (_logger.isInfoEnabled())
{
_logger.info("Closing consumer:" + debugIdentity());
}
- synchronized (_connection.getFailoverMutex())
+ if (!_closed.getAndSet(true))
{
- if (!_closed.getAndSet(true))
+ if (_logger.isDebugEnabled())
{
- if (_logger.isTraceEnabled())
+ StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+ if (_closedStack != null)
{
- StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
- if (_closedStack != null)
- {
- _logger.trace(_consumerTag + " previously:" + _closedStack.toString());
- }
- else
- {
- _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
- }
+ _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
+ }
+ else
+ {
+ _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
}
+ }
- if (sendClose)
+ if (sendClose)
+ {
+ // The Synchronized block only needs to protect network traffic.
+ synchronized (_connection.getFailoverMutex())
{
BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(_consumerTag, false);
@@ -578,7 +545,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_logger.debug("CancelOk'd for consumer:" + debugIdentity());
}
-
}
catch (AMQException e)
{
@@ -589,24 +555,26 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
}
}
- else
- {
- // //fixme this probably is not right
- // if (!isNoConsume())
- { // done in BasicCancelOK Handler but not sending one so just deregister.
- deregisterConsumer();
- }
+ }
+ else
+ {
+ // //fixme this probably is not right
+ // if (!isNoConsume())
+ { // done in BasicCancelOK Handler but not sending one so just deregister.
+ deregisterConsumer();
}
+ }
- if ((_messageListener != null) && _receiving.get())
+ // This will occur if session.close is called closing all consumers we may be blocked waiting for a receive
+ // so we need to let it know it is time to close.
+ if ((_messageListener != null) && _receiving.get())
+ {
+ if (_logger.isInfoEnabled())
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("Interrupting thread: " + _receivingThread);
- }
-
- _receivingThread.interrupt();
+ _logger.info("Interrupting thread: " + _receivingThread);
}
+
+ _receivingThread.interrupt();
}
}
}
@@ -621,14 +589,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_closed.set(true);
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " markClosed():"
+ _logger.debug(_consumerTag + " markClosed():"
+ Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
- _logger.trace(_consumerTag + " previously:" + _closedStack.toString());
+ _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
}
else
{
@@ -645,10 +613,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
* message listener or a synchronous receive() caller.
*
* @param messageFrame the raw unprocessed mesage
- * @param channelId channel on which this message was sent
*/
- void notifyMessage(UnprocessedMessage messageFrame, int channelId)
+ void notifyMessage(UnprocessedMessage messageFrame)
{
+ if (messageFrame instanceof UnprocessedMessage.CloseConsumerMessage)
+ {
+ notifyCloseMessage((UnprocessedMessage.CloseConsumerMessage) messageFrame);
+ return;
+ }
+
final boolean debug = _logger.isDebugEnabled();
if (debug)
@@ -658,10 +631,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
+ final BasicDeliverBody deliverBody = messageFrame.getDeliverBody();
+
AbstractJMSMessage jmsMessage =
- _messageFactory.createMessage(messageFrame.getDeliverBody().getDeliveryTag(),
- messageFrame.getDeliverBody().getRedelivered(), messageFrame.getDeliverBody().getExchange(),
- messageFrame.getDeliverBody().getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+ _messageFactory.createMessage(deliverBody.getDeliveryTag(),
+ deliverBody.getRedelivered(),
+ deliverBody.getExchange(),
+ deliverBody.getRoutingKey(),
+ messageFrame.getContentHeader(),
+ messageFrame.getBodies());
if (debug)
{
@@ -673,11 +651,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
// if (!_closed.get())
{
- jmsMessage.setConsumer(this);
-
preDeliver(jmsMessage);
- notifyMessage(jmsMessage, channelId);
+ notifyMessage(jmsMessage);
}
// else
// {
@@ -700,11 +676,33 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- /**
- * @param jmsMessage this message has already been processed so can't redo preDeliver
- * @param channelId
- */
- public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId)
+ /** @param closeMessage this message signals that we should close the browser */
+ public void notifyCloseMessage(UnprocessedMessage.CloseConsumerMessage closeMessage)
+ {
+ if (isMessageListenerSet())
+ {
+ // Currently only possible to get this msg type with a browser.
+ // If we get the message here then we should probably just close this consumer.
+ // Though an AutoClose consumer with message listener is quite odd...
+ // Just log out the fact so we know where we are
+ _logger.warn("Using an AutoCloseconsumer with message listener is not supported.");
+ }
+ else
+ {
+ try
+ {
+ _synchronousQueue.put(closeMessage);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info(" SynchronousQueue.put interupted. Usually result of connection closing," +
+ "but we shouldn't have close yet");
+ }
+ }
+ }
+
+ /** @param jmsMessage this message has already been processed so can't redo preDeliver */
+ public void notifyMessage(AbstractJMSMessage jmsMessage)
{
try
{
@@ -773,28 +771,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
break;
case Session.DUPS_OK_ACKNOWLEDGE:
- /*( if (++_outstanding >= _prefetchHigh)
- {
- _dups_ok_acknowledge_send = true;
- }
-
- //Can't use <= as _prefetchHigh may equal _prefetchLow so no acking would occur.
- if (_outstanding < _prefetchLow)
- {
- _dups_ok_acknowledge_send = false;
- }
-
- if (_dups_ok_acknowledge_send)
- {
- if (!_session.isInRecovery())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), true);
- _outstanding = 0;
- }
- }
-
- break;
- */
case Session.AUTO_ACKNOWLEDGE:
// we do not auto ack a message if the application code called recover()
if (!_session.isInRecovery())
@@ -845,14 +821,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
// synchronized (_closed)
{
_closed.set(true);
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " notifyError():"
+ _logger.debug(_consumerTag + " notifyError():"
+ Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
- _logger.trace(_consumerTag + " previously" + _closedStack.toString());
+ _logger.debug(_consumerTag + " previously" + _closedStack.toString());
}
else
{
@@ -948,18 +924,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
return _noConsume;
}
- public void closeWhenNoMessages(boolean b)
- {
- _closeWhenNoMessages = b;
-
- if (_closeWhenNoMessages && _synchronousQueue.isEmpty() && _receiving.get() && (_messageListener != null))
- {
- _closed.set(true);
- _receivingThread.interrupt();
- }
-
- }
-
public void rollback()
{
clearUnackedMessages();
@@ -982,9 +946,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (tag != null)
{
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.trace("Rejecting tag from _receivedDTs:" + tag);
+ _logger.debug("Rejecting tag from _receivedDTs:" + tag);
}
_session.rejectMessage(tag, true);
@@ -1025,9 +989,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_session.rejectMessage(((AbstractJMSMessage) o), true);
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.trace("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag());
+ _logger.debug("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag());
}
iterator.remove();