diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 225 |
1 files changed, 87 insertions, 138 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 723e502ff0..5203a27f42 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -63,7 +63,6 @@ import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQInvalidRoutingKeyException; -import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -78,7 +77,6 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.AMQBindingURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,20 +97,20 @@ import org.slf4j.LoggerFactory; * @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this, * after looking at worse bottlenecks first. */ -public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession +public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession { - public static final class IdToConsumerMap + public static final class IdToConsumerMap<C extends BasicMessageConsumer> { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; - private final ConcurrentHashMap<Integer, BasicMessageConsumer> _slowAccessConsumers = new ConcurrentHashMap<Integer, BasicMessageConsumer>(); + private final ConcurrentHashMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>(); - public BasicMessageConsumer get(int id) + public C get(int id) { if ((id & 0xFFFFFFF0) == 0) { - return _fastAccessConsumers[id]; + return (C) _fastAccessConsumers[id]; } else { @@ -120,12 +118,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - public BasicMessageConsumer put(int id, BasicMessageConsumer consumer) + public C put(int id, C consumer) { - BasicMessageConsumer oldVal; + C oldVal; if ((id & 0xFFFFFFF0) == 0) { - oldVal = _fastAccessConsumers[id]; + oldVal = (C) _fastAccessConsumers[id]; _fastAccessConsumers[id] = consumer; } else @@ -137,12 +135,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } - public BasicMessageConsumer remove(int id) + public C remove(int id) { - BasicMessageConsumer consumer; + C consumer; if ((id & 0xFFFFFFF0) == 0) { - consumer = _fastAccessConsumers[id]; + consumer = (C) _fastAccessConsumers[id]; _fastAccessConsumers[id] = null; } else @@ -154,15 +152,15 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } - public Collection<BasicMessageConsumer> values() + public Collection<C> values() { - ArrayList<BasicMessageConsumer> values = new ArrayList<BasicMessageConsumer>(); + ArrayList<C> values = new ArrayList<C>(); for (int i = 0; i < 16; i++) { if (_fastAccessConsumers[i] != null) { - values.add(_fastAccessConsumers[i]); + values.add((C) _fastAccessConsumers[i]); } } values.addAll(_slowAccessConsumers.values()); @@ -183,8 +181,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); - /** Used for debugging in the dispatcher. */ - private static final Logger _dispatcherLogger = LoggerFactory.getLogger(Dispatcher.class); /** The default maximum number of prefetched message at which to suspend the channel. */ public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000; @@ -234,7 +230,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess /** Holds this session unique identifier, used to distinguish it from other sessions. */ protected int _channelId; - /** @todo This does not appear to be set? */ private int _ticket; /** Holds the high mark for prefetched message, at which the session is suspended. */ @@ -261,8 +256,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked * up in the {@link #_subscriptions} map. */ - protected final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap = - new ConcurrentHashMap<BasicMessageConsumer, String>(); + protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap = + new ConcurrentHashMap<C, String>(); /** * Used to hold incoming messages. @@ -299,7 +294,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right * consumer. */ - protected final IdToConsumerMap _consumers = new IdToConsumerMap(); + protected final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>(); //Map<AMQShortString, BasicMessageConsumer> _consumers = //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); @@ -308,7 +303,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * Contains a list of consumers which have been removed but which might still have * messages to acknowledge, eg in client ack or transacted modes */ - private CopyOnWriteArrayList<BasicMessageConsumer> _removedConsumers = new CopyOnWriteArrayList<BasicMessageConsumer>(); + private CopyOnWriteArrayList<C> _removedConsumers = new CopyOnWriteArrayList<C>(); /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */ private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = @@ -584,7 +579,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess }, _connection).execute(); } - public void addBindingKey(BasicMessageConsumer consumer, AMQDestination amqd, String routingKey) throws AMQException + public void addBindingKey(C consumer, AMQDestination amqd, String routingKey) throws AMQException { if (consumer.getQueuename() != null) { @@ -755,11 +750,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public abstract void sendCommit() throws AMQException, FailoverException; - public void confirmConsumerCancelled(AMQShortString consumerTag) + + public void confirmConsumerCancelled(int consumerTag) { // Remove the consumer from the map - BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue()); + C consumer = _consumers.get(consumerTag); if (consumer != null) { if (!consumer.isNoConsume()) // Normal Consumer @@ -801,7 +797,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } else { - _queue.add(new UnprocessedMessage.CloseConsumerMessage(consumer)); + _queue.add(new CloseConsumerMessage(consumer)); } } } @@ -848,7 +844,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess false, false); } - public MessageConsumer createExclusiveConsumer(Destination destination) throws JMSException + public C createExclusiveConsumer(Destination destination) throws JMSException { checkValidDestination(destination); @@ -927,8 +923,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _subscriptions.get(name).close(); } AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); - TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); + C consumer = (C) createConsumer(dest, messageSelector, noLocal); + TopicSubscriberAdaptor<C> subscriber = new TopicSubscriberAdaptor(dest, consumer); _subscriptions.put(name, subscriber); _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); @@ -960,23 +956,23 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess return msg; } - public BasicMessageProducer createProducer(Destination destination) throws JMSException + public P createProducer(Destination destination) throws JMSException { return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE); } - public BasicMessageProducer createProducer(Destination destination, boolean immediate) throws JMSException + public P createProducer(Destination destination, boolean immediate) throws JMSException { return createProducerImpl(destination, DEFAULT_MANDATORY, immediate); } - public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate) + public P createProducer(Destination destination, boolean mandatory, boolean immediate) throws JMSException { return createProducerImpl(destination, mandatory, immediate); } - public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate, + public P createProducer(Destination destination, boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException { return createProducerImpl(destination, mandatory, immediate, waitUntilSent); @@ -986,7 +982,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { checkNotClosed(); - return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic, false, false), topic); + return new TopicPublisherAdapter((P) createProducer(topic, false, false), topic); } public Queue createQueue(String queueName) throws JMSException @@ -1074,7 +1070,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { checkValidDestination(destination); AMQQueue dest = (AMQQueue) destination; - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination); + C consumer = (C) createConsumer(destination); return new QueueReceiverAdaptor(dest, consumer); } @@ -1093,7 +1089,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { checkValidDestination(destination); AMQQueue dest = (AMQQueue) destination; - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination, messageSelector); + C consumer = (C) createConsumer(destination, messageSelector); return new QueueReceiverAdaptor(dest, consumer); } @@ -1111,7 +1107,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { checkNotClosed(); AMQQueue dest = (AMQQueue) queue; - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest); + C consumer = (C) createConsumer(dest); return new QueueReceiverAdaptor(dest, consumer); } @@ -1130,7 +1126,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { checkNotClosed(); AMQQueue dest = (AMQQueue) queue; - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector); + C consumer = (C) createConsumer(dest, messageSelector); return new QueueReceiverAdaptor(dest, consumer); } @@ -1175,7 +1171,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess AMQTopic dest = checkValidTopic(topic); // AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest)); + return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest)); } /** @@ -1195,7 +1191,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess AMQTopic dest = checkValidTopic(topic); // AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest, messageSelector, noLocal)); + return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal)); } public abstract TemporaryQueue createTemporaryQueue() throws JMSException; @@ -1369,17 +1365,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { _logger.debug("Message[" + message.toString() + "] received in session"); } - - if (message instanceof ReturnMessage) - { - // Return of the bounced message. - returnBouncedMessage((ReturnMessage) message); - } - else - { - _highestDeliveryTag.set(message.getDeliveryTag()); - _queue.add(message); - } + _highestDeliveryTag.set(message.getDeliveryTag()); + _queue.add(message); } public void declareAndBind(AMQDestination amqd) @@ -1618,7 +1605,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - protected MessageConsumer createConsumerImpl(final Destination destination, final int prefetchHigh, + protected C createConsumerImpl(final Destination destination, final int prefetchHigh, final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, final boolean noConsume, final boolean autoClose) throws JMSException { @@ -1642,10 +1629,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess messageSelector = selector; } - return new FailoverRetrySupport<MessageConsumer, JMSException>( - new FailoverProtectedOperation<MessageConsumer, JMSException>() + return new FailoverRetrySupport<C, JMSException>( + new FailoverProtectedOperation<C, JMSException>() { - public MessageConsumer execute() throws JMSException, FailoverException + public C execute() throws JMSException, FailoverException { checkNotClosed(); @@ -1668,7 +1655,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector); } - BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow, + C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow, noLocal, exclusive, messageSelector, ft, noConsume, autoClose); if (_messageListener != null) @@ -1712,7 +1699,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess }, _connection).execute(); } - public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh, + public abstract C createMessageConsumer(final AMQDestination destination, final int prefetchHigh, final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable arguments, final boolean noConsume, final boolean autoClose) throws JMSException; @@ -1722,9 +1709,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @param consumer the consum */ - void deregisterConsumer(BasicMessageConsumer consumer) + void deregisterConsumer(C consumer) { - if (_consumers.remove(consumer.getConsumerTag().toIntValue()) != null) + if (_consumers.remove(consumer.getConsumerTag()) != null) { String subscriptionName = _reverseSubscriptionMap.remove(consumer); if (subscriptionName != null) @@ -2012,12 +1999,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception - final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values()); + final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values()); - final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator(); + final Iterator<C> it = clonedConsumers.iterator(); while (it.hasNext()) { - final BasicMessageConsumer con = it.next(); + final C con = it.next(); if (error != null) { con.notifyError(error); @@ -2048,7 +2035,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess final Iterator it = clonedProducers.iterator(); while (it.hasNext()) { - final BasicMessageProducer prod = (BasicMessageProducer) it.next(); + final P prod = (P) it.next(); prod.close(); } // at this point the _producers map is empty @@ -2096,20 +2083,18 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @param queueName */ - private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, + private void consumeFromQueue(C consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException { int tagId = _nextTag++; - // need to generate a consumer tag on the client so we can exploit the nowait flag - AMQShortString tag = new AMQShortString(Integer.toString(tagId)); - consumer.setConsumerTag(tag); + consumer.setConsumerTag(tagId); // we must register the consumer in the map before we actually start listening _consumers.put(tagId, consumer); try { - sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tag); + sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tagId); } catch (AMQException e) { @@ -2119,26 +2104,26 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - public abstract void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, AMQShortString tag) throws AMQException, FailoverException; + public abstract void sendConsume(C consumer, AMQShortString queueName, + AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException; - private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate) + private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate) throws JMSException { return createProducerImpl(destination, mandatory, immediate, false); } - private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory, + private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate, final boolean waitUntilSent) throws JMSException { - return new FailoverRetrySupport<BasicMessageProducer, JMSException>( - new FailoverProtectedOperation<BasicMessageProducer, JMSException>() + return new FailoverRetrySupport<P, JMSException>( + new FailoverProtectedOperation<P, JMSException>() { - public BasicMessageProducer execute() throws JMSException, FailoverException + public P execute() throws JMSException, FailoverException { checkNotClosed(); long producerId = getNextProducerId(); - BasicMessageProducer producer = createMessageProducer(destination, mandatory, + P producer = createMessageProducer(destination, mandatory, immediate, waitUntilSent, producerId); registerProducer(producerId, producer); @@ -2147,7 +2132,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess }, _connection).execute(); } - public abstract BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory, + public abstract P createMessageProducer(final Destination destination, final boolean mandatory, final boolean immediate, final boolean waitUntilSent, long producerId); private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException @@ -2320,12 +2305,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception - final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values()); + final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values()); - final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator(); + final Iterator<C> it = clonedConsumers.iterator(); while (it.hasNext()) { - final BasicMessageConsumer con = it.next(); + final C con = it.next(); con.markClosed(); } // at this point the _consumers map will be empty @@ -2360,7 +2345,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @throws AMQException */ - private void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException // , FailoverException + private void registerConsumer(C consumer, boolean nowait) throws AMQException // , FailoverException { AMQDestination amqd = consumer.getDestination(); @@ -2424,15 +2409,16 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess private void rejectAllMessages(boolean requeue) { - rejectMessagesForConsumerTag(null, requeue); + rejectMessagesForConsumerTag(0, requeue, true); } /** * @param consumerTag The consumerTag to prune from queue or all if null * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) + * @param rejectAllConsumers */ - private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue) + private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers) { Iterator messages = _queue.iterator(); if (_logger.isInfoEnabled()) @@ -2453,7 +2439,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { UnprocessedMessage message = (UnprocessedMessage) messages.next(); - if ((consumerTag == null) || message.getConsumerTag().equals(consumerTag.toString())) + if (rejectAllConsumers || (message.getConsumerTag() == consumerTag)) { if (_logger.isDebugEnabled()) { @@ -2475,12 +2461,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess private void resubscribeConsumers() throws AMQException { - ArrayList consumers = new ArrayList(_consumers.values()); + ArrayList<C> consumers = new ArrayList<C>(_consumers.values()); _consumers.clear(); - for (Iterator it = consumers.iterator(); it.hasNext();) - { - BasicMessageConsumer consumer = (BasicMessageConsumer) it.next(); + for (C consumer : consumers) + { consumer.failedOver(); registerConsumer(consumer, true); } @@ -2492,53 +2477,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey for (Iterator it = producers.iterator(); it.hasNext();) { - BasicMessageProducer producer = (BasicMessageProducer) it.next(); + P producer = (P) it.next(); producer.resubscribe(); } } - private void returnBouncedMessage(final ReturnMessage msg) - { - _connection.performConnectionTask(new Runnable() - { - public void run() - { - try - { - // Bounced message is processed here, away from the mina thread - AbstractJMSMessage bouncedMessage = - _messageFactoryRegistry.createMessage(0, false, msg.getExchange(), - msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies()); - AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); - AMQShortString reason = msg.getReplyText(); - _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); - - // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. - if (errorCode == AMQConstant.NO_CONSUMERS) - { - _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null)); - } - else if (errorCode == AMQConstant.NO_ROUTE) - { - _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null)); - } - else - { - _connection.exceptionReceived( - new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null)); - } - - } - catch (Exception e) - { - _logger.error( - "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", - e); - } - } - }); - } - /** * Suspends or unsuspends this session. * @@ -2641,6 +2584,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } + /** Used for debugging in the dispatcher. */ + private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher"); + + /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ class Dispatcher extends Thread { @@ -2652,6 +2599,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess private final AtomicLong _rollbackMark = new AtomicLong(-1); private String dispatcherID = "" + System.identityHashCode(this); + + public Dispatcher() { super("Dispatcher-Channel-" + _channelId); @@ -2670,7 +2619,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } - public void rejectPending(BasicMessageConsumer consumer) + public void rejectPending(C consumer) { synchronized (_lock) { @@ -2685,7 +2634,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess consumer.rollbackPendingMessages(); // Reject messages on pre-dispatch queue - rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); + rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false); //Let the dispatcher deal with this when it gets to them. // closeConsumer @@ -2712,7 +2661,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _dispatcherLogger.debug("Session Pre Dispatch Queue cleared"); - for (BasicMessageConsumer consumer : _consumers.values()) + for (C consumer : _consumers.values()) { if (!consumer.isNoConsume()) { @@ -2778,7 +2727,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _lock.wait(); } - if (!(message instanceof UnprocessedMessage.CloseConsumerMessage) + if (!(message instanceof CloseConsumerMessage) && tagLE(deliveryTag, _rollbackMark.get())) { rejectMessage(message, true); @@ -2840,8 +2789,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess //This if block is not needed anymore as bounce messages are handled separately //if (message.getDeliverBody() != null) //{ - final BasicMessageConsumer consumer = - _consumers.get(message.getConsumerTag().toIntValue()); + final C consumer = + _consumers.get(message.getConsumerTag()); if ((consumer == null) || consumer.isClosed()) { |