diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 268 |
1 files changed, 116 insertions, 152 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 610e0109b1..efbce6033b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -26,11 +26,7 @@ import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicCancelBody; -import org.apache.qpid.framing.BasicCancelOkBody; -import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.*; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; import org.slf4j.Logger; @@ -42,6 +38,7 @@ import javax.jms.MessageListener; import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -53,13 +50,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class); /** The connection being used by this consumer */ - private AMQConnection _connection; + private final AMQConnection _connection; - private String _messageSelector; + private final String _messageSelector; - private boolean _noLocal; + private final boolean _noLocal; - private AMQDestination _destination; + private final AMQDestination _destination; /** When true indicates that a blocking receive call is in progress */ private final AtomicBoolean _receiving = new AtomicBoolean(false); @@ -70,7 +67,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private AMQShortString _consumerTag; /** We need to know the channel id when constructing frames */ - private int _channelId; + private final int _channelId; /** * Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors @@ -78,45 +75,36 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ private final ArrayBlockingQueue _synchronousQueue; - private MessageFactoryRegistry _messageFactory; + private final MessageFactoryRegistry _messageFactory; private final AMQSession _session; - private AMQProtocolHandler _protocolHandler; + private final AMQProtocolHandler _protocolHandler; /** We need to store the "raw" field table so that we can resubscribe in the event of failover being required */ - private FieldTable _rawSelectorFieldTable; + private final FieldTable _rawSelectorFieldTable; /** * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of * failover */ - private int _prefetchHigh; + private final int _prefetchHigh; /** * We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of * failover */ - private int _prefetchLow; + private final int _prefetchLow; /** We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover */ - private boolean _exclusive; + private final boolean _exclusive; /** * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our * implementation. */ - private int _acknowledgeMode; - - /** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */ - private int _outstanding; - - /** - * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding - * number of msgs >= _prefetchHigh and disabled at < _prefetchLow - */ - private boolean _dups_ok_acknowledge_send; + private final int _acknowledgeMode; private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>(); @@ -133,10 +121,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive * on the queue. This is used for queue browsing. */ - private boolean _autoClose; - private boolean _closeWhenNoMessages; + private final boolean _autoClose; - private boolean _noConsume; + private final boolean _noConsume; private List<StackTraceElement> _closedStack = null; protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, @@ -156,7 +143,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _prefetchHigh = prefetchHigh; _prefetchLow = prefetchLow; _exclusive = exclusive; - _acknowledgeMode = acknowledgeMode; + _synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true); _autoClose = autoClose; _noConsume = noConsume; @@ -166,6 +153,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _acknowledgeMode = Session.NO_ACKNOWLEDGE; } + else + { + _acknowledgeMode = acknowledgeMode; + } } public AMQDestination getDestination() @@ -253,10 +244,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer switch (_acknowledgeMode) { - case Session.DUPS_OK_ACKNOWLEDGE: - _logger.info("Recording tag for acking on close:" + msg.getDeliveryTag()); - _receivedDeliveryTags.add(msg.getDeliveryTag()); - break; case Session.CLIENT_ACKNOWLEDGE: _unacknowledgedDeliveryTags.add(msg.getDeliveryTag()); @@ -269,7 +256,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } else { - _logger.info("Recording tag for commit:" + msg.getDeliveryTag()); _receivedDeliveryTags.add(msg.getDeliveryTag()); } @@ -284,8 +270,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer * * @return boolean if the acquisition was successful * - * @throws JMSException - * @throws InterruptedException + * @throws JMSException if a listener has already been set or another thread is receiving + * @throws InterruptedException if interrupted */ private boolean acquireReceiving(boolean immediate) throws JMSException, InterruptedException { @@ -372,7 +358,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } catch (InterruptedException e) { - _logger.warn("Interrupted: " + e); + _logger.warn("Interrupted acquire: " + e); if (isClosed()) { return null; @@ -383,11 +369,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { - if (closeOnAutoClose()) - { - return null; - } - Object o = null; if (l > 0) { @@ -400,7 +381,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } catch (InterruptedException e) { - _logger.warn("Interrupted: " + e); + _logger.warn("Interrupted poll: " + e); if (isClosed()) { return null; @@ -418,7 +399,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } catch (InterruptedException e) { - _logger.warn("Interrupted: " + e); + _logger.warn("Interrupted take: " + e); if (isClosed()) { return null; @@ -440,20 +421,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - private boolean closeOnAutoClose() throws JMSException - { - if (isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty()) - { - close(false); - - return true; - } - else - { - return false; - } - } - public Message receiveNoWait() throws JMSException { checkPreConditions(); @@ -482,11 +449,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { - if (closeOnAutoClose()) - { - return null; - } - Object o = _synchronousQueue.poll(); final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) @@ -507,7 +469,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer * We can get back either a Message or an exception from the queue. This method examines the argument and deals with * it by throwing it (if an exception) or returning it (in any other case). * - * @param o + * @param o the object to return or throw * * @return a message only if o is a Message * @@ -527,6 +489,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer throw e; } + else if (o instanceof UnprocessedMessage.CloseConsumerMessage) + { + _closed.set(true); + deregisterConsumer(); + return null; + } else { return (AbstractJMSMessage) o; @@ -540,31 +508,30 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void close(boolean sendClose) throws JMSException { - // synchronized (_closed) - if (_logger.isInfoEnabled()) { _logger.info("Closing consumer:" + debugIdentity()); } - synchronized (_connection.getFailoverMutex()) + if (!_closed.getAndSet(true)) { - if (!_closed.getAndSet(true)) + if (_logger.isDebugEnabled()) { - if (_logger.isTraceEnabled()) + StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + if (_closedStack != null) { - StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); - if (_closedStack != null) - { - _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); - } - else - { - _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1); - } + _logger.debug(_consumerTag + " previously:" + _closedStack.toString()); + } + else + { + _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1); } + } - if (sendClose) + if (sendClose) + { + // The Synchronized block only needs to protect network traffic. + synchronized (_connection.getFailoverMutex()) { BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(_consumerTag, false); @@ -578,7 +545,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _logger.debug("CancelOk'd for consumer:" + debugIdentity()); } - } catch (AMQException e) { @@ -589,24 +555,26 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer throw new JMSAMQException("FailoverException interrupted basic cancel.", e); } } - else - { - // //fixme this probably is not right - // if (!isNoConsume()) - { // done in BasicCancelOK Handler but not sending one so just deregister. - deregisterConsumer(); - } + } + else + { + // //fixme this probably is not right + // if (!isNoConsume()) + { // done in BasicCancelOK Handler but not sending one so just deregister. + deregisterConsumer(); } + } - if ((_messageListener != null) && _receiving.get()) + // This will occur if session.close is called closing all consumers we may be blocked waiting for a receive + // so we need to let it know it is time to close. + if ((_messageListener != null) && _receiving.get()) + { + if (_logger.isInfoEnabled()) { - if (_logger.isInfoEnabled()) - { - _logger.info("Interrupting thread: " + _receivingThread); - } - - _receivingThread.interrupt(); + _logger.info("Interrupting thread: " + _receivingThread); } + + _receivingThread.interrupt(); } } } @@ -621,14 +589,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _closed.set(true); - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); if (_closedStack != null) { - _logger.trace(_consumerTag + " markClosed():" + _logger.debug(_consumerTag + " markClosed():" + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); - _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); + _logger.debug(_consumerTag + " previously:" + _closedStack.toString()); } else { @@ -645,10 +613,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer * message listener or a synchronous receive() caller. * * @param messageFrame the raw unprocessed mesage - * @param channelId channel on which this message was sent */ - void notifyMessage(UnprocessedMessage messageFrame, int channelId) + void notifyMessage(UnprocessedMessage messageFrame) { + if (messageFrame instanceof UnprocessedMessage.CloseConsumerMessage) + { + notifyCloseMessage((UnprocessedMessage.CloseConsumerMessage) messageFrame); + return; + } + final boolean debug = _logger.isDebugEnabled(); if (debug) @@ -658,10 +631,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { + final BasicDeliverBody deliverBody = messageFrame.getDeliverBody(); + AbstractJMSMessage jmsMessage = - _messageFactory.createMessage(messageFrame.getDeliverBody().getDeliveryTag(), - messageFrame.getDeliverBody().getRedelivered(), messageFrame.getDeliverBody().getExchange(), - messageFrame.getDeliverBody().getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies()); + _messageFactory.createMessage(deliverBody.getDeliveryTag(), + deliverBody.getRedelivered(), + deliverBody.getExchange(), + deliverBody.getRoutingKey(), + messageFrame.getContentHeader(), + messageFrame.getBodies()); if (debug) { @@ -673,11 +651,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer // if (!_closed.get()) { - jmsMessage.setConsumer(this); - preDeliver(jmsMessage); - notifyMessage(jmsMessage, channelId); + notifyMessage(jmsMessage); } // else // { @@ -700,11 +676,33 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - /** - * @param jmsMessage this message has already been processed so can't redo preDeliver - * @param channelId - */ - public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId) + /** @param closeMessage this message signals that we should close the browser */ + public void notifyCloseMessage(UnprocessedMessage.CloseConsumerMessage closeMessage) + { + if (isMessageListenerSet()) + { + // Currently only possible to get this msg type with a browser. + // If we get the message here then we should probably just close this consumer. + // Though an AutoClose consumer with message listener is quite odd... + // Just log out the fact so we know where we are + _logger.warn("Using an AutoCloseconsumer with message listener is not supported."); + } + else + { + try + { + _synchronousQueue.put(closeMessage); + } + catch (InterruptedException e) + { + _logger.info(" SynchronousQueue.put interupted. Usually result of connection closing," + + "but we shouldn't have close yet"); + } + } + } + + /** @param jmsMessage this message has already been processed so can't redo preDeliver */ + public void notifyMessage(AbstractJMSMessage jmsMessage) { try { @@ -773,28 +771,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer break; case Session.DUPS_OK_ACKNOWLEDGE: - /*( if (++_outstanding >= _prefetchHigh) - { - _dups_ok_acknowledge_send = true; - } - - //Can't use <= as _prefetchHigh may equal _prefetchLow so no acking would occur. - if (_outstanding < _prefetchLow) - { - _dups_ok_acknowledge_send = false; - } - - if (_dups_ok_acknowledge_send) - { - if (!_session.isInRecovery()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), true); - _outstanding = 0; - } - } - - break; - */ case Session.AUTO_ACKNOWLEDGE: // we do not auto ack a message if the application code called recover() if (!_session.isInRecovery()) @@ -845,14 +821,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer // synchronized (_closed) { _closed.set(true); - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); if (_closedStack != null) { - _logger.trace(_consumerTag + " notifyError():" + _logger.debug(_consumerTag + " notifyError():" + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); - _logger.trace(_consumerTag + " previously" + _closedStack.toString()); + _logger.debug(_consumerTag + " previously" + _closedStack.toString()); } else { @@ -948,18 +924,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer return _noConsume; } - public void closeWhenNoMessages(boolean b) - { - _closeWhenNoMessages = b; - - if (_closeWhenNoMessages && _synchronousQueue.isEmpty() && _receiving.get() && (_messageListener != null)) - { - _closed.set(true); - _receivingThread.interrupt(); - } - - } - public void rollback() { clearUnackedMessages(); @@ -982,9 +946,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (tag != null) { - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Rejecting tag from _receivedDTs:" + tag); + _logger.debug("Rejecting tag from _receivedDTs:" + tag); } _session.rejectMessage(tag, true); @@ -1025,9 +989,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _session.rejectMessage(((AbstractJMSMessage) o), true); - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag()); + _logger.debug("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag()); } iterator.remove(); |