diff options
Diffstat (limited to 'M4-RCs/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | M4-RCs/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 2908 |
1 files changed, 0 insertions, 2908 deletions
diff --git a/M4-RCs/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/M4-RCs/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java deleted file mode 100644 index b5d12d9520..0000000000 --- a/M4-RCs/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ /dev/null @@ -1,2908 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client; - -import java.io.Serializable; -import java.net.URISyntaxException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.IllegalStateException; -import javax.jms.InvalidDestinationException; -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.QueueReceiver; -import javax.jms.QueueSender; -import javax.jms.QueueSession; -import javax.jms.StreamMessage; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; - -import org.apache.qpid.AMQDisconnectedException; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQInvalidArgumentException; -import org.apache.qpid.AMQInvalidRoutingKeyException; -import org.apache.qpid.client.failover.FailoverException; -import org.apache.qpid.client.failover.FailoverNoopSupport; -import org.apache.qpid.client.failover.FailoverProtectedOperation; -import org.apache.qpid.client.failover.FailoverRetrySupport; -import org.apache.qpid.client.message.*; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.util.FlowControllingBlockingQueue; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.jms.Session; -import org.apache.qpid.url.AMQBindingURL; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> - * </table> - * - * @todo Different FailoverSupport implementation are needed on the same method call, in different situations. For - * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second - * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of - * the fail-over process, the retry handler could be used to automatically retry the operation once the connection - * has been reestablished. All fail-over protected operations should be placed in private methods, with - * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the - * fail-over process sets a nowait flag and uses an async method call instead. - * @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this, - * after looking at worse bottlenecks first. - */ -public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession -{ - - - public static final class IdToConsumerMap<C extends BasicMessageConsumer> - { - private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; - private final ConcurrentHashMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>(); - - public C get(int id) - { - if ((id & 0xFFFFFFF0) == 0) - { - return (C) _fastAccessConsumers[id]; - } - else - { - return _slowAccessConsumers.get(id); - } - } - - public C put(int id, C consumer) - { - C oldVal; - if ((id & 0xFFFFFFF0) == 0) - { - oldVal = (C) _fastAccessConsumers[id]; - _fastAccessConsumers[id] = consumer; - } - else - { - oldVal = _slowAccessConsumers.put(id, consumer); - } - - return consumer; - - } - - public C remove(int id) - { - C consumer; - if ((id & 0xFFFFFFF0) == 0) - { - consumer = (C) _fastAccessConsumers[id]; - _fastAccessConsumers[id] = null; - } - else - { - consumer = _slowAccessConsumers.remove(id); - } - - return consumer; - - } - - public Collection<C> values() - { - ArrayList<C> values = new ArrayList<C>(); - - for (int i = 0; i < 16; i++) - { - if (_fastAccessConsumers[i] != null) - { - values.add((C) _fastAccessConsumers[i]); - } - } - values.addAll(_slowAccessConsumers.values()); - - return values; - } - - public void clear() - { - _slowAccessConsumers.clear(); - for (int i = 0; i < 16; i++) - { - _fastAccessConsumers[i] = null; - } - } - } - - /** Used for debugging. */ - private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); - - /** - * The default value for immediate flag used by producers created by this session is false. That is, a consumer does - * not need to be attached to a queue. - */ - protected static final boolean DEFAULT_IMMEDIATE = false; - - /** - * The default value for mandatory flag used by producers created by this session is true. That is, server will not - * silently drop messages where no queue is connected to the exchange for the message. - */ - protected static final boolean DEFAULT_MANDATORY = true; - - /** System property to enable strict AMQP compliance. */ - public static final String STRICT_AMQP = "STRICT_AMQP"; - - /** Strict AMQP default setting. */ - public static final String STRICT_AMQP_DEFAULT = "false"; - - /** System property to enable failure if strict AMQP compliance is violated. */ - public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL"; - - /** Strickt AMQP failure default. */ - public static final String STRICT_AMQP_FATAL_DEFAULT = "true"; - - /** System property to enable immediate message prefetching. */ - public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH"; - - /** Immediate message prefetch default. */ - public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; - - /** The connection to which this session belongs. */ - protected AMQConnection _connection; - - /** Used to indicate whether or not this is a transactional session. */ - protected boolean _transacted; - - /** Holds the sessions acknowledgement mode. */ - protected final int _acknowledgeMode; - - /** Holds this session unique identifier, used to distinguish it from other sessions. */ - protected int _channelId; - - private int _ticket; - - /** Holds the high mark for prefetched message, at which the session is suspended. */ - private int _defaultPrefetchHighMark; - - /** Holds the low mark for prefetched messages, below which the session is resumed. */ - private int _defaultPrefetchLowMark; - - /** Holds the message listener, if any, which is attached to this session. */ - private MessageListener _messageListener = null; - - /** Used to indicate that this session has been started at least once. */ - private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false); - - /** - * Used to reference durable subscribers so that requests for unsubscribe can be handled correctly. Note this only - * keeps a record of subscriptions which have been created in the current instance. It does not remember - * subscriptions between executions of the client. - */ - protected final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions = - new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); - - /** - * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked - * up in the {@link #_subscriptions} map. - */ - protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap = - new ConcurrentHashMap<C, String>(); - - /** - * Used to hold incoming messages. - * - * @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue. - */ - protected final FlowControllingBlockingQueue _queue; - - /** Holds the highest received delivery tag. */ - private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); - private final AtomicLong _rollbackMark = new AtomicLong(-1); - - /** All the not yet acknowledged message tags */ - protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>(); - - /** All the delivered message tags */ - protected ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>(); - - /** Holds the dispatcher thread for this session. */ - protected Dispatcher _dispatcher; - - /** Holds the message factory factory for this session. */ - protected MessageFactoryRegistry _messageFactoryRegistry; - - /** Holds all of the producers created by this session, keyed by their unique identifiers. */ - private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>(); - - /** - * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume - * methods. - */ - private int _nextTag = 1; - - /** - * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right - * consumer. - */ - protected final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>(); - - //Map<AMQShortString, BasicMessageConsumer> _consumers = - //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); - - /** - * Contains a list of consumers which have been removed but which might still have - * messages to acknowledge, eg in client ack or transacted modes - */ - private CopyOnWriteArrayList<C> _removedConsumers = new CopyOnWriteArrayList<C>(); - - /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */ - private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = - new ConcurrentHashMap<Destination, AtomicInteger>(); - - /** - * Used as a source of unique identifiers for producers within the session. - * - * <p/> Access to this id does not require to be synchronized since according to the JMS specification only one - * thread of control is allowed to create producers for any given session instance. - */ - private long _nextProducerId; - - /** - * Set when recover is called. This is to handle the case where recover() is called by application code during - * onMessage() processing to enure that an auto ack is not sent. - */ - private boolean _inRecovery; - - /** Used to indicates that the connection to which this session belongs, has been stopped. */ - private boolean _connectionStopped; - - /** Used to indicate that this session has a message listener attached to it. */ - private boolean _hasMessageListeners; - - /** Used to indicate that this session has been suspended. */ - private boolean _suspended; - - /** - * Used to protect the suspension of this session, so that critical code can be executed during suspension, - * without the session being resumed by other threads. - */ - private final Object _suspensionLock = new Object(); - - /** - * Used to ensure that onlt the first call to start the dispatcher can unsuspend the channel. - * - * @todo This is accessed only within a synchronized method, so does not need to be atomic. - */ - protected final AtomicBoolean _firstDispatcher = new AtomicBoolean(true); - - /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */ - protected final boolean _immediatePrefetch; - - /** Indicates that warnings should be generated on violations of the strict AMQP. */ - protected final boolean _strictAMQP; - - /** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */ - protected final boolean _strictAMQPFATAL; - private final Object _messageDeliveryLock = new Object(); - - /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */ - private boolean _dirty; - /** Has failover occured on this session with outstanding actions to commit? */ - private boolean _failedOverDirty; - - private static final class FlowControlIndicator - { - private volatile boolean _flowControl = true; - - public synchronized void setFlowControl(boolean flowControl) - { - _flowControl = flowControl; - notify(); - } - - public boolean getFlowControl() - { - return _flowControl; - } - } - - /** Flow control */ - private FlowControlIndicator _flowControl = new FlowControlIndicator(); - - /** - * Creates a new session on a connection. - * - * @param con The connection on which to create the session. - * @param channelId The unique identifier for the session. - * @param transacted Indicates whether or not the session is transactional. - * @param acknowledgeMode The acknoledgement mode for the session. - * @param messageFactoryRegistry The message factory factory for the session. - * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session. - * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. - */ - protected AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, - MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) - { - - _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT)); - _strictAMQPFATAL = - Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT)); - _immediatePrefetch = - _strictAMQP - || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)); - - _connection = con; - _transacted = transacted; - if (transacted) - { - _acknowledgeMode = javax.jms.Session.SESSION_TRANSACTED; - } - else - { - _acknowledgeMode = acknowledgeMode; - } - - _channelId = channelId; - _messageFactoryRegistry = messageFactoryRegistry; - _defaultPrefetchHighMark = defaultPrefetchHighMark; - _defaultPrefetchLowMark = defaultPrefetchLowMark; - - if (_acknowledgeMode == NO_ACKNOWLEDGE) - { - _queue = - new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, - new FlowControllingBlockingQueue.ThresholdListener() - { - private final AtomicBoolean _suspendState = new AtomicBoolean(); - - public void aboveThreshold(int currentValue) - { - _logger.debug( - "Above threshold(" + _defaultPrefetchHighMark - + ") so suspending channel. Current value is " + currentValue); - _suspendState.set(true); - new Thread(new SuspenderRunner(_suspendState)).start(); - - } - - public void underThreshold(int currentValue) - { - _logger.debug( - "Below threshold(" + _defaultPrefetchLowMark - + ") so unsuspending channel. Current value is " + currentValue); - _suspendState.set(false); - new Thread(new SuspenderRunner(_suspendState)).start(); - - } - }); - } - else - { - _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null); - } - } - - /** - * Creates a new session on a connection with the default message factory factory. - * - * @param con The connection on which to create the session. - * @param channelId The unique identifier for the session. - * @param transacted Indicates whether or not the session is transactional. - * @param acknowledgeMode The acknoledgement mode for the session. - * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session. - * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. - */ - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, - int defaultPrefetchLow) - { - this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, - defaultPrefetchLow); - } - - // ===== JMS Session methods. - - /** - * Closes the session with no timeout. - * - * @throws JMSException If the JMS provider fails to close the session due to some internal error. - */ - public void close() throws JMSException - { - close(-1); - } - - public void checkNotClosed() throws JMSException - { - try - { - super.checkNotClosed(); - } - catch (IllegalStateException ise) - { - // if the Connection has closed then we should throw any exception that has occured that we were not waiting for - AMQStateManager manager = _connection.getProtocolHandler().getStateManager(); - - if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && manager.getLastException() != null) - { - ise.setLinkedException(manager.getLastException()); - } - - throw ise; - } - } - - public BytesMessage createBytesMessage() throws JMSException - { - checkNotClosed(); - return new JMSBytesMessage(getMessageDelegateFactory()); - } - - /** - * Acknowledges all unacknowledged messages on the session, for all message consumers on the session. - * - * @throws IllegalStateException If the session is closed. - */ - public void acknowledge() throws IllegalStateException - { - if (isClosed()) - { - throw new IllegalStateException("Session is already closed"); - } - else if (hasFailedOver()) - { - throw new IllegalStateException("has failed over"); - } - - while (true) - { - Long tag = _unacknowledgedMessageTags.poll(); - if (tag == null) - { - break; - } - acknowledgeMessage(tag, false); - } - } - - /** - * Acknowledge one or many messages. - * - * @param deliveryTag The tag of the last message to be acknowledged. - * @param multiple <tt>true</tt> to acknowledge all messages up to and including the one specified by the - * delivery tag, <tt>false</tt> to just acknowledge that message. - * - * @todo Be aware of possible changes to parameter order as versions change. - */ - public abstract void acknowledgeMessage(long deliveryTag, boolean multiple); - - public MethodRegistry getMethodRegistry() - { - MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); - return methodRegistry; - } - - /** - * Binds the named queue, with the specified routing key, to the named exchange. - * - * <p/>Note that this operation automatically retries in the event of fail-over. - * - * @param queueName The name of the queue to bind. - * @param routingKey The routing key to bind the queue with. - * @param arguments Additional arguments. - * @param exchangeName The exchange to bind the queue on. - * - * @throws AMQException If the queue cannot be bound for any reason. - * @todo Be aware of possible changes to parameter order as versions change. - * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges? - */ - public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName, final AMQDestination destination) throws AMQException - { - /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/ - new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() - { - public Object execute() throws AMQException, FailoverException - { - sendQueueBind(queueName, routingKey, arguments, exchangeName, destination); - return null; - } - }, _connection).execute(); - } - - public void addBindingKey(C consumer, AMQDestination amqd, String routingKey) throws AMQException - { - if (consumer.getQueuename() != null) - { - bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(), amqd); - } - } - - public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException; - - /** - * Closes the session. - * - * <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close - * the channel. This is because the channel is marked as closed before the request to close it is made, so the - * fail-over should not re-open it. - * - * @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker. - * - * @throws JMSException If the JMS provider fails to close the session due to some internal error. - * @todo Be aware of possible changes to parameter order as versions change. - * @todo Not certain about the logic of ignoring the failover exception, because the channel won't be - * re-opened. May need to examine this more carefully. - * @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover, - * because the failover process sends the failover event before acquiring the mutex itself. - */ - public void close(long timeout) throws JMSException - { - if (_logger.isInfoEnabled()) - { - StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); - _logger.info("Closing session: " + this); // + ":" - // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); - } - - // Ensure we only try and close an open session. - if (!_closed.getAndSet(true)) - { - synchronized (getFailoverMutex()) - { - // We must close down all producers and consumers in an orderly fashion. This is the only method - // that can be called from a different thread of control from the one controlling the session. - synchronized (_messageDeliveryLock) - { - // we pass null since this is not an error case - closeProducersAndConsumers(null); - - try - { - sendClose(timeout); - } - catch (AMQException e) - { - JMSException jmse = new JMSException("Error closing session: " + e); - jmse.setLinkedException(e); - throw jmse; - } - // This is ignored because the channel is already marked as closed so the fail-over process will - // not re-open it. - catch (FailoverException e) - { - _logger.debug( - "Got FailoverException during channel close, ignored as channel already marked as closed."); - } - finally - { - _connection.deregisterSession(_channelId); - } - } - } - } - } - - public abstract void sendClose(long timeout) throws AMQException, FailoverException; - - /** - * Called when the server initiates the closure of the session unilaterally. - * - * @param e the exception that caused this session to be closed. Null causes the - */ - public void closed(Throwable e) throws JMSException - { - // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived - // calls through connection.closeAllSessions which is also called by the public connection.close() - // with a null cause - // When we are closing the Session due to a protocol session error we simply create a new AMQException - // with the correct error code and text this is cleary WRONG as the instanceof check below will fail. - // We need to determin here if the connection should be - - if (e instanceof AMQDisconnectedException) - { - if (_dispatcher != null) - { - // Failover failed and ain't coming back. Knife the dispatcher. - _dispatcher.interrupt(); - } - } - - if (!_closed.getAndSet(true)) - { - synchronized (getFailoverMutex()) - { - synchronized (_messageDeliveryLock) - { - // An AMQException has an error code and message already and will be passed in when closure occurs as a - // result of a channel close request - AMQException amqe; - if (e instanceof AMQException) - { - amqe = (AMQException) e; - } - else - { - amqe = new AMQException("Closing session forcibly", e); - } - - _connection.deregisterSession(_channelId); - closeProducersAndConsumers(amqe); - } - } - } - } - - /** - * Commits all messages done in this transaction and releases any locks currently held. - * - * <p/>If the commit fails, because the commit itself is interrupted by a fail-over between requesting that the - * commit be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown. - * The client will be unable to determine whether or not the commit actually happened on the broker in this case. - * - * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does - * not mean that the commit is known to have failed, merely that it is not known whether it - * failed or not. - * @todo Be aware of possible changes to parameter order as versions change. - */ - public void commit() throws JMSException - { - checkTransacted(); - - try - { - - // TGM FIXME: what about failover? - // Acknowledge all delivered messages - while (true) - { - Long tag = _deliveredMessageTags.poll(); - if (tag == null) - { - break; - } - - acknowledgeMessage(tag, false); - } - // Commits outstanding messages and acknowledgments - sendCommit(); - markClean(); - } - catch (AMQException e) - { - throw new JMSAMQException("Failed to commit: " + e.getMessage(), e); - } - catch (FailoverException e) - { - throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e); - } - } - - public abstract void sendCommit() throws AMQException, FailoverException; - - - public void confirmConsumerCancelled(int consumerTag) - { - - // Remove the consumer from the map - C consumer = _consumers.get(consumerTag); - if (consumer != null) - { - if (!consumer.isNoConsume()) // Normal Consumer - { - // Clean the Maps up first - // Flush any pending messages for this consumerTag - if (_dispatcher != null) - { - _logger.info("Dispatcher is not null"); - } - else - { - _logger.info("Dispatcher is null so created stopped dispatcher"); - startDispatcherIfNecessary(true); - } - - _dispatcher.rejectPending(consumer); - } - else // Queue Browser - { - // Just close the consumer - // fixme the CancelOK is being processed before the arriving messages.. - // The dispatcher is still to process them so the server sent in order but the client - // has yet to receive before the close comes in. - - // consumer.markClosed(); - - if (consumer.isAutoClose()) - { - // There is a small window where the message is between the two queues in the dispatcher. - if (consumer.isClosed()) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Closing consumer:" + consumer.debugIdentity()); - } - - deregisterConsumer(consumer); - } - else - { - _queue.add(new CloseConsumerMessage(consumer)); - } - } - } - } - } - - public QueueBrowser createBrowser(Queue queue) throws JMSException - { - if (isStrictAMQP()) - { - throw new UnsupportedOperationException(); - } - - return createBrowser(queue, null); - } - - public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException - { - if (isStrictAMQP()) - { - throw new UnsupportedOperationException(); - } - - checkNotClosed(); - checkValidQueue(queue); - - return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector); - } - - public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal) - throws JMSException - { - checkValidDestination(destination); - - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, - messageSelector, null, true, true); - } - - public MessageConsumer createConsumer(Destination destination) throws JMSException - { - checkValidDestination(destination); - - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic), null, null, - false, false); - } - - public C createExclusiveConsumer(Destination destination) throws JMSException - { - checkValidDestination(destination); - - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null, - false, false); - } - - public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException - { - checkValidDestination(destination); - - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic), - messageSelector, null, false, false); - } - - public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) - throws JMSException - { - checkValidDestination(destination); - - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, (destination instanceof Topic), - messageSelector, null, false, false); - } - - public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal) - throws JMSException - { - checkValidDestination(destination); - - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, true, - messageSelector, null, false, false); - } - - public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, - String selector) throws JMSException - { - checkValidDestination(destination); - - return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, false, false); - } - - public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, - boolean exclusive, String selector) throws JMSException - { - checkValidDestination(destination); - - return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false); - } - - public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, - String selector, FieldTable rawSelector) throws JMSException - { - checkValidDestination(destination); - - return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, false, false); - } - - public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, - boolean exclusive, String selector, FieldTable rawSelector) throws JMSException - { - checkValidDestination(destination); - - return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, false, - false); - } - - public abstract TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException; - - public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) - throws JMSException - { - checkNotClosed(); - checkValidTopic(topic); - if (_subscriptions.containsKey(name)) - { - _subscriptions.get(name).close(); - } - AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); - C consumer = (C) createConsumer(dest, messageSelector, noLocal); - TopicSubscriberAdaptor<C> subscriber = new TopicSubscriberAdaptor(dest, consumer); - _subscriptions.put(name, subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); - - return subscriber; - } - - public MapMessage createMapMessage() throws JMSException - { - checkNotClosed(); - return new JMSMapMessage(getMessageDelegateFactory()); - } - - public javax.jms.Message createMessage() throws JMSException - { - return createBytesMessage(); - } - - public ObjectMessage createObjectMessage() throws JMSException - { - checkNotClosed(); - return (ObjectMessage) new JMSObjectMessage(getMessageDelegateFactory()); - } - - public ObjectMessage createObjectMessage(Serializable object) throws JMSException - { - ObjectMessage msg = createObjectMessage(); - msg.setObject(object); - - return msg; - } - - public P createProducer(Destination destination) throws JMSException - { - return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE); - } - - public P createProducer(Destination destination, boolean immediate) throws JMSException - { - return createProducerImpl(destination, DEFAULT_MANDATORY, immediate); - } - - public P createProducer(Destination destination, boolean mandatory, boolean immediate) - throws JMSException - { - return createProducerImpl(destination, mandatory, immediate); - } - - public P createProducer(Destination destination, boolean mandatory, boolean immediate, - boolean waitUntilSent) throws JMSException - { - return createProducerImpl(destination, mandatory, immediate, waitUntilSent); - } - - public TopicPublisher createPublisher(Topic topic) throws JMSException - { - checkNotClosed(); - - return new TopicPublisherAdapter((P) createProducer(topic, false, false), topic); - } - - public Queue createQueue(String queueName) throws JMSException - { - checkNotClosed(); - if (queueName.indexOf('/') == -1) - { - return new AMQQueue(getDefaultQueueExchangeName(), new AMQShortString(queueName)); - } - else - { - try - { - return new AMQQueue(new AMQBindingURL(queueName)); - } - catch (URISyntaxException urlse) - { - JMSException jmse = new JMSException(urlse.getReason()); - jmse.setLinkedException(urlse); - - throw jmse; - } - } - } - - /** - * Declares the named queue. - * - * <p/>Note that this operation automatically retries in the event of fail-over. - * - * @param name The name of the queue to declare. - * @param autoDelete - * @param durable Flag to indicate that the queue is durable. - * @param exclusive Flag to indicate that the queue is exclusive to this client. - * - * @throws AMQException If the queue cannot be declared for any reason. - * @todo Be aware of possible changes to parameter order as versions change. - */ - public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, - final boolean exclusive) throws AMQException - { - createQueue(name, autoDelete, durable, exclusive, null); - } - - /** - * Declares the named queue. - * - * <p/>Note that this operation automatically retries in the event of fail-over. - * - * @param name The name of the queue to declare. - * @param autoDelete - * @param durable Flag to indicate that the queue is durable. - * @param exclusive Flag to indicate that the queue is exclusive to this client. - * @param arguments Arguments used to set special properties of the queue - * - * @throws AMQException If the queue cannot be declared for any reason. - * @todo Be aware of possible changes to parameter order as versions change. - */ - public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, - final boolean exclusive, final Map<String, Object> arguments) throws AMQException - { - new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() - { - public Object execute() throws AMQException, FailoverException - { - sendCreateQueue(name, autoDelete, durable, exclusive, arguments); - return null; - } - }, _connection).execute(); - } - - public abstract void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, - final boolean exclusive, final Map<String, Object> arguments) throws AMQException, FailoverException; - - /** - * Creates a QueueReceiver - * - * @param destination - * - * @return QueueReceiver - a wrapper around our MessageConsumer - * - * @throws JMSException - */ - public QueueReceiver createQueueReceiver(Destination destination) throws JMSException - { - checkValidDestination(destination); - AMQQueue dest = (AMQQueue) destination; - C consumer = (C) createConsumer(destination); - - return new QueueReceiverAdaptor(dest, consumer); - } - - /** - * Creates a QueueReceiver using a message selector - * - * @param destination - * @param messageSelector - * - * @return QueueReceiver - a wrapper around our MessageConsumer - * - * @throws JMSException - */ - public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException - { - checkValidDestination(destination); - AMQQueue dest = (AMQQueue) destination; - C consumer = (C) createConsumer(destination, messageSelector); - - return new QueueReceiverAdaptor(dest, consumer); - } - - /** - * Creates a QueueReceiver wrapping a MessageConsumer - * - * @param queue - * - * @return QueueReceiver - * - * @throws JMSException - */ - public QueueReceiver createReceiver(Queue queue) throws JMSException - { - checkNotClosed(); - AMQQueue dest = (AMQQueue) queue; - C consumer = (C) createConsumer(dest); - - return new QueueReceiverAdaptor(dest, consumer); - } - - /** - * Creates a QueueReceiver wrapping a MessageConsumer using a message selector - * - * @param queue - * @param messageSelector - * - * @return QueueReceiver - * - * @throws JMSException - */ - public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException - { - checkNotClosed(); - AMQQueue dest = (AMQQueue) queue; - C consumer = (C) createConsumer(dest, messageSelector); - - return new QueueReceiverAdaptor(dest, consumer); - } - - public QueueSender createSender(Queue queue) throws JMSException - { - checkNotClosed(); - - // return (QueueSender) createProducer(queue); - return new QueueSenderAdapter(createProducer(queue), queue); - } - - public StreamMessage createStreamMessage() throws JMSException - { - // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived - // calls through connection.closeAllSessions which is also called by the public connection.close() - // with a null cause - // When we are closing the Session due to a protocol session error we simply create a new AMQException - // with the correct error code and text this is cleary WRONG as the instanceof check below will fail. - // We need to determin here if the connection should be - - synchronized (getFailoverMutex()) - { - checkNotClosed(); - - return new JMSStreamMessage(getMessageDelegateFactory()); - } - } - - /** - * Creates a non-durable subscriber - * - * @param topic - * - * @return TopicSubscriber - a wrapper round our MessageConsumer - * - * @throws JMSException - */ - public TopicSubscriber createSubscriber(Topic topic) throws JMSException - { - checkNotClosed(); - AMQTopic dest = checkValidTopic(topic); - - // AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest)); - } - - /** - * Creates a non-durable subscriber with a message selector - * - * @param topic - * @param messageSelector - * @param noLocal - * - * @return TopicSubscriber - a wrapper round our MessageConsumer - * - * @throws JMSException - */ - public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException - { - checkNotClosed(); - AMQTopic dest = checkValidTopic(topic); - - // AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal)); - } - - public TemporaryQueue createTemporaryQueue() throws JMSException - { - checkNotClosed(); - try - { - AMQTemporaryQueue result = new AMQTemporaryQueue(this); - - // this is done so that we can produce to a temporary queue before we create a consumer - result.setQueueName(result.getRoutingKey()); - createQueue(result.getAMQQueueName(), result.isAutoDelete(), - result.isDurable(), result.isExclusive()); - bindQueue(result.getAMQQueueName(), result.getRoutingKey(), - new FieldTable(), result.getExchangeName(), result); - return result; - } - catch (Exception e) - { - JMSException ex = new JMSException("Cannot create temporary queue"); - ex.setLinkedException(e); - e.printStackTrace(); - throw ex; - } - } - - public TemporaryTopic createTemporaryTopic() throws JMSException - { - checkNotClosed(); - - return new AMQTemporaryTopic(this); - } - - public TextMessage createTextMessage() throws JMSException - { - synchronized (getFailoverMutex()) - { - checkNotClosed(); - - return new JMSTextMessage(getMessageDelegateFactory()); - } - } - - protected Object getFailoverMutex() - { - return _connection.getFailoverMutex(); - } - - public TextMessage createTextMessage(String text) throws JMSException - { - - TextMessage msg = createTextMessage(); - msg.setText(text); - - return msg; - } - - public Topic createTopic(String topicName) throws JMSException - { - checkNotClosed(); - - if (topicName.indexOf('/') == -1) - { - return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName)); - } - else - { - try - { - return new AMQTopic(new AMQBindingURL(topicName)); - } - catch (URISyntaxException urlse) - { - JMSException jmse = new JMSException(urlse.getReason()); - jmse.setLinkedException(urlse); - - throw jmse; - } - } - } - - public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException - { - declareExchange(name, type, getProtocolHandler(), nowait); - } - - public int getAcknowledgeMode() throws JMSException - { - checkNotClosed(); - - return _acknowledgeMode; - } - - public AMQConnection getAMQConnection() - { - return _connection; - } - - public int getChannelId() - { - return _channelId; - } - - public int getDefaultPrefetch() - { - return _defaultPrefetchHighMark; - } - - public int getDefaultPrefetchHigh() - { - return _defaultPrefetchHighMark; - } - - public int getDefaultPrefetchLow() - { - return _defaultPrefetchLowMark; - } - - public AMQShortString getDefaultQueueExchangeName() - { - return _connection.getDefaultQueueExchangeName(); - } - - public AMQShortString getDefaultTopicExchangeName() - { - return _connection.getDefaultTopicExchangeName(); - } - - public MessageListener getMessageListener() throws JMSException - { - // checkNotClosed(); - return _messageListener; - } - - public AMQShortString getTemporaryQueueExchangeName() - { - return _connection.getTemporaryQueueExchangeName(); - } - - public AMQShortString getTemporaryTopicExchangeName() - { - return _connection.getTemporaryTopicExchangeName(); - } - - public int getTicket() - { - return _ticket; - } - - public boolean getTransacted() throws JMSException - { - checkNotClosed(); - - return _transacted; - } - - public boolean hasConsumer(Destination destination) - { - AtomicInteger counter = _destinationConsumerCount.get(destination); - - return (counter != null) && (counter.get() != 0); - } - - public boolean isStrictAMQP() - { - return _strictAMQP; - } - - public boolean isSuspended() - { - return _suspended; - } - - protected void addUnacknowledgedMessage(long id) - { - _unacknowledgedMessageTags.add(id); - } - - protected void addDeliveredMessage(long id) - { - _deliveredMessageTags.add(id); - } - - /** - * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto - * the queue read by the dispatcher. - * - * @param message the message that has been received - */ - public void messageReceived(UnprocessedMessage message) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Message[" + message.toString() + "] received in session"); - } - _highestDeliveryTag.set(message.getDeliveryTag()); - _queue.add(message); - } - - public void declareAndBind(AMQDestination amqd) - throws - AMQException - { - AMQProtocolHandler protocolHandler = getProtocolHandler(); - declareExchange(amqd, protocolHandler, false); - AMQShortString queueName = declareQueue(amqd, protocolHandler, false); - bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd); - } - - /** - * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message. - * - * <p/>All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges - * all messages that have been delivered to the client. - * - * <p/>Restarting a session causes it to take the following actions: - * - * <ul> - * <li>Stop message delivery.</li> - * <li>Mark all messages that might have been delivered but not acknowledged as "redelivered". - * <li>Restart the delivery sequence including all unacknowledged messages that had been previously delivered. - * Redelivered messages do not have to be delivered in exactly their original delivery order.</li> - * </ul> - * - * <p/>If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and - * receiving acknolwedgement that it hasm then a JMSException will be thrown. In this case it will not be possible - * for the client to determine whether the broker is going to recover the session or not. - * - * @throws JMSException If the JMS provider fails to stop and restart message delivery due to some internal error. - * Not that this does not necessarily mean that the recovery has failed, but simply that it is - * not possible to tell if it has or not. - * @todo Be aware of possible changes to parameter order as versions change. - */ - public void recover() throws JMSException - { - // Ensure that the session is open. - checkNotClosed(); - - // Ensure that the session is not transacted. - checkNotTransacted(); - - // this is set only here, and the before the consumer's onMessage is called it is set to false - _inRecovery = true; - try - { - - boolean isSuspended = isSuspended(); - - if (!isSuspended) - { - suspendChannel(true); - } - - if (_dispatcher != null) - { - _dispatcher.rollback(); - } - - sendRecover(); - - if (!isSuspended) - { - suspendChannel(false); - } - } - catch (AMQException e) - { - throw new JMSAMQException("Recover failed: " + e.getMessage(), e); - } - catch (FailoverException e) - { - throw new JMSAMQException("Recovery was interrupted by fail-over. Recovery status is not known.", e); - } - } - - protected abstract void sendRecover() throws AMQException, FailoverException; - - public void rejectMessage(UnprocessedMessage message, boolean requeue) - { - - if (_logger.isDebugEnabled()) - { - _logger.debug("Rejecting Unacked message:" + message.getDeliveryTag()); - } - - rejectMessage(message.getDeliveryTag(), requeue); - } - - public void rejectMessage(AbstractJMSMessage message, boolean requeue) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag()); - } - - rejectMessage(message.getDeliveryTag(), requeue); - - } - - public abstract void rejectMessage(long deliveryTag, boolean requeue); - - /** - * Commits all messages done in this transaction and releases any locks currently held. - * - * <p/>If the rollback fails, because the rollback itself is interrupted by a fail-over between requesting that the - * rollback be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown. - * The client will be unable to determine whether or not the rollback actually happened on the broker in this case. - * - * @throws JMSException If the JMS provider fails to rollback the transaction due to some internal error. This does - * not mean that the rollback is known to have failed, merely that it is not known whether it - * failed or not. - * @todo Be aware of possible changes to parameter order as versions change. - */ - public void rollback() throws JMSException - { - synchronized (_suspensionLock) - { - checkTransacted(); - - try - { - boolean isSuspended = isSuspended(); - - if (!isSuspended) - { - suspendChannel(true); - } - - releaseForRollback(); - - sendRollback(); - - markClean(); - - if (!isSuspended) - { - suspendChannel(false); - } - } - catch (AMQException e) - { - throw new JMSAMQException("Failed to rollback: " + e, e); - } - catch (FailoverException e) - { - throw new JMSAMQException("Fail-over interrupted rollback. Status of the rollback is uncertain.", e); - } - } - } - - public abstract void releaseForRollback(); - - public abstract void sendRollback() throws AMQException, FailoverException; - - public void run() - { - throw new java.lang.UnsupportedOperationException(); - } - - public void setMessageListener(MessageListener listener) throws JMSException - { - // checkNotClosed(); - // - // if (_dispatcher != null && !_dispatcher.connectionStopped()) - // { - // throw new javax.njms.IllegalStateException("Attempt to set listener while session is started."); - // } - // - // // We are stopped - // for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) - // { - // BasicMessageConsumer consumer = i.next(); - // - // if (consumer.isReceiving()) - // { - // throw new javax.njms.IllegalStateException("Another thread is already receiving synchronously."); - // } - // } - // - // _messageListener = listener; - // - // for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) - // { - // i.next().setMessageListener(_messageListener); - // } - - } - - /*public void setTicket(int ticket) - { - _ticket = ticket; - }*/ - - public void unsubscribe(String name) throws JMSException - { - checkNotClosed(); - TopicSubscriberAdaptor subscriber = _subscriptions.get(name); - if (subscriber != null) - { - // send a queue.delete for the subscription - deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); - _subscriptions.remove(name); - _reverseSubscriptionMap.remove(subscriber); - } - else - { - if (_strictAMQP) - { - if (_strictAMQPFATAL) - { - throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); - } - else - { - _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe." - + " Requesting queue deletion regardless."); - } - - deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); - } - else // Queue Browser - { - - if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection))) - { - deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); - } - else - { - throw new InvalidDestinationException("Unknown subscription exchange:" + name); - } - } - } - } - - protected C createConsumerImpl(final Destination destination, final int prefetchHigh, - final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, - final boolean noConsume, final boolean autoClose) throws JMSException - { - checkTemporaryDestination(destination); - - final String messageSelector; - - if (_strictAMQP && !((selector == null) || selector.equals(""))) - { - if (_strictAMQPFATAL) - { - throw new UnsupportedOperationException("Selectors not currently supported by AMQP."); - } - else - { - messageSelector = null; - } - } - else - { - messageSelector = selector; - } - - return new FailoverRetrySupport<C, JMSException>( - new FailoverProtectedOperation<C, JMSException>() - { - public C execute() throws JMSException, FailoverException - { - checkNotClosed(); - - AMQDestination amqd = (AMQDestination) destination; - - final AMQProtocolHandler protocolHandler = getProtocolHandler(); - // TODO: Define selectors in AMQP - // TODO: construct the rawSelector from the selector string if rawSelector == null - final FieldTable ft = FieldTableFactory.newFieldTable(); - // if (rawSelector != null) - // ft.put("headers", rawSelector.getDataAsBytes()); - // rawSelector is used by HeadersExchange and is not a JMS Selector - if (rawSelector != null) - { - ft.addAll(rawSelector); - } - - if (messageSelector != null) - { - ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector); - } - - C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow, - noLocal, exclusive, messageSelector, ft, noConsume, autoClose); - - if (_messageListener != null) - { - consumer.setMessageListener(_messageListener); - } - - try - { - registerConsumer(consumer, false); - } - catch (AMQInvalidArgumentException ise) - { - JMSException ex = new InvalidSelectorException(ise.getMessage()); - ex.setLinkedException(ise); - throw ex; - } - catch (AMQInvalidRoutingKeyException e) - { - JMSException ide = - new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString()); - ide.setLinkedException(e); - throw ide; - } - catch (AMQException e) - { - JMSException ex = new JMSException("Error registering consumer: " + e); - - ex.setLinkedException(e); - throw ex; - } - - synchronized (destination) - { - _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger()); - _destinationConsumerCount.get(destination).incrementAndGet(); - } - - return consumer; - } - }, _connection).execute(); - } - - public abstract C createMessageConsumer(final AMQDestination destination, final int prefetchHigh, - final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable arguments, - final boolean noConsume, final boolean autoClose) throws JMSException; - - /** - * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer - * instance. - * - * @param consumer the consum - */ - void deregisterConsumer(C consumer) - { - if (_consumers.remove(consumer.getConsumerTag()) != null) - { - String subscriptionName = _reverseSubscriptionMap.remove(consumer); - if (subscriptionName != null) - { - _subscriptions.remove(subscriptionName); - } - - Destination dest = consumer.getDestination(); - synchronized (dest) - { - if (_destinationConsumerCount.get(dest).decrementAndGet() == 0) - { - _destinationConsumerCount.remove(dest); - } - } - - // Consumers that are closed in a transaction must be stored - // so that messages they have received can be acknowledged on commit - if (_transacted) - { - _removedConsumers.add(consumer); - } - } - } - - void deregisterProducer(long producerId) - { - _producers.remove(new Long(producerId)); - } - - boolean isInRecovery() - { - return _inRecovery; - } - - boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException - { - return isQueueBound(exchangeName, queueName, null); - } - - /** - * Tests whether or not the specified queue is bound to the specified exchange under a particular routing key. - * - * <p/>Note that this operation automatically retries in the event of fail-over. - * - * @param exchangeName The exchange name to test for binding against. - * @param queueName The queue name to check if bound. - * @param routingKey The routing key to check if the queue is bound under. - * - * @return <tt>true</tt> if the queue is bound to the exchange and routing key, <tt>false</tt> if not. - * - * @throws JMSException If the query fails for any reason. - * @todo Be aware of possible changes to parameter order as versions change. - */ - public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) - throws JMSException; - - public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException; - - /** - * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover - * when the client has veoted resubscription. <p/> The caller of this method must already hold the failover mutex. - */ - void markClosed() - { - _closed.set(true); - _connection.deregisterSession(_channelId); - markClosedProducersAndConsumers(); - - } - - /** - * Resubscribes all producers and consumers. This is called when performing failover. - * - * @throws AMQException - */ - void resubscribe() throws AMQException - { - if (_dirty) - { - _failedOverDirty = true; - } - - _rollbackMark.set(-1); - resubscribeProducers(); - resubscribeConsumers(); - } - - void setHasMessageListeners() - { - _hasMessageListeners = true; - } - - void setInRecovery(boolean inRecovery) - { - _inRecovery = inRecovery; - } - - /** - * Starts the session, which ensures that it is not suspended and that its event dispatcher is running. - * - * @throws AMQException If the session cannot be started for any reason. - * @todo This should be controlled by _stopped as it pairs with the stop method fixme or check the - * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages - * for each subsequent call to flow.. only need to do this if we have called stop. - */ - void start() throws AMQException - { - // Check if the session has perviously been started and suspended, in which case it must be unsuspended. - if (_startedAtLeastOnce.getAndSet(true)) - { - suspendChannel(false); - } - - // If the event dispatcher is not running then start it too. - if (hasMessageListeners()) - { - startDispatcherIfNecessary(); - } - } - - void startDispatcherIfNecessary() - { - //If we are the dispatcher then we don't need to check we are started - if (Thread.currentThread() == _dispatcher) - { - return; - } - - // If IMMEDIATE_PREFETCH is not set then we need to start fetching - // This is final per session so will be multi-thread safe. - if (!_immediatePrefetch) - { - // We do this now if this is the first call on a started connection - if (isSuspended() && _startedAtLeastOnce.get() && _firstDispatcher.getAndSet(false)) - { - try - { - suspendChannel(false); - } - catch (AMQException e) - { - _logger.info("Unsuspending channel threw an exception:" + e); - } - } - } - - startDispatcherIfNecessary(false); - } - - synchronized void startDispatcherIfNecessary(boolean initiallyStopped) - { - if (_dispatcher == null) - { - _dispatcher = new Dispatcher(); - _dispatcher.setDaemon(true); - _dispatcher.setConnectionStopped(initiallyStopped); - _dispatcher.start(); - } - else - { - _dispatcher.setConnectionStopped(initiallyStopped); - } - } - - void stop() throws AMQException - { - // Stop the server delivering messages to this session. - suspendChannel(true); - - if (_dispatcher != null) - { - _dispatcher.setConnectionStopped(true); - } - } - - /* - * Binds the named queue, with the specified routing key, to the named exchange. - * - * <p/>Note that this operation automatically retries in the event of fail-over. - * - * @param queueName The name of the queue to bind. - * @param routingKey The routing key to bind the queue with. - * @param arguments Additional arguments. - * @param exchangeName The exchange to bind the queue on. - * - * @throws AMQException If the queue cannot be bound for any reason. - */ - /*private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft) - throws AMQException, FailoverException - { - AMQFrame queueBind = - QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), ft, // arguments - amqd.getExchangeName(), // exchange - false, // nowait - queueName, // queue - amqd.getRoutingKey(), // routingKey - getTicket()); // ticket - - protocolHandler.syncWrite(queueBind, QueueBindOkBody.class); - }*/ - - private void checkNotTransacted() throws JMSException - { - if (getTransacted()) - { - throw new IllegalStateException("Session is transacted"); - } - } - - private void checkTemporaryDestination(Destination destination) throws JMSException - { - if ((destination instanceof TemporaryDestination)) - { - _logger.debug("destination is temporary"); - final TemporaryDestination tempDest = (TemporaryDestination) destination; - if (tempDest.getSession() != this) - { - _logger.debug("destination is on different session"); - throw new JMSException("Cannot consume from a temporary destination created onanother session"); - } - - if (tempDest.isDeleted()) - { - _logger.debug("destination is deleted"); - throw new JMSException("Cannot consume from a deleted destination"); - } - } - } - - protected void checkTransacted() throws JMSException - { - if (!getTransacted()) - { - throw new IllegalStateException("Session is not transacted"); - } - } - - private void checkValidDestination(Destination destination) throws InvalidDestinationException - { - if (destination == null) - { - throw new javax.jms.InvalidDestinationException("Invalid Queue"); - } - } - - private void checkValidQueue(Queue queue) throws InvalidDestinationException - { - if (queue == null) - { - throw new javax.jms.InvalidDestinationException("Invalid Queue"); - } - } - - /* - * I could have combined the last 3 methods, but this way it improves readability - */ - protected AMQTopic checkValidTopic(Topic topic) throws JMSException - { - if (topic == null) - { - throw new javax.jms.InvalidDestinationException("Invalid Topic"); - } - - if ((topic instanceof TemporaryDestination) && (((TemporaryDestination) topic).getSession() != this)) - { - throw new javax.jms.InvalidDestinationException( - "Cannot create a subscription on a temporary topic created in another session"); - } - - if (!(topic instanceof AMQTopic)) - { - throw new javax.jms.InvalidDestinationException( - "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " - + topic.getClass().getName()); - } - - return (AMQTopic) topic; - } - - /** - * Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error. - * - * @param error not null if this is a result of an error occurring at the connection level - */ - private void closeConsumers(Throwable error) throws JMSException - { - // we need to clone the list of consumers since the close() method updates the _consumers collection - // which would result in a concurrent modification exception - final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values()); - - final Iterator<C> it = clonedConsumers.iterator(); - while (it.hasNext()) - { - final C con = it.next(); - if (error != null) - { - con.notifyError(error); - } - else - { - con.close(false); - } - } - // at this point the _consumers map will be empty - if (_dispatcher != null) - { - _dispatcher.close(); - _dispatcher = null; - } - } - - /** - * Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is - * currently no way of propagating errors to message producers (this is a JMS limitation). - */ - private void closeProducers() throws JMSException - { - // we need to clone the list of producers since the close() method updates the _producers collection - // which would result in a concurrent modification exception - final ArrayList clonedProducers = new ArrayList(_producers.values()); - - final Iterator it = clonedProducers.iterator(); - while (it.hasNext()) - { - final P prod = (P) it.next(); - prod.close(); - } - // at this point the _producers map is empty - } - - /** - * Close all producers or consumers. This is called either in the error case or when closing the session normally. - * - * @param amqe the exception, may be null to indicate no error has occurred - */ - private void closeProducersAndConsumers(AMQException amqe) throws JMSException - { - JMSException jmse = null; - try - { - closeProducers(); - } - catch (JMSException e) - { - _logger.error("Error closing session: " + e, e); - jmse = e; - } - - try - { - closeConsumers(amqe); - } - catch (JMSException e) - { - _logger.error("Error closing session: " + e, e); - if (jmse == null) - { - jmse = e; - } - } - - if (jmse != null) - { - throw jmse; - } - } - - /** - * Register to consume from the queue. - * - * @param queueName - */ - private void consumeFromQueue(C consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException - { - int tagId = _nextTag++; - - consumer.setConsumerTag(tagId); - // we must register the consumer in the map before we actually start listening - _consumers.put(tagId, consumer); - - try - { - sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tagId); - } - catch (AMQException e) - { - // clean-up the map in the event of an error - _consumers.remove(tagId); - throw e; - } - } - - public abstract void sendConsume(C consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException; - - private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate) - throws JMSException - { - return createProducerImpl(destination, mandatory, immediate, false); - } - - private P createProducerImpl(final Destination destination, final boolean mandatory, - final boolean immediate, final boolean waitUntilSent) throws JMSException - { - return new FailoverRetrySupport<P, JMSException>( - new FailoverProtectedOperation<P, JMSException>() - { - public P execute() throws JMSException, FailoverException - { - checkNotClosed(); - long producerId = getNextProducerId(); - P producer = createMessageProducer(destination, mandatory, - immediate, waitUntilSent, producerId); - registerProducer(producerId, producer); - - return producer; - } - }, _connection).execute(); - } - - public abstract P createMessageProducer(final Destination destination, final boolean mandatory, - final boolean immediate, final boolean waitUntilSent, long producerId); - - private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException - { - declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait); - } - - /** - * Returns the number of messages currently queued for the given destination. - * - * <p/>Note that this operation automatically retries in the event of fail-over. - * - * @param amqd The destination to be checked - * - * @return the number of queued messages. - * - * @throws AMQException If the queue cannot be declared for any reason. - */ - public long getQueueDepth(final AMQDestination amqd) - throws AMQException - { - return new FailoverNoopSupport<Long, AMQException>( - new FailoverProtectedOperation<Long, AMQException>() - { - public Long execute() throws AMQException, FailoverException - { - return requestQueueDepth(amqd); - } - }, _connection).execute(); - - } - - protected abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException; - - /** - * Declares the named exchange and type of exchange. - * - * <p/>Note that this operation automatically retries in the event of fail-over. - * - * @param name The name of the exchange to declare. - * @param type The type of the exchange to declare. - * @param protocolHandler The protocol handler to process the communication through. - * @param nowait - * - * @throws AMQException If the exchange cannot be declared for any reason. - * @todo Be aware of possible changes to parameter order as versions change. - */ - private void declareExchange(final AMQShortString name, final AMQShortString type, - final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException - { - new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() - { - public Object execute() throws AMQException, FailoverException - { - sendExchangeDeclare(name, type, protocolHandler, nowait); - return null; - } - }, _connection).execute(); - } - - public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler, - final boolean nowait) throws AMQException, FailoverException; - - /** - * Declares a queue for a JMS destination. - * - * <p/>Note that for queues but not topics the name is generated in the client rather than the server. This allows - * the name to be reused on failover if required. In general, the destination indicates whether it wants a name - * generated or not. - * - * <p/>Note that this operation automatically retries in the event of fail-over. - * - * @param amqd The destination to declare as a queue. - * @param protocolHandler The protocol handler to communicate through. - * - * @return The name of the decalred queue. This is useful where the broker is generating a queue name on behalf of - * the client. - * - * @throws AMQException If the queue cannot be declared for any reason. - * @todo Verify the destiation is valid or throw an exception. - * @todo Be aware of possible changes to parameter order as versions change. - */ - protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal) - throws AMQException - { - /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/ - return new FailoverNoopSupport<AMQShortString, AMQException>( - new FailoverProtectedOperation<AMQShortString, AMQException>() - { - public AMQShortString execute() throws AMQException, FailoverException - { - // Generate the queue name if the destination indicates that a client generated name is to be used. - if (amqd.isNameRequired()) - { - amqd.setQueueName(protocolHandler.generateQueueName()); - } - - sendQueueDeclare(amqd, protocolHandler); - - return amqd.getAMQQueueName(); - } - }, _connection).execute(); - } - - public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException; - - /** - * Undeclares the specified queue. - * - * <p/>Note that this operation automatically retries in the event of fail-over. - * - * @param queueName The name of the queue to delete. - * - * @throws JMSException If the queue could not be deleted for any reason. - * @todo Be aware of possible changes to parameter order as versions change. - */ - protected void deleteQueue(final AMQShortString queueName) throws JMSException - { - try - { - new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() - { - public Object execute() throws AMQException, FailoverException - { - sendQueueDelete(queueName); - return null; - } - }, _connection).execute(); - } - catch (AMQException e) - { - throw new JMSAMQException("The queue deletion failed: " + e.getMessage(), e); - } - } - - public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException; - - private long getNextProducerId() - { - return ++_nextProducerId; - } - - protected AMQProtocolHandler getProtocolHandler() - { - return _connection.getProtocolHandler(); - } - - public byte getProtocolMajorVersion() - { - return getProtocolHandler().getProtocolMajorVersion(); - } - - public byte getProtocolMinorVersion() - { - return getProtocolHandler().getProtocolMinorVersion(); - } - - protected boolean hasMessageListeners() - { - return _hasMessageListeners; - } - - private void markClosedConsumers() throws JMSException - { - if (_dispatcher != null) - { - _dispatcher.close(); - _dispatcher = null; - } - // we need to clone the list of consumers since the close() method updates the _consumers collection - // which would result in a concurrent modification exception - final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values()); - - final Iterator<C> it = clonedConsumers.iterator(); - while (it.hasNext()) - { - final C con = it.next(); - con.markClosed(); - } - // at this point the _consumers map will be empty - } - - private void markClosedProducersAndConsumers() - { - try - { - // no need for a markClosed* method in this case since there is no protocol traffic closing a producer - closeProducers(); - } - catch (JMSException e) - { - _logger.error("Error closing session: " + e, e); - } - - try - { - markClosedConsumers(); - } - catch (JMSException e) - { - _logger.error("Error closing session: " + e, e); - } - } - - /** - * Callers must hold the failover mutex before calling this method. - * - * @param consumer - * - * @throws AMQException - */ - private void registerConsumer(C consumer, boolean nowait) throws AMQException // , FailoverException - { - AMQDestination amqd = consumer.getDestination(); - - AMQProtocolHandler protocolHandler = getProtocolHandler(); - - declareExchange(amqd, protocolHandler, false); - - AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal()); - - // store the consumer queue name - consumer.setQueuename(queueName); - - bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd); - - // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch - if (!_immediatePrefetch) - { - // The dispatcher will be null if we have just created this session - // so suspend the channel before we register our consumer so that we don't - // start prefetching until a receive/mListener is set. - if (_dispatcher == null) - { - if (!isSuspended()) - { - try - { - suspendChannel(true); - _logger.info( - "Prefetching delayed existing messages will not flow until requested via receive*() or setML()."); - } - catch (AMQException e) - { - _logger.info("Suspending channel threw an exception:" + e); - } - } - } - } - else - { - _logger.info("Immediately prefetching existing messages to new consumer."); - } - - try - { - consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector()); - } - catch (JMSException e) // thrown by getMessageSelector - { - throw new AMQException(null, e.getMessage(), e); - } - catch (FailoverException e) - { - throw new AMQException(null, "Fail-over exception interrupted basic consume.", e); - } - } - - private void registerProducer(long producerId, MessageProducer producer) - { - _producers.put(new Long(producerId), producer); - } - - private void rejectAllMessages(boolean requeue) - { - rejectMessagesForConsumerTag(0, requeue, true); - } - - /** - * @param consumerTag The consumerTag to prune from queue or all if null - * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) - * @param rejectAllConsumers - */ - - private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers) - { - Iterator messages = _queue.iterator(); - if (_logger.isInfoEnabled()) - { - _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:" - + requeue); - - if (messages.hasNext()) - { - _logger.info("Checking all messages in _queue for Consumer tag(" + consumerTag + ")"); - } - else - { - _logger.info("No messages in _queue to reject"); - } - } - while (messages.hasNext()) - { - UnprocessedMessage message = (UnprocessedMessage) messages.next(); - - if (rejectAllConsumers || (message.getConsumerTag() == consumerTag)) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:" - + message.getDeliveryTag()); - } - - messages.remove(); - - rejectMessage(message, requeue); - - if (_logger.isDebugEnabled()) - { - _logger.debug("Rejected the message(" + message.toString() + ") for consumer :" + consumerTag); - } - } - } - } - - private void resubscribeConsumers() throws AMQException - { - ArrayList<C> consumers = new ArrayList<C>(_consumers.values()); - _consumers.clear(); - - for (C consumer : consumers) - { - consumer.failedOver(); - registerConsumer(consumer, true); - } - } - - private void resubscribeProducers() throws AMQException - { - ArrayList producers = new ArrayList(_producers.values()); - _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey - for (Iterator it = producers.iterator(); it.hasNext();) - { - P producer = (P) it.next(); - producer.resubscribe(); - } - } - - /** - * Suspends or unsuspends this session. - * - * @param suspend <tt>true</tt> indicates that the session should be suspended, <tt>false<tt> indicates that it - * should be unsuspended. - * - * @throws AMQException If the session cannot be suspended for any reason. - * @todo Be aware of possible changes to parameter order as versions change. - */ - protected void suspendChannel(boolean suspend) throws AMQException // , FailoverException - { - synchronized (_suspensionLock) - { - try - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); - } - - _suspended = suspend; - sendSuspendChannel(suspend); - } - catch (FailoverException e) - { - throw new AMQException(null, "Fail-over interrupted suspend/unsuspend channel.", e); - } - } - } - - public abstract void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException; - - Object getMessageDeliveryLock() - { - return _messageDeliveryLock; - } - - /** - * Indicates whether this session consumers pre-fetche messages - * - * @return true if this session consumers pre-fetche messages false otherwise - */ - public boolean prefetch() - { - return getAMQConnection().getMaxPrefetch() > 0; - } - - /** Signifies that the session has pending sends to commit. */ - public void markDirty() - { - _dirty = true; - } - - /** Signifies that the session has no pending sends to commit. */ - public void markClean() - { - _dirty = false; - _failedOverDirty = false; - } - - /** - * Check to see if failover has occured since the last call to markClean(commit or rollback). - * - * @return boolean true if failover has occured. - */ - public boolean hasFailedOver() - { - return _failedOverDirty; - } - - /** - * Check to see if any message have been sent in this transaction and have not been commited. - * - * @return boolean true if a message has been sent but not commited - */ - public boolean isDirty() - { - return _dirty; - } - - public void setTicket(int ticket) - { - _ticket = ticket; - } - - public void setFlowControl(final boolean active) - { - _flowControl.setFlowControl(active); - } - - public void checkFlowControl() throws InterruptedException - { - synchronized (_flowControl) - { - while (!_flowControl.getFlowControl()) - { - _flowControl.wait(); - } - } - - } - - /** Used for debugging in the dispatcher. */ - private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher"); - - /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ - class Dispatcher extends Thread - { - - /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */ - private final AtomicBoolean _closed = new AtomicBoolean(false); - - private final Object _lock = new Object(); - private String dispatcherID = "" + System.identityHashCode(this); - - - - public Dispatcher() - { - super("Dispatcher-Channel-" + _channelId); - if (_dispatcherLogger.isInfoEnabled()) - { - _dispatcherLogger.info(getName() + " created"); - } - } - - public void close() - { - _closed.set(true); - interrupt(); - - // fixme awaitTermination - - } - - public void rejectPending(C consumer) - { - synchronized (_lock) - { - boolean stopped = _dispatcher.connectionStopped(); - - if (!stopped) - { - _dispatcher.setConnectionStopped(true); - } - - // Reject messages on pre-receive queue - consumer.rollbackPendingMessages(); - - // Reject messages on pre-dispatch queue - rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false); - //Let the dispatcher deal with this when it gets to them. - - // closeConsumer - consumer.markClosed(); - - _dispatcher.setConnectionStopped(stopped); - - } - } - - public void rollback() - { - - synchronized (_lock) - { - boolean isStopped = connectionStopped(); - - if (!isStopped) - { - setConnectionStopped(true); - } - - _rollbackMark.set(_highestDeliveryTag.get()); - - _dispatcherLogger.debug("Session Pre Dispatch Queue cleared"); - - for (C consumer : _consumers.values()) - { - if (!consumer.isNoConsume()) - { - consumer.rollback(); - } - else - { - // should perhaps clear the _SQ here. - // consumer._synchronousQueue.clear(); - consumer.clearReceiveQueue(); - } - - } - - for (int i = 0; i < _removedConsumers.size(); i++) - { - // Sends acknowledgement to server - _removedConsumers.get(i).rollback(); - _removedConsumers.remove(i); - } - - setConnectionStopped(isStopped); - } - - } - - public void run() - { - if (_dispatcherLogger.isInfoEnabled()) - { - _dispatcherLogger.info(getName() + " started"); - } - - UnprocessedMessage message; - - // Allow disptacher to start stopped - synchronized (_lock) - { - while (!_closed.get() && connectionStopped()) - { - try - { - _lock.wait(); - } - catch (InterruptedException e) - { - // ignore - } - } - } - - try - { - while (!_closed.get() && ((message = (UnprocessedMessage) _queue.take()) != null)) - { - long deliveryTag = message.getDeliveryTag(); - - synchronized (_lock) - { - - while (connectionStopped()) - { - _lock.wait(); - } - - if (!(message instanceof CloseConsumerMessage) - && tagLE(deliveryTag, _rollbackMark.get())) - { - rejectMessage(message, true); - } - else - { - synchronized (_messageDeliveryLock) - { - dispatchMessage(message); - } - } - } - - long current = _rollbackMark.get(); - if (updateRollbackMark(current, deliveryTag)) - { - _rollbackMark.compareAndSet(current, deliveryTag); - } - } - } - catch (InterruptedException e) - { - // ignore - } - - if (_dispatcherLogger.isInfoEnabled()) - { - _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId); - } - } - - // only call while holding lock - final boolean connectionStopped() - { - return _connectionStopped; - } - - boolean setConnectionStopped(boolean connectionStopped) - { - boolean currently; - synchronized (_lock) - { - currently = _connectionStopped; - _connectionStopped = connectionStopped; - _lock.notify(); - - if (_dispatcherLogger.isDebugEnabled()) - { - _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started") - + ": Currently " + (currently ? "Stopped" : "Started")); - } - } - - return currently; - } - - private void dispatchMessage(UnprocessedMessage message) - { - //This if block is not needed anymore as bounce messages are handled separately - //if (message.getDeliverBody() != null) - //{ - final C consumer = - _consumers.get(message.getConsumerTag()); - - if ((consumer == null) || consumer.isClosed()) - { - if (_dispatcherLogger.isInfoEnabled()) - { - if (consumer == null) - { - _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliveryTag() + "] from queue " - + message.getConsumerTag() + " )without a handler - rejecting(requeue)..."); - } - else - { - if (consumer.isNoConsume()) - { - _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliveryTag() + "] from queue " + " consumer(" - + message.getConsumerTag() + ") is closed and a browser so dropping..."); - //DROP MESSAGE - return; - - } - else - { - _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliveryTag() + "] from queue " + " consumer(" - + message.getConsumerTag() + ") is closed rejecting(requeue)..."); - } - } - } - // Don't reject if we're already closing - if (!_closed.get()) - { - rejectMessage(message, true); - } - } - else - { - consumer.notifyMessage(message); - } - - } - } - - protected abstract boolean tagLE(long tag1, long tag2); - - protected abstract boolean updateRollbackMark(long current, long deliveryTag); - - public abstract AMQMessageDelegateFactory getMessageDelegateFactory(); - - /*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, - boolean read) throws AMQException - { - getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(), - getProtocolMajorVersion(), getProtocolMinorVersion(), active, exclusive, passive, read, realm, write), - new BlockingMethodFrameListener(_channelId) - { - - public boolean processMethod(int channelId, AMQMethodBody frame) // throws AMQException - { - if (frame instanceof AccessRequestOkBody) - { - setTicket(((AccessRequestOkBody) frame).getTicket()); - - return true; - } - else - { - return false; - } - } - }); - }*/ - - private class SuspenderRunner implements Runnable - { - private AtomicBoolean _suspend; - - public SuspenderRunner(AtomicBoolean suspend) - { - _suspend = suspend; - } - - public void run() - { - try - { - synchronized (_suspensionLock) - { - suspendChannel(_suspend.get()); - } - } - catch (AMQException e) - { - _logger.warn("Unable to suspend channel"); - } - } - } -} |