diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 552 |
1 files changed, 271 insertions, 281 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 3b51fb8db0..5203a27f42 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -63,63 +63,54 @@ import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQInvalidRoutingKeyException; -import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.JMSBytesMessage; -import org.apache.qpid.client.message.JMSMapMessage; -import org.apache.qpid.client.message.JMSObjectMessage; -import org.apache.qpid.client.message.JMSStreamMessage; -import org.apache.qpid.client.message.JMSTextMessage; -import org.apache.qpid.client.message.MessageFactoryRegistry; -import org.apache.qpid.client.message.ReturnMessage; -import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.AMQState; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.AMQBindingURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * * <p/><table id="crc"><caption>CRC Card</caption> * <tr><th> Responsibilities <th> Collaborations * <tr><td> * </table> * * @todo Different FailoverSupport implementation are needed on the same method call, in different situations. For - * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second - * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of - * the fail-over process, the retry handler could be used to automatically retry the operation once the connection - * has been reestablished. All fail-over protected operations should be placed in private methods, with - * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the - * fail-over process sets a nowait flag and uses an async method call instead. - * + * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second + * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of + * the fail-over process, the retry handler could be used to automatically retry the operation once the connection + * has been reestablished. All fail-over protected operations should be placed in private methods, with + * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the + * fail-over process sets a nowait flag and uses an async method call instead. * @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this, - * after looking at worse bottlenecks first. + * after looking at worse bottlenecks first. */ -public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession +public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession { - public static final class IdToConsumerMap + + + public static final class IdToConsumerMap<C extends BasicMessageConsumer> { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; - private final ConcurrentHashMap<Integer, BasicMessageConsumer> _slowAccessConsumers = new ConcurrentHashMap<Integer, BasicMessageConsumer>(); - + private final ConcurrentHashMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>(); - public BasicMessageConsumer get(int id) + public C get(int id) { - if((id & 0xFFFFFFF0) == 0) + if ((id & 0xFFFFFFF0) == 0) { - return _fastAccessConsumers[id]; + return (C) _fastAccessConsumers[id]; } else { @@ -127,12 +118,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - public BasicMessageConsumer put(int id, BasicMessageConsumer consumer) + public C put(int id, C consumer) { - BasicMessageConsumer oldVal; - if((id & 0xFFFFFFF0) == 0) + C oldVal; + if ((id & 0xFFFFFFF0) == 0) { - oldVal = _fastAccessConsumers[id]; + oldVal = (C) _fastAccessConsumers[id]; _fastAccessConsumers[id] = consumer; } else @@ -144,13 +135,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } - - public BasicMessageConsumer remove(int id) + public C remove(int id) { - BasicMessageConsumer consumer; - if((id & 0xFFFFFFF0) == 0) + C consumer; + if ((id & 0xFFFFFFF0) == 0) { - consumer = _fastAccessConsumers[id]; + consumer = (C) _fastAccessConsumers[id]; _fastAccessConsumers[id] = null; } else @@ -162,15 +152,15 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } - public Collection<BasicMessageConsumer> values() + public Collection<C> values() { - ArrayList<BasicMessageConsumer> values = new ArrayList<BasicMessageConsumer>(); + ArrayList<C> values = new ArrayList<C>(); - for(int i = 0; i < 16; i++) + for (int i = 0; i < 16; i++) { - if(_fastAccessConsumers[i] != null) + if (_fastAccessConsumers[i] != null) { - values.add(_fastAccessConsumers[i]); + values.add((C) _fastAccessConsumers[i]); } } values.addAll(_slowAccessConsumers.values()); @@ -178,11 +168,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess return values; } - public void clear() { _slowAccessConsumers.clear(); - for(int i = 0; i<16; i++) + for (int i = 0; i < 16; i++) { _fastAccessConsumers[i] = null; } @@ -192,8 +181,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); - /** Used for debugging in the dispatcher. */ - private static final Logger _dispatcherLogger = LoggerFactory.getLogger(Dispatcher.class); /** The default maximum number of prefetched message at which to suspend the channel. */ public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000; @@ -243,7 +230,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess /** Holds this session unique identifier, used to distinguish it from other sessions. */ protected int _channelId; - /** @todo This does not appear to be set? */ private int _ticket; /** Holds the high mark for prefetched message, at which the session is suspended. */ @@ -270,8 +256,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked * up in the {@link #_subscriptions} map. */ - protected final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap = - new ConcurrentHashMap<BasicMessageConsumer, String>(); + protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap = + new ConcurrentHashMap<C, String>(); /** * Used to hold incoming messages. @@ -280,19 +266,13 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess */ protected final FlowControllingBlockingQueue _queue; - /** - * Holds the highest received delivery tag. - */ + /** Holds the highest received delivery tag. */ private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); - /** - * All the not yet acknowledged message tags - */ + /** All the not yet acknowledged message tags */ protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>(); - /** - * All the delivered message tags - */ + /** All the delivered message tags */ protected ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>(); /** Holds the dispatcher thread for this session. */ @@ -314,16 +294,16 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right * consumer. */ - protected final IdToConsumerMap _consumers = new IdToConsumerMap(); - - //Map<AMQShortString, BasicMessageConsumer> _consumers = - //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); + protected final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>(); + + //Map<AMQShortString, BasicMessageConsumer> _consumers = + //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); /** * Contains a list of consumers which have been removed but which might still have * messages to acknowledge, eg in client ack or transacted modes */ - private CopyOnWriteArrayList<BasicMessageConsumer> _removedConsumers = new CopyOnWriteArrayList<BasicMessageConsumer>(); + private CopyOnWriteArrayList<C> _removedConsumers = new CopyOnWriteArrayList<C>(); /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */ private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = @@ -377,10 +357,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */ private boolean _dirty; - /** Has failover occured on this session */ - private boolean _failedOver; - - + /** Has failover occured on this session with outstanding actions to commit? */ + private boolean _failedOverDirty; private static final class FlowControlIndicator { @@ -388,7 +366,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public synchronized void setFlowControl(boolean flowControl) { - _flowControl= flowControl; + _flowControl = flowControl; notify(); } @@ -412,7 +390,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session. * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. */ - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, + protected AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) { @@ -445,21 +423,25 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, new FlowControllingBlockingQueue.ThresholdListener() { + private final AtomicBoolean _suspendState = new AtomicBoolean(); + public void aboveThreshold(int currentValue) { - _logger.debug( - "Above threshold(" + _defaultPrefetchHighMark - + ") so suspending channel. Current value is " + currentValue); - new Thread(new SuspenderRunner(true)).start(); + _logger.debug( + "Above threshold(" + _defaultPrefetchHighMark + + ") so suspending channel. Current value is " + currentValue); + _suspendState.set(true); + new Thread(new SuspenderRunner(_suspendState)).start(); } public void underThreshold(int currentValue) { - _logger.debug( - "Below threshold(" + _defaultPrefetchLowMark - + ") so unsuspending channel. Current value is " + currentValue); - new Thread(new SuspenderRunner(false)).start(); + _logger.debug( + "Below threshold(" + _defaultPrefetchLowMark + + ") so unsuspending channel. Current value is " + currentValue); + _suspendState.set(false); + new Thread(new SuspenderRunner(_suspendState)).start(); } }); @@ -499,10 +481,30 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess close(-1); } + public void checkNotClosed() throws JMSException + { + try + { + super.checkNotClosed(); + } + catch (IllegalStateException ise) + { + // if the Connection has closed then we should throw any exception that has occured that we were not waiting for + AMQStateManager manager = _connection.getProtocolHandler().getStateManager(); + + if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && manager.getLastException() != null) + { + ise.setLinkedException(manager.getLastException()); + } + + throw ise; + } + } + public BytesMessage createBytesMessage() throws JMSException { checkNotClosed(); - return new JMSBytesMessage(); + return new JMSBytesMessage(getMessageDelegateFactory()); } /** @@ -515,7 +517,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess if (isClosed()) { throw new IllegalStateException("Session is already closed"); - } + } else if (hasFailedOver()) { throw new IllegalStateException("has failed over"); @@ -560,39 +562,35 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @param exchangeName The exchange to bind the queue on. * * @throws AMQException If the queue cannot be bound for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. - * * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges? */ public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName,final AMQDestination destination) throws AMQException + final AMQShortString exchangeName, final AMQDestination destination) throws AMQException { /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() { public Object execute() throws AMQException, FailoverException { - sendQueueBind(queueName,routingKey,arguments,exchangeName,destination); + sendQueueBind(queueName, routingKey, arguments, exchangeName, destination); return null; } }, _connection).execute(); } - - public void addBindingKey(BasicMessageConsumer consumer, AMQDestination amqd, String routingKey) throws AMQException + public void addBindingKey(C consumer, AMQDestination amqd, String routingKey) throws AMQException { - if( consumer.getQueuename() != null) + if (consumer.getQueuename() != null) { - bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(),amqd); + bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(), amqd); } } public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName,AMQDestination destination) throws AMQException, FailoverException; + final AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException; /** - * Closes the session. * * <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close @@ -602,14 +600,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker. * * @throws JMSException If the JMS provider fails to close the session due to some internal error. - * * @todo Be aware of possible changes to parameter order as versions change. - * * @todo Not certain about the logic of ignoring the failover exception, because the channel won't be - * re-opened. May need to examine this more carefully. - * + * re-opened. May need to examine this more carefully. * @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover, - * because the failover process sends the failover event before acquiring the mutex itself. + * because the failover process sends the failover event before acquiring the mutex itself. */ public void close(long timeout) throws JMSException { @@ -617,13 +612,13 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); _logger.info("Closing session: " + this); // + ":" - // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); + // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); } // Ensure we only try and close an open session. if (!_closed.getAndSet(true)) { - synchronized (_connection.getFailoverMutex()) + synchronized (getFailoverMutex()) { // We must close down all producers and consumers in an orderly fashion. This is the only method // that can be called from a different thread of control from the one controlling the session. @@ -634,7 +629,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess try { - sendClose(timeout); + sendClose(timeout); } catch (AMQException e) { @@ -685,7 +680,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess if (!_closed.getAndSet(true)) { - synchronized (_connection.getFailoverMutex()) + synchronized (getFailoverMutex()) { synchronized (_messageDeliveryLock) { @@ -701,7 +696,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess amqe = new AMQException("Closing session forcibly", e); } - _connection.deregisterSession(_channelId); closeProducersAndConsumers(amqe); } @@ -719,12 +713,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does * not mean that the commit is known to have failed, merely that it is not known whether it * failed or not. - * * @todo Be aware of possible changes to parameter order as versions change. */ public void commit() throws JMSException { - checkTransacted(); + checkTransacted(); try { @@ -743,6 +736,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } // Commits outstanding messages and acknowledgments sendCommit(); + markClean(); } catch (AMQException e) { @@ -756,11 +750,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public abstract void sendCommit() throws AMQException, FailoverException; - public void confirmConsumerCancelled(AMQShortString consumerTag) + + public void confirmConsumerCancelled(int consumerTag) { // Remove the consumer from the map - BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue()); + C consumer = _consumers.get(consumerTag); if (consumer != null) { if (!consumer.isNoConsume()) // Normal Consumer @@ -788,10 +783,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess // consumer.markClosed(); - - if (consumer.isAutoClose()) - { + { // There is a small window where the message is between the two queues in the dispatcher. if (consumer.isClosed()) { @@ -802,6 +795,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess deregisterConsumer(consumer); } + else + { + _queue.add(new CloseConsumerMessage(consumer)); + } } } } @@ -847,7 +844,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess false, false); } - public MessageConsumer createExclusiveConsumer(Destination destination) throws JMSException + public C createExclusiveConsumer(Destination destination) throws JMSException { checkValidDestination(destination); @@ -855,7 +852,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess false, false); } - public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { checkValidDestination(destination); @@ -873,7 +869,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess messageSelector, null, false, false); } - public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { @@ -883,7 +878,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess messageSelector, null, false, false); } - public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, String selector) throws JMSException { @@ -920,7 +914,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public abstract TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException; public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) - throws JMSException + throws JMSException { checkNotClosed(); checkValidTopic(topic); @@ -929,8 +923,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _subscriptions.get(name).close(); } AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); - TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); + C consumer = (C) createConsumer(dest, messageSelector, noLocal); + TopicSubscriberAdaptor<C> subscriber = new TopicSubscriberAdaptor(dest, consumer); _subscriptions.put(name, subscriber); _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); @@ -940,7 +934,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public MapMessage createMapMessage() throws JMSException { checkNotClosed(); - return new JMSMapMessage(); + return new JMSMapMessage(getMessageDelegateFactory()); } public javax.jms.Message createMessage() throws JMSException @@ -951,7 +945,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public ObjectMessage createObjectMessage() throws JMSException { checkNotClosed(); - return (ObjectMessage) new JMSObjectMessage(); + return (ObjectMessage) new JMSObjectMessage(getMessageDelegateFactory()); } public ObjectMessage createObjectMessage(Serializable object) throws JMSException @@ -962,23 +956,23 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess return msg; } - public BasicMessageProducer createProducer(Destination destination) throws JMSException + public P createProducer(Destination destination) throws JMSException { return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE); } - public BasicMessageProducer createProducer(Destination destination, boolean immediate) throws JMSException + public P createProducer(Destination destination, boolean immediate) throws JMSException { return createProducerImpl(destination, DEFAULT_MANDATORY, immediate); } - public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate) + public P createProducer(Destination destination, boolean mandatory, boolean immediate) throws JMSException { return createProducerImpl(destination, mandatory, immediate); } - public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate, + public P createProducer(Destination destination, boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException { return createProducerImpl(destination, mandatory, immediate, waitUntilSent); @@ -988,7 +982,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { checkNotClosed(); - return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic,false,false), topic); + return new TopicPublisherAdapter((P) createProducer(topic, false, false), topic); } public Queue createQueue(String queueName) throws JMSException @@ -1025,24 +1019,44 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @param exclusive Flag to indicate that the queue is exclusive to this client. * * @throws AMQException If the queue cannot be declared for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive) throws AMQException { + createQueue(name, autoDelete, durable, exclusive, null); + } + + /** + * Declares the named queue. + * + * <p/>Note that this operation automatically retries in the event of fail-over. + * + * @param name The name of the queue to declare. + * @param autoDelete + * @param durable Flag to indicate that the queue is durable. + * @param exclusive Flag to indicate that the queue is exclusive to this client. + * @param arguments Arguments used to set special properties of the queue + * + * @throws AMQException If the queue cannot be declared for any reason. + * @todo Be aware of possible changes to parameter order as versions change. + */ + public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, + final boolean exclusive, final Map<String, Object> arguments) throws AMQException + { new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() { public Object execute() throws AMQException, FailoverException { - sendCreateQueue(name, autoDelete, durable, exclusive); + sendCreateQueue(name, autoDelete, durable, exclusive, arguments); return null; } }, _connection).execute(); } public abstract void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, - final boolean exclusive)throws AMQException, FailoverException; + final boolean exclusive, final Map<String, Object> arguments) throws AMQException, FailoverException; + /** * Creates a QueueReceiver * @@ -1056,7 +1070,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { checkValidDestination(destination); AMQQueue dest = (AMQQueue) destination; - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination); + C consumer = (C) createConsumer(destination); return new QueueReceiverAdaptor(dest, consumer); } @@ -1075,7 +1089,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { checkValidDestination(destination); AMQQueue dest = (AMQQueue) destination; - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination, messageSelector); + C consumer = (C) createConsumer(destination, messageSelector); return new QueueReceiverAdaptor(dest, consumer); } @@ -1093,7 +1107,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { checkNotClosed(); AMQQueue dest = (AMQQueue) queue; - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest); + C consumer = (C) createConsumer(dest); return new QueueReceiverAdaptor(dest, consumer); } @@ -1112,7 +1126,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { checkNotClosed(); AMQQueue dest = (AMQQueue) queue; - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector); + C consumer = (C) createConsumer(dest, messageSelector); return new QueueReceiverAdaptor(dest, consumer); } @@ -1127,11 +1141,18 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public StreamMessage createStreamMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived + // calls through connection.closeAllSessions which is also called by the public connection.close() + // with a null cause + // When we are closing the Session due to a protocol session error we simply create a new AMQException + // with the correct error code and text this is cleary WRONG as the instanceof check below will fail. + // We need to determin here if the connection should be + + synchronized (getFailoverMutex()) { checkNotClosed(); - return new JMSStreamMessage(); + return new JMSStreamMessage(getMessageDelegateFactory()); } } @@ -1150,10 +1171,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess AMQTopic dest = checkValidTopic(topic); // AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest)); + return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest)); } - /** * Creates a non-durable subscriber with a message selector * @@ -1171,7 +1191,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess AMQTopic dest = checkValidTopic(topic); // AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest, messageSelector, noLocal)); + return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal)); } public abstract TemporaryQueue createTemporaryQueue() throws JMSException; @@ -1185,14 +1205,19 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public TextMessage createTextMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized (getFailoverMutex()) { checkNotClosed(); - return new JMSTextMessage(); + return new JMSTextMessage(getMessageDelegateFactory()); } } + protected Object getFailoverMutex() + { + return _connection.getFailoverMutex(); + } + public TextMessage createTextMessage(String text) throws JMSException { @@ -1340,17 +1365,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { _logger.debug("Message[" + message.toString() + "] received in session"); } - - if (message instanceof ReturnMessage) - { - // Return of the bounced message. - returnBouncedMessage((ReturnMessage)message); - } - else - { - _highestDeliveryTag.set(message.getDeliveryTag()); - _queue.add(message); - } + _highestDeliveryTag.set(message.getDeliveryTag()); + _queue.add(message); } public void declareAndBind(AMQDestination amqd) @@ -1360,7 +1376,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess AMQProtocolHandler protocolHandler = getProtocolHandler(); declareExchange(amqd, protocolHandler, false); AMQShortString queueName = declareQueue(amqd, protocolHandler, false); - bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(),amqd); + bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd); } /** @@ -1375,7 +1391,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * <li>Stop message delivery.</li> * <li>Mark all messages that might have been delivered but not acknowledged as "redelivered". * <li>Restart the delivery sequence including all unacknowledged messages that had been previously delivered. - * Redelivered messages do not have to be delivered in exactly their original delivery order.</li> + * Redelivered messages do not have to be delivered in exactly their original delivery order.</li> * </ul> * * <p/>If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and @@ -1429,7 +1445,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - abstract void sendRecover() throws AMQException, FailoverException; + protected abstract void sendRecover() throws AMQException, FailoverException; public void rejectMessage(UnprocessedMessage message, boolean requeue) { @@ -1465,7 +1481,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @throws JMSException If the JMS provider fails to rollback the transaction due to some internal error. This does * not mean that the rollback is known to have failed, merely that it is not known whether it * failed or not. - * * @todo Be aware of possible changes to parameter order as versions change. */ public void rollback() throws JMSException @@ -1507,8 +1522,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public abstract void releaseForRollback(); - public abstract void sendRollback() throws AMQException, FailoverException ; - + public abstract void sendRollback() throws AMQException, FailoverException; public void run() { @@ -1576,7 +1590,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); } - else + else // Queue Browser { if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection))) @@ -1591,7 +1605,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - protected MessageConsumer createConsumerImpl(final Destination destination, final int prefetchHigh, + protected C createConsumerImpl(final Destination destination, final int prefetchHigh, final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, final boolean noConsume, final boolean autoClose) throws JMSException { @@ -1615,10 +1629,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess messageSelector = selector; } - return new FailoverRetrySupport<MessageConsumer, JMSException>( - new FailoverProtectedOperation<MessageConsumer, JMSException>() + return new FailoverRetrySupport<C, JMSException>( + new FailoverProtectedOperation<C, JMSException>() { - public MessageConsumer execute() throws JMSException, FailoverException + public C execute() throws JMSException, FailoverException { checkNotClosed(); @@ -1630,13 +1644,19 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess final FieldTable ft = FieldTableFactory.newFieldTable(); // if (rawSelector != null) // ft.put("headers", rawSelector.getDataAsBytes()); - if (rawSelector != null) + // rawSelector is used by HeadersExchange and is not a JMS Selector + if (rawSelector != null) { ft.addAll(rawSelector); } + + if (messageSelector != null) + { + ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector); + } - BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh,prefetchLow, - noLocal,exclusive, messageSelector, ft, noConsume, autoClose); + C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow, + noLocal, exclusive, messageSelector, ft, noConsume, autoClose); if (_messageListener != null) { @@ -1679,9 +1699,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess }, _connection).execute(); } - public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh, - final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, - final boolean noConsume, final boolean autoClose) throws JMSException; + public abstract C createMessageConsumer(final AMQDestination destination, final int prefetchHigh, + final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable arguments, + final boolean noConsume, final boolean autoClose) throws JMSException; /** * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer @@ -1689,9 +1709,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @param consumer the consum */ - void deregisterConsumer(BasicMessageConsumer consumer) + void deregisterConsumer(C consumer) { - if (_consumers.remove(consumer.getConsumerTag().toIntValue()) != null) + if (_consumers.remove(consumer.getConsumerTag()) != null) { String subscriptionName = _reverseSubscriptionMap.remove(consumer); if (subscriptionName != null) @@ -1744,12 +1764,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @return <tt>true</tt> if the queue is bound to the exchange and routing key, <tt>false</tt> if not. * * @throws JMSException If the query fails for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) throws JMSException; - + public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException; /** @@ -1771,7 +1790,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess */ void resubscribe() throws AMQException { - _failedOver = true; + if (_dirty) + { + _failedOverDirty = true; + } resubscribeProducers(); resubscribeConsumers(); } @@ -1790,10 +1812,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * Starts the session, which ensures that it is not suspended and that its event dispatcher is running. * * @throws AMQException If the session cannot be started for any reason. - * * @todo This should be controlled by _stopped as it pairs with the stop method fixme or check the - * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages - * for each subsequent call to flow.. only need to do this if we have called stop. + * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages + * for each subsequent call to flow.. only need to do this if we have called stop. */ void start() throws AMQException { @@ -1978,12 +1999,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception - final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values()); + final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values()); - final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator(); + final Iterator<C> it = clonedConsumers.iterator(); while (it.hasNext()) { - final BasicMessageConsumer con = it.next(); + final C con = it.next(); if (error != null) { con.notifyError(error); @@ -1994,7 +2015,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } // at this point the _consumers map will be empty - if (_dispatcher != null) + if (_dispatcher != null) { _dispatcher.close(); _dispatcher = null; @@ -2014,7 +2035,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess final Iterator it = clonedProducers.iterator(); while (it.hasNext()) { - final BasicMessageProducer prod = (BasicMessageProducer) it.next(); + final P prod = (P) it.next(); prod.close(); } // at this point the _producers map is empty @@ -2062,20 +2083,18 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @param queueName */ - private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, + private void consumeFromQueue(C consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException { int tagId = _nextTag++; - // need to generate a consumer tag on the client so we can exploit the nowait flag - AMQShortString tag = new AMQShortString(Integer.toString(tagId)); - consumer.setConsumerTag(tag); + consumer.setConsumerTag(tagId); // we must register the consumer in the map before we actually start listening _consumers.put(tagId, consumer); try { - sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tag); + sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tagId); } catch (AMQException e) { @@ -2085,27 +2104,27 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - public abstract void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector,AMQShortString tag) throws AMQException, FailoverException; + public abstract void sendConsume(C consumer, AMQShortString queueName, + AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException; - private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate) + private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate) throws JMSException { return createProducerImpl(destination, mandatory, immediate, false); } - private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory, + private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate, final boolean waitUntilSent) throws JMSException { - return new FailoverRetrySupport<BasicMessageProducer, JMSException>( - new FailoverProtectedOperation<BasicMessageProducer, JMSException>() + return new FailoverRetrySupport<P, JMSException>( + new FailoverProtectedOperation<P, JMSException>() { - public BasicMessageProducer execute() throws JMSException, FailoverException + public P execute() throws JMSException, FailoverException { checkNotClosed(); long producerId = getNextProducerId(); - BasicMessageProducer producer = createMessageProducer(destination, mandatory, - immediate, waitUntilSent, producerId); + P producer = createMessageProducer(destination, mandatory, + immediate, waitUntilSent, producerId); registerProducer(producerId, producer); return producer; @@ -2113,21 +2132,20 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess }, _connection).execute(); } - public abstract BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory, - final boolean immediate, final boolean waitUntilSent, long producerId); + public abstract P createMessageProducer(final Destination destination, final boolean mandatory, + final boolean immediate, final boolean waitUntilSent, long producerId); private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException { declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait); } - /** * Returns the number of messages currently queued for the given destination. * * <p/>Note that this operation automatically retries in the event of fail-over. * - * @param amqd The destination to be checked + * @param amqd The destination to be checked * * @return the number of queued messages. * @@ -2147,7 +2165,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } - abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException; + protected abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException; /** * Declares the named exchange and type of exchange. @@ -2160,7 +2178,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @param nowait * * @throws AMQException If the exchange cannot be declared for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ private void declareExchange(final AMQShortString name, final AMQShortString type, @@ -2177,8 +2194,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler, - final boolean nowait) throws AMQException, FailoverException; - + final boolean nowait) throws AMQException, FailoverException; /** * Declares a queue for a JMS destination. @@ -2196,9 +2212,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * the client. * * @throws AMQException If the queue cannot be declared for any reason. - * * @todo Verify the destiation is valid or throw an exception. - * * @todo Be aware of possible changes to parameter order as versions change. */ protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, @@ -2224,7 +2238,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess }, _connection).execute(); } - public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)throws AMQException, FailoverException; + public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException; /** * Undeclares the specified queue. @@ -2234,7 +2248,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @param queueName The name of the queue to delete. * * @throws JMSException If the queue could not be deleted for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ protected void deleteQueue(final AMQShortString queueName) throws JMSException @@ -2256,7 +2269,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException; + public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException; private long getNextProducerId() { @@ -2292,12 +2305,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception - final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values()); + final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values()); - final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator(); + final Iterator<C> it = clonedConsumers.iterator(); while (it.hasNext()) { - final BasicMessageConsumer con = it.next(); + final C con = it.next(); con.markClosed(); } // at this point the _consumers map will be empty @@ -2332,7 +2345,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @throws AMQException */ - private void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException // , FailoverException + private void registerConsumer(C consumer, boolean nowait) throws AMQException // , FailoverException { AMQDestination amqd = consumer.getDestination(); @@ -2345,8 +2358,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess // store the consumer queue name consumer.setQueuename(queueName); - // bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); - bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(),amqd); + bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd); // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch if (!_immediatePrefetch) @@ -2397,15 +2409,16 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess private void rejectAllMessages(boolean requeue) { - rejectMessagesForConsumerTag(null, requeue); + rejectMessagesForConsumerTag(0, requeue, true); } /** * @param consumerTag The consumerTag to prune from queue or all if null * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) + * @param rejectAllConsumers */ - private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue) + private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers) { Iterator messages = _queue.iterator(); if (_logger.isInfoEnabled()) @@ -2426,12 +2439,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { UnprocessedMessage message = (UnprocessedMessage) messages.next(); - if ((consumerTag == null) || message.getConsumerTag().equals(consumerTag.toString())) + if (rejectAllConsumers || (message.getConsumerTag() == consumerTag)) { if (_logger.isDebugEnabled()) { _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:" - + message.getDeliveryTag()); + + message.getDeliveryTag()); } messages.remove(); @@ -2448,12 +2461,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess private void resubscribeConsumers() throws AMQException { - ArrayList consumers = new ArrayList(_consumers.values()); + ArrayList<C> consumers = new ArrayList<C>(_consumers.values()); _consumers.clear(); - for (Iterator it = consumers.iterator(); it.hasNext();) - { - BasicMessageConsumer consumer = (BasicMessageConsumer) it.next(); + for (C consumer : consumers) + { consumer.failedOver(); registerConsumer(consumer, true); } @@ -2465,53 +2477,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey for (Iterator it = producers.iterator(); it.hasNext();) { - BasicMessageProducer producer = (BasicMessageProducer) it.next(); + P producer = (P) it.next(); producer.resubscribe(); } } - private void returnBouncedMessage(final ReturnMessage msg) - { - _connection.performConnectionTask(new Runnable() - { - public void run() - { - try - { - // Bounced message is processed here, away from the mina thread - AbstractJMSMessage bouncedMessage = - _messageFactoryRegistry.createMessage(0, false, msg.getExchange(), - msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies()); - AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); - AMQShortString reason = msg.getReplyText(); - _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); - - // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. - if (errorCode == AMQConstant.NO_CONSUMERS) - { - _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null)); - } - else if (errorCode == AMQConstant.NO_ROUTE) - { - _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null)); - } - else - { - _connection.exceptionReceived( - new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null)); - } - - } - catch (Exception e) - { - _logger.error( - "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", - e); - } - } - }); - } - /** * Suspends or unsuspends this session. * @@ -2519,7 +2489,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * should be unsuspended. * * @throws AMQException If the session cannot be suspended for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ protected void suspendChannel(boolean suspend) throws AMQException // , FailoverException @@ -2560,7 +2529,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess return getAMQConnection().getMaxPrefetch() > 0; } - /** Signifies that the session has pending sends to commit. */ public void markDirty() { @@ -2571,7 +2539,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public void markClean() { _dirty = false; - _failedOver = false; + _failedOverDirty = false; } /** @@ -2581,7 +2549,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess */ public boolean hasFailedOver() { - return _failedOver; + return _failedOverDirty; } /** @@ -2604,12 +2572,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _flowControl.setFlowControl(active); } - public void checkFlowControl() throws InterruptedException { - synchronized(_flowControl) + synchronized (_flowControl) { - while(!_flowControl.getFlowControl()) + while (!_flowControl.getFlowControl()) { _flowControl.wait(); } @@ -2617,6 +2584,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } + /** Used for debugging in the dispatcher. */ + private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher"); + /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ class Dispatcher extends Thread @@ -2629,6 +2599,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess private final AtomicLong _rollbackMark = new AtomicLong(-1); private String dispatcherID = "" + System.identityHashCode(this); + + public Dispatcher() { super("Dispatcher-Channel-" + _channelId); @@ -2647,7 +2619,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } - public void rejectPending(BasicMessageConsumer consumer) + public void rejectPending(C consumer) { synchronized (_lock) { @@ -2662,7 +2634,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess consumer.rollbackPendingMessages(); // Reject messages on pre-dispatch queue - rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); + rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false); //Let the dispatcher deal with this when it gets to them. // closeConsumer @@ -2689,7 +2661,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _dispatcherLogger.debug("Session Pre Dispatch Queue cleared"); - for (BasicMessageConsumer consumer : _consumers.values()) + for (C consumer : _consumers.values()) { if (!consumer.isNoConsume()) { @@ -2755,7 +2727,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _lock.wait(); } - if (tagLE(deliveryTag, _rollbackMark.get())) + if (!(message instanceof CloseConsumerMessage) + && tagLE(deliveryTag, _rollbackMark.get())) { rejectMessage(message, true); } @@ -2816,8 +2789,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess //This if block is not needed anymore as bounce messages are handled separately //if (message.getDeliverBody() != null) //{ - final BasicMessageConsumer consumer = - _consumers.get(message.getConsumerTag().toIntValue()); + final C consumer = + _consumers.get(message.getConsumerTag()); if ((consumer == null) || consumer.isClosed()) { @@ -2826,14 +2799,26 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess if (consumer == null) { _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliveryTag() + "] from queue " - + message.getConsumerTag() + " )without a handler - rejecting(requeue)..."); + + message.getDeliveryTag() + "] from queue " + + message.getConsumerTag() + " )without a handler - rejecting(requeue)..."); } else { - _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliveryTag() + "] from queue " + " consumer(" - + message.getConsumerTag() + ") is closed rejecting(requeue)..."); + if (consumer.isNoConsume()) + { + _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" + + message.getDeliveryTag() + "] from queue " + " consumer(" + + message.getConsumerTag() + ") is closed and a browser so dropping..."); + //DROP MESSAGE + return; + + } + else + { + _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" + + message.getDeliveryTag() + "] from queue " + " consumer(" + + message.getConsumerTag() + ") is closed rejecting(requeue)..."); + } } } // Don't reject if we're already closing @@ -2850,9 +2835,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - abstract boolean tagLE(long tag1, long tag2); + protected abstract boolean tagLE(long tag1, long tag2); + + protected abstract boolean updateRollbackMark(long current, long deliveryTag); - abstract boolean updateRollbackMark(long current, long deliveryTag); + public abstract AMQMessageDelegateFactory getMessageDelegateFactory(); /*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, boolean read) throws AMQException @@ -2880,9 +2867,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess private class SuspenderRunner implements Runnable { - private boolean _suspend; + private AtomicBoolean _suspend; - public SuspenderRunner(boolean suspend) + public SuspenderRunner(AtomicBoolean suspend) { _suspend = suspend; } @@ -2891,7 +2878,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { try { - suspendChannel(_suspend); + synchronized (_suspensionLock) + { + suspendChannel(_suspend.get()); + } } catch (AMQException e) { |