summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java1079
1 files changed, 1079 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
new file mode 100644
index 0000000000..5d32863f2f
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -0,0 +1,1079 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.message.*;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.jms.MessageConsumer;
+import org.apache.qpid.jms.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class BasicMessageConsumer<U> extends Closeable implements MessageConsumer
+{
+ private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
+
+ /** The connection being used by this consumer */
+ protected final AMQConnection _connection;
+
+ protected final String _messageSelector;
+
+ private final boolean _noLocal;
+
+ protected AMQDestination _destination;
+
+ /**
+ * When true indicates that a blocking receive call is in progress
+ */
+ private final AtomicBoolean _receiving = new AtomicBoolean(false);
+ /**
+ * Holds an atomic reference to the listener installed.
+ */
+ private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
+
+ /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
+ protected int _consumerTag;
+
+ /** We need to know the channel id when constructing frames */
+ protected final int _channelId;
+
+ /**
+ * Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors
+ * <p/> Argument true indicates we want strict FIFO semantics
+ */
+ protected final BlockingQueue _synchronousQueue;
+
+ protected final MessageFactoryRegistry _messageFactory;
+
+ protected final AMQSession _session;
+
+ protected final AMQProtocolHandler _protocolHandler;
+
+ /**
+ * We need to store the "raw" field table so that we can resubscribe in the event of failover being required
+ */
+ private final FieldTable _arguments;
+
+ /**
+ * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of
+ * failover
+ */
+ private final int _prefetchHigh;
+
+ /**
+ * We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of
+ * failover
+ */
+ private final int _prefetchLow;
+
+ /**
+ * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
+ */
+ protected boolean _exclusive;
+
+ /**
+ * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per
+ * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our
+ * implementation.
+ */
+ protected final int _acknowledgeMode;
+
+ /**
+ * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
+ */
+ private int _outstanding;
+
+ /**
+ * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding
+ * number of msgs >= _prefetchHigh and disabled at < _prefetchLow
+ */
+ private boolean _dups_ok_acknowledge_send;
+
+ /**
+ * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode.
+ */
+ private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
+
+ /** The last tag that was "multiple" acknowledged on this session (if transacted) */
+ private long _lastAcked;
+
+ /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */
+ private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>();
+
+ private final Object _commitLock = new Object();
+
+ /**
+ * The thread that was used to call receive(). This is important for being able to interrupt that thread if a
+ * receive() is in progress.
+ */
+ private Thread _receivingThread;
+
+
+ /**
+ * Used to store this consumer queue name
+ * Usefull when more than binding key should be used
+ */
+ private AMQShortString _queuename;
+
+ /**
+ * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
+ * on the queue. This is used for queue browsing.
+ */
+ private final boolean _autoClose;
+
+ private final boolean _noConsume;
+ private List<StackTraceElement> _closedStack = null;
+
+
+
+ protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
+ String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
+ AMQSession session, AMQProtocolHandler protocolHandler,
+ FieldTable arguments, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ {
+ _channelId = channelId;
+ _connection = connection;
+ _messageSelector = messageSelector;
+ _noLocal = noLocal;
+ _destination = destination;
+ _messageFactory = messageFactory;
+ _session = session;
+ _protocolHandler = protocolHandler;
+ _arguments = arguments;
+ _prefetchHigh = prefetchHigh;
+ _prefetchLow = prefetchLow;
+ _exclusive = exclusive;
+
+ _synchronousQueue = new LinkedBlockingQueue();
+ _autoClose = autoClose;
+ _noConsume = noConsume;
+
+ // Force queue browsers not to use acknowledge modes.
+ if (_noConsume)
+ {
+ _acknowledgeMode = Session.NO_ACKNOWLEDGE;
+ }
+ else
+ {
+ _acknowledgeMode = acknowledgeMode;
+ }
+ }
+
+ public AMQDestination getDestination()
+ {
+ return _destination;
+ }
+
+ public String getMessageSelector() throws JMSException
+ {
+ checkPreConditions();
+
+ return _messageSelector;
+ }
+
+ public MessageListener getMessageListener() throws JMSException
+ {
+ checkPreConditions();
+
+ return _messageListener.get();
+ }
+
+ public int getAcknowledgeMode()
+ {
+ return _acknowledgeMode;
+ }
+
+ protected boolean isMessageListenerSet()
+ {
+ return _messageListener.get() != null;
+ }
+
+ public void setMessageListener(final MessageListener messageListener) throws JMSException
+ {
+ checkPreConditions();
+
+ // if the current listener is non-null and the session is not stopped, then
+ // it is an error to call this method.
+
+ // i.e. it is only valid to call this method if
+ //
+ // (a) the connection is stopped, in which case the dispatcher is not running
+ // OR
+ // (b) the listener is null AND we are not receiving synchronously at present
+ //
+
+ if (!_session.getAMQConnection().started())
+ {
+ _messageListener.set(messageListener);
+ _session.setHasMessageListeners();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(
+ "Session stopped : Message listener(" + messageListener + ") set for destination " + _destination);
+ }
+ }
+ else
+ {
+ if (_receiving.get())
+ {
+ throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
+ }
+
+ if (!_messageListener.compareAndSet(null, messageListener))
+ {
+ throw new javax.jms.IllegalStateException("Attempt to alter listener while session is started.");
+ }
+
+ _logger.debug("Message listener set for destination " + _destination);
+
+ if (messageListener != null)
+ {
+ //todo: handle case where connection has already been started, and the dispatcher has alreaded started
+ // putting values on the _synchronousQueue
+
+ synchronized (_session)
+ {
+ _messageListener.set(messageListener);
+ _session.setHasMessageListeners();
+ _session.startDispatcherIfNecessary();
+
+ // If we already have messages on the queue, deliver them to the listener
+ Object o = _synchronousQueue.poll();
+ while (o != null)
+ {
+ notifyMessage((AbstractJMSMessage) o);
+ o = _synchronousQueue.poll();
+ }
+ }
+ }
+ }
+ }
+
+ protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+ {
+ if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
+ }
+
+ _session.setInRecovery(false);
+ preDeliver(jmsMsg);
+ }
+
+ /**
+ * @param immediate if true then return immediately if the connection is failing over
+ *
+ * @return boolean if the acquisition was successful
+ *
+ * @throws JMSException if a listener has already been set or another thread is receiving
+ * @throws InterruptedException if interrupted
+ */
+ private boolean acquireReceiving(boolean immediate) throws JMSException, InterruptedException
+ {
+ if (_connection.isFailingOver())
+ {
+ if (immediate)
+ {
+ return false;
+ }
+ else
+ {
+ _connection.blockUntilNotFailingOver();
+ }
+ }
+
+ if (!_receiving.compareAndSet(false, true))
+ {
+ throw new javax.jms.IllegalStateException("Another thread is already receiving.");
+ }
+
+ if (isMessageListenerSet())
+ {
+ throw new javax.jms.IllegalStateException("A listener has already been set.");
+ }
+
+ _receivingThread = Thread.currentThread();
+ return true;
+ }
+
+ private void releaseReceiving()
+ {
+ _receiving.set(false);
+ _receivingThread = null;
+ }
+
+ public FieldTable getArguments()
+ {
+ return _arguments;
+ }
+
+ public int getPrefetch()
+ {
+ return _prefetchHigh;
+ }
+
+ public int getPrefetchHigh()
+ {
+ return _prefetchHigh;
+ }
+
+ public int getPrefetchLow()
+ {
+ return _prefetchLow;
+ }
+
+ public boolean isNoLocal()
+ {
+ return _noLocal;
+ }
+
+ public boolean isExclusive()
+ {
+ return _exclusive;
+ }
+
+ public boolean isReceiving()
+ {
+ return _receiving.get();
+ }
+
+ public Message receive() throws JMSException
+ {
+ return receive(0);
+ }
+
+ public Message receive(long l) throws JMSException
+ {
+
+ checkPreConditions();
+
+ try
+ {
+ acquireReceiving(false);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.warn("Interrupted acquire: " + e);
+ if (isClosed())
+ {
+ return null;
+ }
+ }
+
+ _session.startDispatcherIfNecessary();
+
+ try
+ {
+ Object o = getMessageFromQueue(l);
+ final AbstractJMSMessage m = returnMessageOrThrow(o);
+ if (m != null)
+ {
+ preApplicationProcessing(m);
+ postDeliver(m);
+ }
+ return m;
+ }
+ catch (InterruptedException e)
+ {
+ _logger.warn("Interrupted: " + e);
+
+ return null;
+ }
+ finally
+ {
+ releaseReceiving();
+ }
+ }
+
+ public Object getMessageFromQueue(long l) throws InterruptedException
+ {
+ Object o;
+ if (l > 0)
+ {
+ o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
+ }
+ else if (l < 0)
+ {
+ o = _synchronousQueue.poll();
+ }
+ else
+ {
+ o = _synchronousQueue.take();
+ }
+ return o;
+ }
+
+ abstract Message receiveBrowse() throws JMSException;
+
+ public Message receiveNoWait() throws JMSException
+ {
+ checkPreConditions();
+
+ try
+ {
+ if (!acquireReceiving(true))
+ {
+ //If we couldn't acquire the receiving thread then return null.
+ // This will occur if failing over.
+ return null;
+ }
+ }
+ catch (InterruptedException e)
+ {
+ /*
+ * This seems slightly shoddy but should never actually be executed
+ * since we told acquireReceiving to return immediately and it shouldn't
+ * block on anything.
+ */
+
+ return null;
+ }
+
+ _session.startDispatcherIfNecessary();
+
+ try
+ {
+ Object o = getMessageFromQueue(-1);
+ final AbstractJMSMessage m = returnMessageOrThrow(o);
+ if (m != null)
+ {
+ preApplicationProcessing(m);
+ postDeliver(m);
+ }
+
+ return m;
+ }
+ catch (InterruptedException e)
+ {
+ _logger.warn("Interrupted: " + e);
+
+ return null;
+ }
+ finally
+ {
+ releaseReceiving();
+ }
+ }
+
+ /**
+ * We can get back either a Message or an exception from the queue. This method examines the argument and deals with
+ * it by throwing it (if an exception) or returning it (in any other case).
+ *
+ * @param o the object to return or throw
+ * @return a message only if o is a Message
+ * @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not a
+ * JMSException is created with the linked exception set appropriately
+ */
+ private AbstractJMSMessage returnMessageOrThrow(Object o) throws JMSException
+ {
+ // errors are passed via the queue too since there is no way of interrupting the poll() via the API.
+ if (o instanceof Throwable)
+ {
+ JMSException e = new JMSException("Message consumer forcibly closed due to error: " + o);
+ e.initCause((Throwable) o);
+ if (o instanceof Exception)
+ {
+ e.setLinkedException((Exception) o);
+ }
+
+ throw e;
+ }
+ else if (o instanceof CloseConsumerMessage)
+ {
+ _closed.set(true);
+ deregisterConsumer();
+ return null;
+ }
+ else
+ {
+ return (AbstractJMSMessage) o;
+ }
+ }
+
+ public void close() throws JMSException
+ {
+ close(true);
+ }
+
+ public void close(boolean sendClose) throws JMSException
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing consumer:" + debugIdentity());
+ }
+
+ if (!_closed.getAndSet(true))
+ {
+ _closing.set(true);
+ if (_logger.isDebugEnabled())
+ {
+ StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+ if (_closedStack != null)
+ {
+ _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
+ }
+ else
+ {
+ _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
+ }
+ }
+
+ if (sendClose)
+ {
+ // The Synchronized block only needs to protect network traffic.
+ synchronized (_connection.getFailoverMutex())
+ {
+ try
+ {
+ // If the session is open or we are in the process
+ // of closing the session then send a cance
+ // no point otherwise as the connection will be gone
+ if (!_session.isClosed() || _session.isClosing())
+ {
+ sendCancel();
+ cleanupQueue();
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException("Error closing consumer: " + e, e);
+ }
+ catch (FailoverException e)
+ {
+ throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
+ }
+ }
+ }
+ else
+ {
+ // FIXME: wow this is ugly
+ // //fixme this probably is not right
+ // if (!isNoConsume())
+ { // done in BasicCancelOK Handler but not sending one so just deregister.
+ deregisterConsumer();
+ }
+ }
+
+ // This will occur if session.close is called closing all consumers we may be blocked waiting for a receive
+ // so we need to let it know it is time to close.
+ if ((_messageListener != null) && _receiving.get())
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Interrupting thread: " + _receivingThread);
+ }
+
+ _receivingThread.interrupt();
+ }
+ }
+ }
+
+ abstract void sendCancel() throws AMQException, FailoverException;
+
+ abstract void cleanupQueue() throws AMQException, FailoverException;
+
+ /**
+ * Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has
+ * vetoed automatic resubscription. The caller must hold the failover mutex.
+ */
+ void markClosed()
+ {
+ // synchronized (_closed)
+ {
+ _closed.set(true);
+
+ if (_logger.isDebugEnabled())
+ {
+ if (_closedStack != null)
+ {
+ StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+ _logger.debug(_consumerTag + " markClosed():"
+ + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
+ _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
+ }
+ else
+ {
+ StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+ _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
+ }
+ }
+ }
+
+ deregisterConsumer();
+ }
+
+ /**
+ * @param closeMessage
+ * this message signals that we should close the browser
+ */
+ public void notifyCloseMessage(CloseConsumerMessage closeMessage)
+ {
+ if (isMessageListenerSet())
+ {
+ // Currently only possible to get this msg type with a browser.
+ // If we get the message here then we should probably just close
+ // this consumer.
+ // Though an AutoClose consumer with message listener is quite odd..
+ // Just log out the fact so we know where we are
+ _logger.warn("Using an AutoCloseconsumer with message listener is not supported.");
+ }
+ else
+ {
+ try
+ {
+ _synchronousQueue.put(closeMessage);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info(" SynchronousQueue.put interupted. Usually result of connection closing,"
+ + "but we shouldn't have close yet");
+ }
+ }
+ }
+
+
+ /**
+ * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case of a
+ * message listener or a synchronous receive() caller.
+ *
+ * @param messageFrame the raw unprocessed mesage
+ */
+ void notifyMessage(U messageFrame)
+ {
+ if (messageFrame instanceof CloseConsumerMessage)
+ {
+ notifyCloseMessage((CloseConsumerMessage) messageFrame);
+ return;
+ }
+
+
+
+ try
+ {
+ AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(_session.getMessageDelegateFactory(), messageFrame);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Message is of type: " + jmsMessage.getClass().getName());
+ }
+ notifyMessage(jmsMessage);
+ }
+ catch (Exception e)
+ {
+ if (e instanceof InterruptedException)
+ {
+ _logger.info("SynchronousQueue.put interupted. Usually result of connection closing");
+ }
+ else
+ {
+ _logger.error("Caught exception (dump follows) - ignoring...", e);
+ }
+ }
+ }
+
+ public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, U messageFrame)
+ throws Exception;
+
+ /** @param jmsMessage this message has already been processed so can't redo preDeliver */
+ public void notifyMessage(AbstractJMSMessage jmsMessage)
+ {
+ try
+ {
+ if (isMessageListenerSet())
+ {
+ preApplicationProcessing(jmsMessage);
+ getMessageListener().onMessage(jmsMessage);
+ postDeliver(jmsMessage);
+ }
+ else
+ {
+ // we should not be allowed to add a message is the
+ // consumer is closed
+ _synchronousQueue.put(jmsMessage);
+ }
+ }
+ catch (Exception e)
+ {
+ if (e instanceof InterruptedException)
+ {
+ _logger.info("reNotification : SynchronousQueue.put interupted. Usually result of connection closing");
+ }
+ else
+ {
+ _logger.error("reNotification : Caught exception (dump follows) - ignoring...", e);
+ }
+ }
+ }
+
+ void preDeliver(AbstractJMSMessage msg)
+ {
+ switch (_acknowledgeMode)
+ {
+
+ case Session.PRE_ACKNOWLEDGE:
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ break;
+
+ case Session.CLIENT_ACKNOWLEDGE:
+ // we set the session so that when the user calls acknowledge() it can call the method on session
+ // to send out the appropriate frame
+ msg.setAMQSession(_session);
+ break;
+ case Session.SESSION_TRANSACTED:
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ else
+ {
+ _session.addDeliveredMessage(msg.getDeliveryTag());
+ _session.markDirty();
+ }
+
+ break;
+ }
+
+ }
+
+ void postDeliver(AbstractJMSMessage msg) throws JMSException
+ {
+ switch (_acknowledgeMode)
+ {
+
+ case Session.CLIENT_ACKNOWLEDGE:
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ _session.markDirty();
+ break;
+
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ case Session.AUTO_ACKNOWLEDGE:
+ // we do not auto ack a message if the application code called recover()
+ if (!_session.isInRecovery())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+
+ break;
+ }
+ }
+
+
+ /**
+ * Acknowledge up to last message delivered (if any). Used when commiting.
+ *
+ * @return the lastDeliveryTag to acknowledge
+ */
+ Long getLastDelivered()
+ {
+ if (!_receivedDeliveryTags.isEmpty())
+ {
+ Long lastDeliveryTag = _receivedDeliveryTags.poll();
+
+ while (!_receivedDeliveryTags.isEmpty())
+ {
+ lastDeliveryTag = _receivedDeliveryTags.poll();
+ }
+
+ assert _receivedDeliveryTags.isEmpty();
+
+ return lastDeliveryTag;
+ }
+
+ return null;
+ }
+
+ /**
+ * Acknowledge up to last message delivered (if any). Used when commiting.
+ */
+ void acknowledgeDelivered()
+ {
+ synchronized(_commitLock)
+ {
+ ArrayList<Long> tagsToAck = new ArrayList<Long>();
+
+ while (!_receivedDeliveryTags.isEmpty())
+ {
+ tagsToAck.add(_receivedDeliveryTags.poll());
+ }
+
+ Collections.sort(tagsToAck);
+
+ long prevAcked = _lastAcked;
+ long oldAckPoint = -1;
+
+ while(oldAckPoint != prevAcked)
+ {
+ oldAckPoint = prevAcked;
+
+ Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
+
+ while(tagsToAckIterator.hasNext() && tagsToAckIterator.next() == prevAcked+1)
+ {
+ tagsToAckIterator.remove();
+ prevAcked++;
+ }
+
+ Iterator<Long> previousAckIterator = _previouslyAcked.iterator();
+ while(previousAckIterator.hasNext() && previousAckIterator.next() == prevAcked+1)
+ {
+ previousAckIterator.remove();
+ prevAcked++;
+ }
+
+ }
+ if(prevAcked != _lastAcked)
+ {
+ _session.acknowledgeMessage(prevAcked, true);
+ _lastAcked = prevAcked;
+ }
+
+ Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
+
+ while(tagsToAckIterator.hasNext())
+ {
+ Long tag = tagsToAckIterator.next();
+ _session.acknowledgeMessage(tag, false);
+ _previouslyAcked.add(tag);
+ }
+ }
+ }
+
+
+ void notifyError(Throwable cause)
+ {
+ // synchronized (_closed)
+ {
+ _closed.set(true);
+ if (_logger.isDebugEnabled())
+ {
+ StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+ if (_closedStack != null)
+ {
+ _logger.debug(_consumerTag + " notifyError():"
+ + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
+ _logger.debug(_consumerTag + " previously" + _closedStack.toString());
+ }
+ else
+ {
+ _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
+ }
+ }
+ }
+ // QPID-293 can "request redelivery of this error through dispatcher"
+
+ // we have no way of propagating the exception to a message listener - a JMS limitation - so we
+ // deal with the case where we have a synchronous receive() waiting for a message to arrive
+ if (!isMessageListenerSet())
+ {
+ // offer only succeeds if there is a thread waiting for an item from the queue
+ if (_synchronousQueue.offer(cause))
+ {
+ _logger.debug("Passed exception to synchronous queue for propagation to receive()");
+ }
+ }
+
+ deregisterConsumer();
+ }
+
+ /**
+ * Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean case and in
+ * the case of an error occurring.
+ */
+ private void deregisterConsumer()
+ {
+ _session.deregisterConsumer(this);
+ }
+
+ public int getConsumerTag()
+ {
+ return _consumerTag;
+ }
+
+ public void setConsumerTag(int consumerTag)
+ {
+ _consumerTag = consumerTag;
+ }
+
+ public AMQSession getSession()
+ {
+ return _session;
+ }
+
+ private void checkPreConditions() throws JMSException
+ {
+
+ this.checkNotClosed();
+
+ if ((_session == null) || _session.isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Invalid Session");
+ }
+ }
+
+ public boolean isAutoClose()
+ {
+ return _autoClose;
+ }
+
+ public boolean isNoConsume()
+ {
+ return _noConsume || _destination.isBrowseOnly() ;
+ }
+
+ public void rollback()
+ {
+ rollbackPendingMessages();
+ }
+
+ public void rollbackPendingMessages()
+ {
+ if (_synchronousQueue.size() > 0)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting the messages(" + _synchronousQueue
+ .size() + ") in _syncQueue (PRQ)" + "for consumer with tag:" + _consumerTag);
+ }
+
+ Iterator iterator = _synchronousQueue.iterator();
+
+ int initialSize = _synchronousQueue.size();
+
+ boolean removed = false;
+ while (iterator.hasNext())
+ {
+
+ Object o = iterator.next();
+ if (o instanceof AbstractJMSMessage)
+ {
+ _session.rejectMessage(((AbstractJMSMessage) o), true);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag());
+ }
+
+ iterator.remove();
+ removed = true;
+
+ }
+ else
+ {
+ _logger.error("Queue contained a :" + o.getClass()
+ + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
+ iterator.remove();
+ removed = true;
+ }
+ }
+
+ if (removed && (initialSize == _synchronousQueue.size()))
+ {
+ _logger.error("Queue had content removed but didn't change in size." + initialSize);
+ }
+
+
+ if (_synchronousQueue.size() != 0)
+ {
+ _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size());
+ rollback();
+ }
+
+ clearReceiveQueue();
+ }
+ }
+
+ public String debugIdentity()
+ {
+ return String.valueOf(_consumerTag) + "[" + System.identityHashCode(this) + "]";
+ }
+
+ public void clearReceiveQueue()
+ {
+ _synchronousQueue.clear();
+ }
+
+
+ public List<Long> drainReceiverQueueAndRetrieveDeliveryTags()
+ {
+ Iterator<AbstractJMSMessage> iterator = _synchronousQueue.iterator();
+ List<Long> tags = new ArrayList<Long>(_synchronousQueue.size());
+
+ while (iterator.hasNext())
+ {
+
+ AbstractJMSMessage msg = iterator.next();
+ tags.add(msg.getDeliveryTag());
+ iterator.remove();
+ }
+ return tags;
+ }
+
+ public AMQShortString getQueuename()
+ {
+ return _queuename;
+ }
+
+ public void setQueuename(AMQShortString queuename)
+ {
+ this._queuename = queuename;
+ }
+
+ public void addBindingKey(AMQDestination amqd, String routingKey) throws AMQException
+ {
+ _session.addBindingKey(this,amqd,routingKey);
+ }
+
+ /** to be called when a failover has occured */
+ public void failedOverPre()
+ {
+ clearReceiveQueue();
+ // TGM FIXME: think this should just be removed
+ // clearUnackedMessages();
+ }
+
+ public void failedOverPost() {}
+
+}