summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java225
1 files changed, 87 insertions, 138 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 723e502ff0..5203a27f42 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -63,7 +63,6 @@ import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -78,7 +77,6 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.AMQBindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,20 +97,20 @@ import org.slf4j.LoggerFactory;
* @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this,
* after looking at worse bottlenecks first.
*/
-public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession
+public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession
{
- public static final class IdToConsumerMap
+ public static final class IdToConsumerMap<C extends BasicMessageConsumer>
{
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
- private final ConcurrentHashMap<Integer, BasicMessageConsumer> _slowAccessConsumers = new ConcurrentHashMap<Integer, BasicMessageConsumer>();
+ private final ConcurrentHashMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>();
- public BasicMessageConsumer get(int id)
+ public C get(int id)
{
if ((id & 0xFFFFFFF0) == 0)
{
- return _fastAccessConsumers[id];
+ return (C) _fastAccessConsumers[id];
}
else
{
@@ -120,12 +118,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
- public BasicMessageConsumer put(int id, BasicMessageConsumer consumer)
+ public C put(int id, C consumer)
{
- BasicMessageConsumer oldVal;
+ C oldVal;
if ((id & 0xFFFFFFF0) == 0)
{
- oldVal = _fastAccessConsumers[id];
+ oldVal = (C) _fastAccessConsumers[id];
_fastAccessConsumers[id] = consumer;
}
else
@@ -137,12 +135,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
- public BasicMessageConsumer remove(int id)
+ public C remove(int id)
{
- BasicMessageConsumer consumer;
+ C consumer;
if ((id & 0xFFFFFFF0) == 0)
{
- consumer = _fastAccessConsumers[id];
+ consumer = (C) _fastAccessConsumers[id];
_fastAccessConsumers[id] = null;
}
else
@@ -154,15 +152,15 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
- public Collection<BasicMessageConsumer> values()
+ public Collection<C> values()
{
- ArrayList<BasicMessageConsumer> values = new ArrayList<BasicMessageConsumer>();
+ ArrayList<C> values = new ArrayList<C>();
for (int i = 0; i < 16; i++)
{
if (_fastAccessConsumers[i] != null)
{
- values.add(_fastAccessConsumers[i]);
+ values.add((C) _fastAccessConsumers[i]);
}
}
values.addAll(_slowAccessConsumers.values());
@@ -183,8 +181,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
- /** Used for debugging in the dispatcher. */
- private static final Logger _dispatcherLogger = LoggerFactory.getLogger(Dispatcher.class);
/** The default maximum number of prefetched message at which to suspend the channel. */
public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000;
@@ -234,7 +230,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
/** Holds this session unique identifier, used to distinguish it from other sessions. */
protected int _channelId;
- /** @todo This does not appear to be set? */
private int _ticket;
/** Holds the high mark for prefetched message, at which the session is suspended. */
@@ -261,8 +256,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
* up in the {@link #_subscriptions} map.
*/
- protected final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
- new ConcurrentHashMap<BasicMessageConsumer, String>();
+ protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap =
+ new ConcurrentHashMap<C, String>();
/**
* Used to hold incoming messages.
@@ -299,7 +294,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
* consumer.
*/
- protected final IdToConsumerMap _consumers = new IdToConsumerMap();
+ protected final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
//Map<AMQShortString, BasicMessageConsumer> _consumers =
//new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
@@ -308,7 +303,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* Contains a list of consumers which have been removed but which might still have
* messages to acknowledge, eg in client ack or transacted modes
*/
- private CopyOnWriteArrayList<BasicMessageConsumer> _removedConsumers = new CopyOnWriteArrayList<BasicMessageConsumer>();
+ private CopyOnWriteArrayList<C> _removedConsumers = new CopyOnWriteArrayList<C>();
/** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
@@ -584,7 +579,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}, _connection).execute();
}
- public void addBindingKey(BasicMessageConsumer consumer, AMQDestination amqd, String routingKey) throws AMQException
+ public void addBindingKey(C consumer, AMQDestination amqd, String routingKey) throws AMQException
{
if (consumer.getQueuename() != null)
{
@@ -755,11 +750,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public abstract void sendCommit() throws AMQException, FailoverException;
- public void confirmConsumerCancelled(AMQShortString consumerTag)
+
+ public void confirmConsumerCancelled(int consumerTag)
{
// Remove the consumer from the map
- BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue());
+ C consumer = _consumers.get(consumerTag);
if (consumer != null)
{
if (!consumer.isNoConsume()) // Normal Consumer
@@ -801,7 +797,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
else
{
- _queue.add(new UnprocessedMessage.CloseConsumerMessage(consumer));
+ _queue.add(new CloseConsumerMessage(consumer));
}
}
}
@@ -848,7 +844,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
false, false);
}
- public MessageConsumer createExclusiveConsumer(Destination destination) throws JMSException
+ public C createExclusiveConsumer(Destination destination) throws JMSException
{
checkValidDestination(destination);
@@ -927,8 +923,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
_subscriptions.get(name).close();
}
AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
- TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
+ C consumer = (C) createConsumer(dest, messageSelector, noLocal);
+ TopicSubscriberAdaptor<C> subscriber = new TopicSubscriberAdaptor(dest, consumer);
_subscriptions.put(name, subscriber);
_reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
@@ -960,23 +956,23 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
return msg;
}
- public BasicMessageProducer createProducer(Destination destination) throws JMSException
+ public P createProducer(Destination destination) throws JMSException
{
return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
}
- public BasicMessageProducer createProducer(Destination destination, boolean immediate) throws JMSException
+ public P createProducer(Destination destination, boolean immediate) throws JMSException
{
return createProducerImpl(destination, DEFAULT_MANDATORY, immediate);
}
- public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate)
+ public P createProducer(Destination destination, boolean mandatory, boolean immediate)
throws JMSException
{
return createProducerImpl(destination, mandatory, immediate);
}
- public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate,
+ public P createProducer(Destination destination, boolean mandatory, boolean immediate,
boolean waitUntilSent) throws JMSException
{
return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
@@ -986,7 +982,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
checkNotClosed();
- return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic, false, false), topic);
+ return new TopicPublisherAdapter((P) createProducer(topic, false, false), topic);
}
public Queue createQueue(String queueName) throws JMSException
@@ -1074,7 +1070,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
checkValidDestination(destination);
AMQQueue dest = (AMQQueue) destination;
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination);
+ C consumer = (C) createConsumer(destination);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1093,7 +1089,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
checkValidDestination(destination);
AMQQueue dest = (AMQQueue) destination;
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination, messageSelector);
+ C consumer = (C) createConsumer(destination, messageSelector);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1111,7 +1107,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
checkNotClosed();
AMQQueue dest = (AMQQueue) queue;
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest);
+ C consumer = (C) createConsumer(dest);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1130,7 +1126,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
checkNotClosed();
AMQQueue dest = (AMQQueue) queue;
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector);
+ C consumer = (C) createConsumer(dest, messageSelector);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1175,7 +1171,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
AMQTopic dest = checkValidTopic(topic);
// AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
+ return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest));
}
/**
@@ -1195,7 +1191,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
AMQTopic dest = checkValidTopic(topic);
// AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest, messageSelector, noLocal));
+ return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal));
}
public abstract TemporaryQueue createTemporaryQueue() throws JMSException;
@@ -1369,17 +1365,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
_logger.debug("Message[" + message.toString() + "] received in session");
}
-
- if (message instanceof ReturnMessage)
- {
- // Return of the bounced message.
- returnBouncedMessage((ReturnMessage) message);
- }
- else
- {
- _highestDeliveryTag.set(message.getDeliveryTag());
- _queue.add(message);
- }
+ _highestDeliveryTag.set(message.getDeliveryTag());
+ _queue.add(message);
}
public void declareAndBind(AMQDestination amqd)
@@ -1618,7 +1605,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
- protected MessageConsumer createConsumerImpl(final Destination destination, final int prefetchHigh,
+ protected C createConsumerImpl(final Destination destination, final int prefetchHigh,
final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
final boolean noConsume, final boolean autoClose) throws JMSException
{
@@ -1642,10 +1629,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
messageSelector = selector;
}
- return new FailoverRetrySupport<MessageConsumer, JMSException>(
- new FailoverProtectedOperation<MessageConsumer, JMSException>()
+ return new FailoverRetrySupport<C, JMSException>(
+ new FailoverProtectedOperation<C, JMSException>()
{
- public MessageConsumer execute() throws JMSException, FailoverException
+ public C execute() throws JMSException, FailoverException
{
checkNotClosed();
@@ -1668,7 +1655,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector);
}
- BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
+ C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
if (_messageListener != null)
@@ -1712,7 +1699,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}, _connection).execute();
}
- public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
+ public abstract C createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable arguments,
final boolean noConsume, final boolean autoClose) throws JMSException;
@@ -1722,9 +1709,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
*
* @param consumer the consum
*/
- void deregisterConsumer(BasicMessageConsumer consumer)
+ void deregisterConsumer(C consumer)
{
- if (_consumers.remove(consumer.getConsumerTag().toIntValue()) != null)
+ if (_consumers.remove(consumer.getConsumerTag()) != null)
{
String subscriptionName = _reverseSubscriptionMap.remove(consumer);
if (subscriptionName != null)
@@ -2012,12 +1999,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
- final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
+ final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values());
- final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
+ final Iterator<C> it = clonedConsumers.iterator();
while (it.hasNext())
{
- final BasicMessageConsumer con = it.next();
+ final C con = it.next();
if (error != null)
{
con.notifyError(error);
@@ -2048,7 +2035,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
final Iterator it = clonedProducers.iterator();
while (it.hasNext())
{
- final BasicMessageProducer prod = (BasicMessageProducer) it.next();
+ final P prod = (P) it.next();
prod.close();
}
// at this point the _producers map is empty
@@ -2096,20 +2083,18 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
*
* @param queueName
*/
- private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName,
+ private void consumeFromQueue(C consumer, AMQShortString queueName,
AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
{
int tagId = _nextTag++;
- // need to generate a consumer tag on the client so we can exploit the nowait flag
- AMQShortString tag = new AMQShortString(Integer.toString(tagId));
- consumer.setConsumerTag(tag);
+ consumer.setConsumerTag(tagId);
// we must register the consumer in the map before we actually start listening
_consumers.put(tagId, consumer);
try
{
- sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tag);
+ sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tagId);
}
catch (AMQException e)
{
@@ -2119,26 +2104,26 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
- public abstract void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, AMQShortString tag) throws AMQException, FailoverException;
+ public abstract void sendConsume(C consumer, AMQShortString queueName,
+ AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException;
- private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
+ private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
throws JMSException
{
return createProducerImpl(destination, mandatory, immediate, false);
}
- private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory,
+ private P createProducerImpl(final Destination destination, final boolean mandatory,
final boolean immediate, final boolean waitUntilSent) throws JMSException
{
- return new FailoverRetrySupport<BasicMessageProducer, JMSException>(
- new FailoverProtectedOperation<BasicMessageProducer, JMSException>()
+ return new FailoverRetrySupport<P, JMSException>(
+ new FailoverProtectedOperation<P, JMSException>()
{
- public BasicMessageProducer execute() throws JMSException, FailoverException
+ public P execute() throws JMSException, FailoverException
{
checkNotClosed();
long producerId = getNextProducerId();
- BasicMessageProducer producer = createMessageProducer(destination, mandatory,
+ P producer = createMessageProducer(destination, mandatory,
immediate, waitUntilSent, producerId);
registerProducer(producerId, producer);
@@ -2147,7 +2132,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}, _connection).execute();
}
- public abstract BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
+ public abstract P createMessageProducer(final Destination destination, final boolean mandatory,
final boolean immediate, final boolean waitUntilSent, long producerId);
private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
@@ -2320,12 +2305,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
- final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
+ final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values());
- final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
+ final Iterator<C> it = clonedConsumers.iterator();
while (it.hasNext())
{
- final BasicMessageConsumer con = it.next();
+ final C con = it.next();
con.markClosed();
}
// at this point the _consumers map will be empty
@@ -2360,7 +2345,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
*
* @throws AMQException
*/
- private void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException // , FailoverException
+ private void registerConsumer(C consumer, boolean nowait) throws AMQException // , FailoverException
{
AMQDestination amqd = consumer.getDestination();
@@ -2424,15 +2409,16 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
private void rejectAllMessages(boolean requeue)
{
- rejectMessagesForConsumerTag(null, requeue);
+ rejectMessagesForConsumerTag(0, requeue, true);
}
/**
* @param consumerTag The consumerTag to prune from queue or all if null
* @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ)
+ * @param rejectAllConsumers
*/
- private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue)
+ private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers)
{
Iterator messages = _queue.iterator();
if (_logger.isInfoEnabled())
@@ -2453,7 +2439,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
UnprocessedMessage message = (UnprocessedMessage) messages.next();
- if ((consumerTag == null) || message.getConsumerTag().equals(consumerTag.toString()))
+ if (rejectAllConsumers || (message.getConsumerTag() == consumerTag))
{
if (_logger.isDebugEnabled())
{
@@ -2475,12 +2461,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
private void resubscribeConsumers() throws AMQException
{
- ArrayList consumers = new ArrayList(_consumers.values());
+ ArrayList<C> consumers = new ArrayList<C>(_consumers.values());
_consumers.clear();
- for (Iterator it = consumers.iterator(); it.hasNext();)
- {
- BasicMessageConsumer consumer = (BasicMessageConsumer) it.next();
+ for (C consumer : consumers)
+ {
consumer.failedOver();
registerConsumer(consumer, true);
}
@@ -2492,53 +2477,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
_logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey
for (Iterator it = producers.iterator(); it.hasNext();)
{
- BasicMessageProducer producer = (BasicMessageProducer) it.next();
+ P producer = (P) it.next();
producer.resubscribe();
}
}
- private void returnBouncedMessage(final ReturnMessage msg)
- {
- _connection.performConnectionTask(new Runnable()
- {
- public void run()
- {
- try
- {
- // Bounced message is processed here, away from the mina thread
- AbstractJMSMessage bouncedMessage =
- _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
- msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
- AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
- AMQShortString reason = msg.getReplyText();
- _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
-
- // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
- if (errorCode == AMQConstant.NO_CONSUMERS)
- {
- _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
- }
- else if (errorCode == AMQConstant.NO_ROUTE)
- {
- _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
- }
- else
- {
- _connection.exceptionReceived(
- new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
- }
-
- }
- catch (Exception e)
- {
- _logger.error(
- "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
- e);
- }
- }
- });
- }
-
/**
* Suspends or unsuspends this session.
*
@@ -2641,6 +2584,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
+ /** Used for debugging in the dispatcher. */
+ private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher");
+
+
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
class Dispatcher extends Thread
{
@@ -2652,6 +2599,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
private final AtomicLong _rollbackMark = new AtomicLong(-1);
private String dispatcherID = "" + System.identityHashCode(this);
+
+
public Dispatcher()
{
super("Dispatcher-Channel-" + _channelId);
@@ -2670,7 +2619,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
- public void rejectPending(BasicMessageConsumer consumer)
+ public void rejectPending(C consumer)
{
synchronized (_lock)
{
@@ -2685,7 +2634,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
consumer.rollbackPendingMessages();
// Reject messages on pre-dispatch queue
- rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
+ rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false);
//Let the dispatcher deal with this when it gets to them.
// closeConsumer
@@ -2712,7 +2661,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
_dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
- for (BasicMessageConsumer consumer : _consumers.values())
+ for (C consumer : _consumers.values())
{
if (!consumer.isNoConsume())
{
@@ -2778,7 +2727,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
_lock.wait();
}
- if (!(message instanceof UnprocessedMessage.CloseConsumerMessage)
+ if (!(message instanceof CloseConsumerMessage)
&& tagLE(deliveryTag, _rollbackMark.get()))
{
rejectMessage(message, true);
@@ -2840,8 +2789,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
//This if block is not needed anymore as bounce messages are handled separately
//if (message.getDeliverBody() != null)
//{
- final BasicMessageConsumer consumer =
- _consumers.get(message.getConsumerTag().toIntValue());
+ final C consumer =
+ _consumers.get(message.getConsumerTag());
if ((consumer == null) || consumer.isClosed())
{