/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client;
import java.io.Serializable;
import java.io.IOException;
import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.CloseConsumerMessage;
import org.apache.qpid.client.message.JMSBytesMessage;
import org.apache.qpid.client.message.JMSMapMessage;
import org.apache.qpid.client.message.JMSObjectMessage;
import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.url.AMQBindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
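/**
* Abstract base class for the client's JMS session implementations; it serves as a plain {@link Session}, a
* {@link QueueSession} and a {@link TopicSession}.
*
* <table id="crc"><caption>CRC Card</caption>
* <tr><th>Responsibilities</th><th>Collaborations</th></tr>
* </table>
*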
* @todo Different FailoverSupport implementations are needed on the same method call, in different situations. For
* example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second
* fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of
* the fail-over process, the retry handler could be used to automatically retry the operation once the connection
* has been reestablished. All fail-over protected operations should be placed in private methods, with
* FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the
* fail-over process sets a nowait flag and uses an async method call instead.
* @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this,
* after looking at worse bottlenecks first.
*/
public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
/**
 * Maps integer consumer ids to consumers.
 *
 * Ids 0 to 15 are stored in a fixed-size array and looked up by direct indexing; every other id (larger
 * values as well as negative ones) is stored in a {@link ConcurrentHashMap}.
 *
 * NOTE(review): the array slots are read and written without any synchronization in this class; confirm
 * that callers provide whatever visibility guarantees they need.
 *
 * @param <C> The type of consumer held by this map.
 */
public static final class IdToConsumerMap<C extends BasicMessageConsumer>
{
    /** Every bit that must be clear for an id to be served by the array, i.e. for 0 <= id <= 15. */
    private static final int SLOW_ID_MASK = 0xFFFFFFF0;

    private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
    private final ConcurrentHashMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>();

    /** Tells whether the given id is held in the array rather than in the hash map. */
    private static boolean isFastId(int id)
    {
        return (id & SLOW_ID_MASK) == 0;
    }

    /**
     * Looks up the consumer registered under an id.
     *
     * @param id The consumer id.
     *
     * @return The consumer registered under the id, or null if there is none.
     */
    @SuppressWarnings("unchecked") // Only values of type C are ever stored into the array, see put().
    public C get(int id)
    {
        if (isFastId(id))
        {
            return (C) _fastAccessConsumers[id];
        }
        else
        {
            return _slowAccessConsumers.get(id);
        }
    }

    /**
     * Registers a consumer under an id, replacing any consumer already registered under it.
     *
     * @param id       The consumer id.
     * @param consumer The consumer to register. Ids outside 0 to 15 go to a {@link ConcurrentHashMap}, which
     *                 rejects null values.
     *
     * @return The consumer previously registered under the id, or null if there was none (the same
     *         convention as {@link java.util.Map#put}).
     */
    @SuppressWarnings("unchecked") // Only values of type C are ever stored into the array.
    public C put(int id, C consumer)
    {
        C oldVal;
        if (isFastId(id))
        {
            oldVal = (C) _fastAccessConsumers[id];
            _fastAccessConsumers[id] = consumer;
        }
        else
        {
            oldVal = _slowAccessConsumers.put(id, consumer);
        }

        // The previous mapping is what both branches compute; returning the new value discarded it.
        return oldVal;
    }

    /**
     * Removes the consumer registered under an id.
     *
     * @param id The consumer id.
     *
     * @return The consumer that was registered under the id, or null if there was none.
     */
    @SuppressWarnings("unchecked") // Only values of type C are ever stored into the array, see put().
    public C remove(int id)
    {
        C consumer;
        if (isFastId(id))
        {
            consumer = (C) _fastAccessConsumers[id];
            _fastAccessConsumers[id] = null;
        }
        else
        {
            consumer = _slowAccessConsumers.remove(id);
        }

        return consumer;
    }

    /**
     * Collects every registered consumer into a new list: array-held consumers first, in id order, followed
     * by those held in the hash map.
     *
     * @return A freshly allocated collection; modifying it does not affect this map.
     */
    @SuppressWarnings("unchecked") // Only values of type C are ever stored into the array, see put().
    public Collection<C> values()
    {
        ArrayList<C> values = new ArrayList<C>();

        for (int i = 0; i < _fastAccessConsumers.length; i++)
        {
            if (_fastAccessConsumers[i] != null)
            {
                values.add((C) _fastAccessConsumers[i]);
            }
        }

        values.addAll(_slowAccessConsumers.values());

        return values;
    }

    /** Removes every registered consumer from both the hash map and the array. */
    public void clear()
    {
        _slowAccessConsumers.clear();
        for (int i = 0; i < _fastAccessConsumers.length; i++)
        {
            _fastAccessConsumers[i] = null;
        }
    }
}
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
/**
* The default value for immediate flag used by producers created by this session is false. That is, a consumer does
* not need to be attached to a queue.
*/
protected static final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
/**
* The default value for mandatory flag used by producers created by this session is true. That is, server will not
* silently drop messages where no queue is connected to the exchange for the message.
*/
protected static final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
/**
* Value of the qpid.declare_queues system property, read once when this class is initialised; true when the
* property is not set.
* NOTE(review): presumably controls whether the client declares queues on the broker - confirm against usages.
*/
protected static final boolean DECLARE_QUEUES =
Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true"));
/**
* Value of the qpid.declare_exchanges system property, read once when this class is initialised; true when the
* property is not set.
* NOTE(review): presumably controls whether the client declares exchanges on the broker - confirm against usages.
*/
protected static final boolean DECLARE_EXCHANGES =
Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
/** System property to enable strict AMQP compliance. */
public static final String STRICT_AMQP = "STRICT_AMQP";
/** Strict AMQP default setting. */
public static final String STRICT_AMQP_DEFAULT = "false";
/** System property to enable failure if strict AMQP compliance is violated. */
public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
/** Strict AMQP failure default. */
public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
/** System property to enable immediate message prefetching. */
public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
/** Immediate message prefetch default. */
public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
/** The connection to which this session belongs. */
protected AMQConnection _connection;
/** Used to indicate whether or not this is a transactional session. */
protected boolean _transacted;
/** Holds the sessions acknowledgement mode. */
protected final int _acknowledgeMode;
/** Holds this session unique identifier, used to distinguish it from other sessions. */
protected int _channelId;
private int _ticket;
/** Holds the high mark for prefetched messages, at which the session is suspended. */
private int _defaultPrefetchHighMark;
/** Holds the low mark for prefetched messages, below which the session is resumed. */
private int _defaultPrefetchLowMark;
/** Holds the message listener, if any, which is attached to this session. */
private MessageListener _messageListener = null;
/** Used to indicate that this session has been started at least once. */
private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
/**
* Used to reference durable subscribers so that requests for unsubscribe can be handled correctly. Note this only
* keeps a record of subscriptions which have been created in the current instance. It does not remember
* subscriptions between executions of the client.
*/
protected final ConcurrentHashMap _subscriptions =
new ConcurrentHashMap();
/**
* Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
* up in the {@link #_subscriptions} map.
*/
protected final ConcurrentHashMap _reverseSubscriptionMap =
new ConcurrentHashMap();
/**
* Used to hold incoming messages.
*
* @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue.
*/
protected final FlowControllingBlockingQueue _queue;
/** Holds the highest received delivery tag. */
private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
private final AtomicLong _rollbackMark = new AtomicLong(-1);
/** All the not yet acknowledged message tags */
protected ConcurrentLinkedQueue _unacknowledgedMessageTags = new ConcurrentLinkedQueue();
/** All the delivered message tags */
protected ConcurrentLinkedQueue _deliveredMessageTags = new ConcurrentLinkedQueue();
/** Holds the dispatcher thread for this session. */
protected Dispatcher _dispatcher;
protected Thread _dispatcherThread;
/** Holds the message factory factory for this session. */
protected MessageFactoryRegistry _messageFactoryRegistry;
/** Holds all of the producers created by this session, keyed by their unique identifiers. */
private Map _producers = new ConcurrentHashMap();
/**
* Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume
* methods.
*/
private int _nextTag = 1;
/**
* Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
* consumer.
*/
protected final IdToConsumerMap _consumers = new IdToConsumerMap();
//Map _consumers =
//new ConcurrentHashMap();
/**
* Contains a list of consumers which have been removed but which might still have
* messages to acknowledge, eg in client ack or transacted modes
*/
private CopyOnWriteArrayList _removedConsumers = new CopyOnWriteArrayList();
/** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
private ConcurrentHashMap _destinationConsumerCount =
new ConcurrentHashMap();
/**
* Used as a source of unique identifiers for producers within the session.
*
* Access to this id does not require to be synchronized since according to the JMS specification only one
* thread of control is allowed to create producers for any given session instance.
*/
private long _nextProducerId;
/**
* Set when recover is called. This is to handle the case where recover() is called by application code during
* onMessage() processing, to ensure that an auto ack is not sent.
*/
private boolean _inRecovery;
/** Used to indicate that the connection to which this session belongs has been stopped. */
private boolean _connectionStopped;
/** Used to indicate that this session has a message listener attached to it. */
private boolean _hasMessageListeners;
/** Used to indicate that this session has been suspended. */
private boolean _suspended;
/**
* Used to protect the suspension of this session, so that critical code can be executed during suspension,
* without the session being resumed by other threads.
*/
private final Object _suspensionLock = new Object();
/**
* Used to ensure that only the first call to start the dispatcher can unsuspend the channel.
*
* @todo This is accessed only within a synchronized method, so does not need to be atomic.
*/
protected final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
/** Used to indicate that the session should start pre-fetching messages as soon as it is started. */
protected final boolean _immediatePrefetch;
/** Indicates that warnings should be generated on violations of the strict AMQP. */
protected final boolean _strictAMQP;
/** Indicates that runtime exceptions should be generated on violations of the strict AMQP. */
protected final boolean _strictAMQPFATAL;
private final Object _messageDeliveryLock = new Object();
/** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
private boolean _dirty;
/** Has failover occurred on this session with outstanding actions to commit? */
private boolean _failedOverDirty;
/**
 * Holder for a single flow-control flag. Changing the flag also calls {@code notify()} on this object, waking
 * one thread (if any) that is waiting on this object's monitor.
 */
private static final class FlowControlIndicator
{
    /** The current flag value; starts out as true. Volatile, so it is read without taking the monitor. */
    private volatile boolean _flowControl = true;

    /**
     * Reads the flag.
     *
     * @return The most recently set flag value.
     */
    public boolean getFlowControl()
    {
        return this._flowControl;
    }

    /**
     * Stores a new flag value and notifies one waiter, both while holding this object's monitor.
     *
     * @param flowControl The new flag value.
     */
    public void setFlowControl(boolean flowControl)
    {
        synchronized (this)
        {
            this._flowControl = flowControl;
            this.notify();
        }
    }
}
/** Flow control */
private FlowControlIndicator _flowControl = new FlowControlIndicator();
/**
 * Creates a new session on a connection.
 *
 * The STRICT_AMQP, STRICT_AMQP_FATAL and IMMEDIATE_PREFETCH system properties are read here, once per session;
 * strict AMQP mode forces immediate prefetch on. A transacted session always uses
 * {@link javax.jms.Session#SESSION_TRANSACTED} as its acknowledge mode, whatever mode is passed in.
 *
 * The incoming message queue only gets a threshold listener (which starts a SuspenderRunner thread whenever the
 * high or low mark is crossed) when the effective acknowledge mode is NO_ACKNOWLEDGE; in every other mode the
 * queue is created with the high mark and no listener.
 *
 * @param con                     The connection on which to create the session.
 * @param channelId               The unique identifier for the session.
 * @param transacted              Indicates whether or not the session is transactional.
 * @param acknowledgeMode         The acknowledgement mode for the session; ignored when transacted is true.
 * @param messageFactoryRegistry  The registry of message factories for the session.
 * @param defaultPrefetchHighMark The maximum number of messages to prefetch before suspending the session.
 * @param defaultPrefetchLowMark  The number of prefetched messages at which to resume the session.
 */
protected AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
                     MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark,
                     int defaultPrefetchLowMark)
{
    _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
    _strictAMQPFATAL =
        Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
    _immediatePrefetch =
        _strictAMQP
        || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));

    _connection = con;
    _transacted = transacted;
    if (transacted)
    {
        _acknowledgeMode = javax.jms.Session.SESSION_TRANSACTED;
    }
    else
    {
        _acknowledgeMode = acknowledgeMode;
    }

    _channelId = channelId;
    _messageFactoryRegistry = messageFactoryRegistry;
    _defaultPrefetchHighMark = defaultPrefetchHighMark;
    _defaultPrefetchLowMark = defaultPrefetchLowMark;

    if (_acknowledgeMode == NO_ACKNOWLEDGE)
    {
        _queue =
            new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
                new FlowControllingBlockingQueue.ThresholdListener()
                {
                    private final AtomicBoolean _suspendState = new AtomicBoolean();

                    public void aboveThreshold(int currentValue)
                    {
                        // Parameterized logging: the message is only assembled when debug is enabled.
                        _logger.debug("Above threshold({}) so suspending channel. Current value is {}",
                                      _defaultPrefetchHighMark, currentValue);
                        _suspendState.set(true);
                        new Thread(new SuspenderRunner(_suspendState)).start();
                    }

                    public void underThreshold(int currentValue)
                    {
                        _logger.debug("Below threshold({}) so unsuspending channel. Current value is {}",
                                      _defaultPrefetchLowMark, currentValue);
                        _suspendState.set(false);
                        new Thread(new SuspenderRunner(_suspendState)).start();
                    }
                });
    }
    else
    {
        _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null);
    }
}
/**
 * Creates a new session on a connection, supplying the registry obtained from
 * {@link MessageFactoryRegistry#newDefaultRegistry()} to the full constructor.
 *
 * @param con                 The connection on which to create the session.
 * @param channelId           The unique identifier for the session.
 * @param transacted          Indicates whether or not the session is transactional.
 * @param acknowledgeMode     The acknowledgement mode for the session.
 * @param defaultPrefetchHigh The maximum number of messages to prefetch before suspending the session.
 * @param defaultPrefetchLow  The number of prefetched messages at which to resume the session.
 */
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
           int defaultPrefetchLow)
{
    this(con,
         channelId,
         transacted,
         acknowledgeMode,
         MessageFactoryRegistry.newDefaultRegistry(),
         defaultPrefetchHigh,
         defaultPrefetchLow);
}
// ===== JMS Session methods.
/**
 * Closes the session with no timeout, by delegating to the timed close with a timeout value of -1.
 *
 * @throws JMSException If the JMS provider fails to close the session due to some internal error.
 */
public void close() throws JMSException
{
    final int noTimeout = -1;
    close(noTimeout);
}
/**
 * Verifies that this session is still open, as determined by the superclass check.
 *
 * When the superclass check fails and the connection's state manager reports CONNECTION_CLOSED together with
 * a recorded last exception, that exception is attached to the failure as its linked exception before the
 * failure is rethrown.
 *
 * @throws JMSException If the session is closed; an IllegalStateException raised by the superclass check is
 *                      rethrown as-is (possibly with a linked exception added).
 */
public void checkNotClosed() throws JMSException
{
    try
    {
        super.checkNotClosed();
    }
    catch (IllegalStateException closedFailure)
    {
        // Surface any exception that closed the connection while nobody was waiting for it.
        final AMQStateManager stateManager = _connection.getProtocolHandler().getStateManager();
        final boolean connectionClosed = stateManager.getCurrentState().equals(AMQState.CONNECTION_CLOSED);

        if (connectionClosed && stateManager.getLastException() != null)
        {
            closedFailure.setLinkedException(stateManager.getLastException());
        }

        throw closedFailure;
    }
}
/**
 * Creates a new, empty bytes message backed by this session's message delegate factory.
 *
 * @return The newly created message.
 *
 * @throws JMSException If the session fails the {@link #checkNotClosed()} check.
 */
public BytesMessage createBytesMessage() throws JMSException
{
    checkNotClosed();

    final BytesMessage message = new JMSBytesMessage(getMessageDelegateFactory());
    return message;
}
/**
 * Acknowledges all unacknowledged messages on the session, for all message consumers on the session.
 *
 * The queue of unacknowledged delivery tags is drained, and each tag is acknowledged individually (never with
 * the multiple flag).
 *
 * @throws IllegalStateException If the session is closed, or if it has failed over.
 */
public void acknowledge() throws IllegalStateException
{
    if (isClosed())
    {
        throw new IllegalStateException("Session is already closed");
    }

    if (hasFailedOver())
    {
        throw new IllegalStateException("has failed over");
    }

    // Drain until the queue reports it is empty; tags added while draining are picked up as well.
    Long tag;
    while ((tag = _unacknowledgedMessageTags.poll()) != null)
    {
        acknowledgeMessage(tag, false);
    }
}
/**
* Acknowledge one or many messages. The acknowledgement itself is supplied by the concrete session subclass.
*
* @param deliveryTag The tag of the last message to be acknowledged.
* @param multiple {@code true} to acknowledge all messages up to and including the one specified by the
* delivery tag, {@code false} to just acknowledge that message.
*
* @todo Be aware of possible changes to parameter order as versions change.
*/
public abstract void acknowledgeMessage(long deliveryTag, boolean multiple);
/**
 * Fetches the method registry from this session's protocol handler.
 *
 * @return The method registry currently reported by the protocol handler.
 */
public MethodRegistry getMethodRegistry()
{
    return getProtocolHandler().getMethodRegistry();
}
/**
 * Binds the named queue, with the specified routing key, to the named exchange. This is the six-argument
 * overload invoked with its nowait flag set to false.
 *
 * Note that this operation automatically retries in the event of fail-over.
 *
 * @param queueName    The name of the queue to bind.
 * @param routingKey   The routing key to bind the queue with.
 * @param arguments    Additional arguments.
 * @param exchangeName The exchange to bind the queue on.
 * @param destination  The destination, passed through unchanged to the six-argument overload.
 *
 * @throws AMQException If the queue cannot be bound for any reason.
 * @todo Be aware of possible changes to parameter order as versions change.
 * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
 */
public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
                      final AMQShortString exchangeName, final AMQDestination destination) throws AMQException
{
    final boolean nowait = false;
    bindQueue(queueName, routingKey, arguments, exchangeName, destination, nowait);
}
public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
final AMQShortString exchangeName, final AMQDestination destination,
final boolean nowait) throws AMQException
{
/*new FailoverRetrySupport