summaryrefslogtreecommitdiff
path: root/java/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java')
-rw-r--r--java/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java535
1 files changed, 535 insertions, 0 deletions
diff --git a/java/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java b/java/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
new file mode 100644
index 0000000000..f97ea6bf1e
--- /dev/null
+++ b/java/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
@@ -0,0 +1,535 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.jms.MessageConsumer;
+import org.apache.qpid.jms.Session;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class BasicMessageConsumer extends Closeable implements MessageConsumer
+{
+ private static final Logger _logger = Logger.getLogger(BasicMessageConsumer.class);
+
+ /**
+ * The connection being used by this consumer
+ */
+ private AMQConnection _connection;
+
+ private String _messageSelector;
+
+ private boolean _noLocal;
+
+ private AMQDestination _destination;
+
+ /**
+ * When true indicates that a blocking receive call is in progress
+ */
+ private final AtomicBoolean _receiving = new AtomicBoolean(false);
+ /**
+ * Holds an atomic reference to the listener installed.
+ */
+ private final AtomicReference _messageListener = new AtomicReference();
+
+ /**
+ * The consumer tag allows us to close the consumer by sending a jmsCancel method to the
+ * broker
+ */
+ private String _consumerTag;
+
+ /**
+ * We need to know the channel id when constructing frames
+ */
+ private int _channelId;
+
+ /**
+ * Used in the blocking receive methods to receive a message from
+ * the Session thread. Argument true indicates we want strict FIFO semantics
+ */
+ private final SynchronousQueue _synchronousQueue = new SynchronousQueue(true);
+
+ private MessageFactoryRegistry _messageFactory;
+
+ private AMQSession _session;
+
+ private AMQProtocolHandler _protocolHandler;
+
+ /**
+ * We need to store the "raw" field table so that we can resubscribe in the event of failover being required
+ */
+ private FieldTable _rawSelectorFieldTable;
+
+ /**
+ * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of failover
+ */
+ private int _prefetchHigh;
+
+ /**
+ * We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of failover
+ */
+ private int _prefetchLow;
+
+ /**
+ * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
+ */
+ private boolean _exclusive;
+
+ /**
+ * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes
+ * per consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our
+ * implementation.
+ */
+ private int _acknowledgeMode;
+
+ /**
+ * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
+ */
+ private int _outstanding;
+
+ /**
+ * Tag of last message delievered, whoch should be acknowledged on commit in
+ * transaction mode.
+ */
+ private long _lastDeliveryTag;
+
+ /**
+ * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode.
+ * Enabled when _outstannding number of msgs >= _prefetchHigh and disabled at < _prefetchLow
+ */
+ private boolean _dups_ok_acknowledge_send;
+
+ protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
+ boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+ AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
+ int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode)
+ {
+ _channelId = channelId;
+ _connection = connection;
+ _messageSelector = messageSelector;
+ _noLocal = noLocal;
+ _destination = destination;
+ _messageFactory = messageFactory;
+ _session = session;
+ _protocolHandler = protocolHandler;
+ _rawSelectorFieldTable = rawSelectorFieldTable;
+ _prefetchHigh = prefetchHigh;
+ _prefetchLow = prefetchLow;
+ _exclusive = exclusive;
+ _acknowledgeMode = acknowledgeMode;
+ }
+
+ public AMQDestination getDestination()
+ {
+ return _destination;
+ }
+
+ public String getMessageSelector() throws JMSException
+ {
+ return _messageSelector;
+ }
+
+ public MessageListener getMessageListener() throws JMSException
+ {
+ return (MessageListener) _messageListener.get();
+ }
+
+ public int getAcknowledgeMode()
+ {
+ return _acknowledgeMode;
+ }
+
+ private boolean isMessageListenerSet()
+ {
+ return _messageListener.get() != null;
+ }
+
+ public void setMessageListener(MessageListener messageListener) throws JMSException
+ {
+ checkNotClosed();
+
+ //if the current listener is non-null and the session is not stopped, then
+ //it is an error to call this method.
+
+ //i.e. it is only valid to call this method if
+ //
+ // (a) the session is stopped, in which case the dispatcher is not running
+ // OR
+ // (b) the listener is null AND we are not receiving synchronously at present
+ //
+
+ if (_session.isStopped())
+ {
+ _messageListener.set(messageListener);
+ _logger.debug("Message listener set for destination " + _destination);
+ }
+ else
+ {
+ if (_receiving.get())
+ {
+ throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
+ }
+ if (!_messageListener.compareAndSet(null, messageListener))
+ {
+ throw new javax.jms.IllegalStateException("Attempt to alter listener while session is started.");
+ }
+ _logger.debug("Message listener set for destination " + _destination);
+
+ if (messageListener != null)
+ {
+ //handle case where connection has already been started, and the dispatcher is blocked
+ //doing a put on the _synchronousQueue
+ Object msg = _synchronousQueue.poll();
+ if (msg != null)
+ {
+ AbstractJMSMessage jmsMsg = (AbstractJMSMessage) msg;
+ messageListener.onMessage(jmsMsg);
+ postDeliver(jmsMsg);
+ }
+ }
+ }
+ }
+
+ private void acquireReceiving() throws JMSException
+ {
+ if (!_receiving.compareAndSet(false, true))
+ {
+ throw new javax.jms.IllegalStateException("Another thread is already receiving.");
+ }
+ if (isMessageListenerSet())
+ {
+ throw new javax.jms.IllegalStateException("A listener has already been set.");
+ }
+ }
+
+ private void releaseReceiving()
+ {
+ _receiving.set(false);
+ }
+
+ public FieldTable getRawSelectorFieldTable()
+ {
+ return _rawSelectorFieldTable;
+ }
+
+ public int getPrefetch()
+ {
+ return _prefetchHigh;
+ }
+
+ public int getPrefetchHigh()
+ {
+ return _prefetchHigh;
+ }
+
+ public int getPrefetchLow()
+ {
+ return _prefetchLow;
+ }
+
+ public boolean isNoLocal()
+ {
+ return _noLocal;
+ }
+
+ public boolean isExclusive()
+ {
+ return _exclusive;
+ }
+
+ public Message receive() throws JMSException
+ {
+ return receive(0);
+ }
+
+ public Message receive(long l) throws JMSException
+ {
+ checkNotClosed();
+
+ acquireReceiving();
+
+ try
+ {
+ Object o = null;
+ if (l > 0)
+ {
+ o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
+ }
+ else
+ {
+ o = _synchronousQueue.take();
+ }
+ final AbstractJMSMessage m = returnMessageOrThrow(o);
+ if (m != null)
+ {
+ postDeliver(m);
+ }
+ return m;
+ }
+ catch (InterruptedException e)
+ {
+ return null;
+ }
+ finally
+ {
+ releaseReceiving();
+ }
+ }
+
+ public Message receiveNoWait() throws JMSException
+ {
+ checkNotClosed();
+
+ acquireReceiving();
+
+ try
+ {
+ Object o = _synchronousQueue.poll();
+ final AbstractJMSMessage m = returnMessageOrThrow(o);
+ if (m != null)
+ {
+ postDeliver(m);
+ }
+ return m;
+ }
+ finally
+ {
+ releaseReceiving();
+ }
+ }
+
+ /**
+ * We can get back either a Message or an exception from the queue. This method examines the argument and deals
+ * with it by throwing it (if an exception) or returning it (in any other case).
+ *
+ * @param o
+ * @return a message only if o is a Message
+ * @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not
+ * a JMSException is created with the linked exception set appropriately
+ */
+ private AbstractJMSMessage returnMessageOrThrow(Object o)
+ throws JMSException
+ {
+ // errors are passed via the queue too since there is no way of interrupting the poll() via the API.
+ if (o instanceof Throwable)
+ {
+ JMSException e = new JMSException("Message consumer forcibly closed due to error: " + o);
+ if (o instanceof Exception)
+ {
+ e.setLinkedException((Exception) o);
+ }
+ throw e;
+ }
+ else
+ {
+ return (AbstractJMSMessage) o;
+ }
+ }
+
+ public void close() throws JMSException
+ {
+ synchronized(_connection.getFailoverMutex())
+ {
+ if (!_closed.getAndSet(true))
+ {
+ final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false);
+
+ try
+ {
+ _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error closing consumer: " + e, e);
+ throw new JMSException("Error closing consumer: " + e);
+ }
+
+ deregisterConsumer();
+ }
+ }
+ }
+
+ /**
+ * Called when you need to invalidate a consumer. Used for example when failover has occurred and the
+ * client has vetoed automatic resubscription.
+ * The caller must hold the failover mutex.
+ */
+ void markClosed()
+ {
+ _closed.set(true);
+ deregisterConsumer();
+ }
+
+ /**
+ * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case
+ * of a message listener or a synchronous receive() caller.
+ *
+ * @param messageFrame the raw unprocessed mesage
+ * @param channelId channel on which this message was sent
+ */
+ void notifyMessage(UnprocessedMessage messageFrame, int channelId)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("notifyMessage called with message number " + messageFrame.deliverBody.deliveryTag);
+ }
+ try
+ {
+ AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.deliverBody.deliveryTag,
+ messageFrame.deliverBody.redelivered,
+ messageFrame.contentHeader,
+ messageFrame.bodies);
+
+ _logger.debug("Message is of type: " + jmsMessage.getClass().getName());
+
+ preDeliver(jmsMessage);
+
+ if (isMessageListenerSet())
+ {
+ //we do not need a lock around the test above, and the dispatch below as it is invalid
+ //for an application to alter an installed listener while the session is started
+ getMessageListener().onMessage(jmsMessage);
+ postDeliver(jmsMessage);
+ }
+ else
+ {
+ _synchronousQueue.put(jmsMessage);
+ }
+ }
+ catch (Exception e)
+ {
+ if (e instanceof InterruptedException)
+ {
+ _logger.info("SynchronousQueue.put interupted. Usually result of connection closing");
+ }
+ else
+ {
+ _logger.error("Caught exception (dump follows) - ignoring...", e);
+ }
+ }
+ }
+
+ private void preDeliver(AbstractJMSMessage msg)
+ {
+ switch (_acknowledgeMode)
+ {
+ case Session.PRE_ACKNOWLEDGE:
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ break;
+ case Session.CLIENT_ACKNOWLEDGE:
+ // we set the session so that when the user calls acknowledge() it can call the method on session
+ // to send out the appropriate frame
+ msg.setAMQSession(_session);
+ break;
+ }
+ }
+
+ private void postDeliver(AbstractJMSMessage msg)
+ {
+ switch (_acknowledgeMode)
+ {
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ if (++_outstanding >= _prefetchHigh)
+ {
+ _dups_ok_acknowledge_send = true;
+ }
+ if (_outstanding <= _prefetchLow)
+ {
+ _dups_ok_acknowledge_send = false;
+ }
+
+ if (_dups_ok_acknowledge_send)
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), true);
+ }
+ break;
+ case Session.AUTO_ACKNOWLEDGE:
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ break;
+ case Session.SESSION_TRANSACTED:
+ _lastDeliveryTag = msg.getDeliveryTag();
+ break;
+ }
+ }
+
+ /**
+ * Acknowledge up to last message delivered (if any). Used when commiting.
+ */
+ void acknowledgeLastDelivered()
+ {
+ if (_lastDeliveryTag > 0)
+ {
+ _session.acknowledgeMessage(_lastDeliveryTag, true);
+ _lastDeliveryTag = -1;
+ }
+ }
+
+ void notifyError(Throwable cause)
+ {
+ _closed.set(true);
+
+ // we have no way of propagating the exception to a message listener - a JMS limitation - so we
+ // deal with the case where we have a synchronous receive() waiting for a message to arrive
+ if (!isMessageListenerSet())
+ {
+ // offer only succeeds if there is a thread waiting for an item from the queue
+ if (_synchronousQueue.offer(cause))
+ {
+ _logger.debug("Passed exception to synchronous queue for propagation to receive()");
+ }
+ }
+ deregisterConsumer();
+ }
+
+ /**
+ * Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean
+ * case and in the case of an error occurring.
+ */
+ private void deregisterConsumer()
+ {
+ _session.deregisterConsumer(_consumerTag);
+ }
+
+ public String getConsumerTag()
+ {
+ return _consumerTag;
+ }
+
+ public void setConsumerTag(String consumerTag)
+ {
+ _consumerTag = consumerTag;
+ }
+}