/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client;
import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE;
import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE;
import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AMQPEncodedMapMessage;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.CloseConsumerMessage;
import org.apache.qpid.client.message.JMSBytesMessage;
import org.apache.qpid.client.message.JMSMapMessage;
import org.apache.qpid.client.message.JMSObjectMessage;
import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.TransportException;
import javax.jms.*;
import javax.jms.IllegalStateException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
*
CRC Card
*
Responsibilities
Collaborations
*
*
*
* @todo Different FailoverSupport implementation are needed on the same method call, in different situations. For
* example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second
* fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of
* the fail-over process, the retry handler could be used to automatically retry the operation once the connection
* has been reestablished. All fail-over protected operations should be placed in private methods, with
* FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the
* fail-over process sets a nowait flag and uses an async method call instead.
* @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this,
* after looking at worse bottlenecks first.
*/
public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
/** System property to enable strict AMQP compliance. */
public static final String STRICT_AMQP = "STRICT_AMQP";
/** Strict AMQP default setting. */
public static final String STRICT_AMQP_DEFAULT = "false";
/** System property to enable failure if strict AMQP compliance is violated. */
public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
/** Strickt AMQP failure default. */
public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
/** System property to enable immediate message prefetching. */
public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
/** Immediate message prefetch default. */
public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
/**
* The period to wait while flow controlled before sending a log message confirming that the session is still
* waiting on flow control being revoked
*/
private final long _flowControlWaitPeriod = Long.getLong(QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD,
DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD);
/**
* The period to wait while flow controlled before declaring a failure
*/
private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE,
DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
private final boolean _delareQueues =
Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true"));
private final boolean _declareExchanges =
Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
private final boolean _useAMQPEncodedMapMessage;
/**
* Flag indicating to start dispatcher as a daemon thread
*/
protected final boolean DEAMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER);
/** The connection to which this session belongs. */
private AMQConnection _connection;
/** Used to indicate whether or not this is a transactional session. */
private final boolean _transacted;
/** Holds the sessions acknowledgement mode. */
private final int _acknowledgeMode;
/** Holds this session unique identifier, used to distinguish it from other sessions. */
private int _channelId;
private int _ticket;
/** Holds the high mark for prefetched message, at which the session is suspended. */
private int _prefetchHighMark;
/** Holds the low mark for prefetched messages, below which the session is resumed. */
private int _prefetchLowMark;
/** Holds the message listener, if any, which is attached to this session. */
private MessageListener _messageListener = null;
/** Used to indicate that this session has been started at least once. */
private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
private final ConcurrentHashMap> _subscriptions =
new ConcurrentHashMap>();
private final ConcurrentHashMap _reverseSubscriptionMap = new ConcurrentHashMap();
private final Lock _subscriberDetails = new ReentrantLock(true);
private final Lock _subscriberAccess = new ReentrantLock(true);
private final FlowControllingBlockingQueue _queue;
private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
private final AtomicLong _rollbackMark = new AtomicLong(-1);
private ConcurrentLinkedQueue _prefetchedMessageTags = new ConcurrentLinkedQueue();
private ConcurrentLinkedQueue _unacknowledgedMessageTags = new ConcurrentLinkedQueue();
private ConcurrentLinkedQueue _deliveredMessageTags = new ConcurrentLinkedQueue();
private volatile Dispatcher _dispatcher;
private volatile Thread _dispatcherThread;
private MessageFactoryRegistry _messageFactoryRegistry;
/** Holds all of the producers created by this session, keyed by their unique identifiers. */
private Map _producers = new ConcurrentHashMap();
/**
* Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume
* methods.
*/
private int _nextTag = 1;
private final IdToConsumerMap _consumers = new IdToConsumerMap();
/**
* Contains a list of consumers which have been removed but which might still have
* messages to acknowledge, eg in client ack or transacted modes
*/
private CopyOnWriteArrayList _removedConsumers = new CopyOnWriteArrayList();
/** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
private ConcurrentHashMap _destinationConsumerCount =
new ConcurrentHashMap();
/**
* Used as a source of unique identifiers for producers within the session.
*
* Access to this id does not require to be synchronized since according to the JMS specification only one
* thread of control is allowed to create producers for any given session instance.
*/
private long _nextProducerId;
/**
* Set when recover is called. This is to handle the case where recover() is called by application code during
* onMessage() processing to ensure that an auto ack is not sent.
*/
private volatile boolean _sessionInRecovery;
private volatile boolean _usingDispatcherForCleanup;
/** Used to indicates that the connection to which this session belongs, has been stopped. */
private boolean _connectionStopped;
/** Used to indicate that this session has a message listener attached to it. */
private boolean _hasMessageListeners;
/** Used to indicate that this session has been suspended. */
private boolean _suspended;
/**
* Used to protect the suspension of this session, so that critical code can be executed during suspension,
* without the session being resumed by other threads.
*/
private final Object _suspensionLock = new Object();
private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
private final boolean _immediatePrefetch;
private final boolean _strictAMQP;
private final boolean _strictAMQPFATAL;
private final Object _messageDeliveryLock = new Object();
/** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
private boolean _dirty;
/** Has failover occured on this session with outstanding actions to commit? */
private boolean _failedOverDirty;
/** Flow control */
private FlowControlIndicator _flowControl = new FlowControlIndicator();
/** Holds the highest received delivery tag. */
protected AtomicLong getHighestDeliveryTag()
{
return _highestDeliveryTag;
}
/** Pre-fetched message tags */
protected ConcurrentLinkedQueue getPrefetchedMessageTags()
{
return _prefetchedMessageTags;
}
/** All the not yet acknowledged message tags */
protected ConcurrentLinkedQueue getUnacknowledgedMessageTags()
{
return _unacknowledgedMessageTags;
}
/** All the delivered message tags */
protected ConcurrentLinkedQueue getDeliveredMessageTags()
{
return _deliveredMessageTags;
}
/** Holds the dispatcher thread for this session. */
protected Dispatcher getDispatcher()
{
return _dispatcher;
}
protected Thread getDispatcherThread()
{
return _dispatcherThread;
}
/** Holds the message factory factory for this session. */
protected MessageFactoryRegistry getMessageFactoryRegistry()
{
return _messageFactoryRegistry;
}
/**
* Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
* consumer.
*/
protected IdToConsumerMap getConsumers()
{
return _consumers;
}
protected void setUsingDispatcherForCleanup(boolean usingDispatcherForCleanup)
{
_usingDispatcherForCleanup = usingDispatcherForCleanup;
}
/** Used to indicate that the session should start pre-fetching messages as soon as it is started. */
protected boolean isImmediatePrefetch()
{
return _immediatePrefetch;
}
public static final class IdToConsumerMap
{
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
private final ConcurrentHashMap _slowAccessConsumers = new ConcurrentHashMap();
public C get(int id)
{
if ((id & 0xFFFFFFF0) == 0)
{
return (C) _fastAccessConsumers[id];
}
else
{
return _slowAccessConsumers.get(id);
}
}
public C put(int id, C consumer)
{
C oldVal;
if ((id & 0xFFFFFFF0) == 0)
{
oldVal = (C) _fastAccessConsumers[id];
_fastAccessConsumers[id] = consumer;
}
else
{
oldVal = _slowAccessConsumers.put(id, consumer);
}
return oldVal;
}
public C remove(int id)
{
C consumer;
if ((id & 0xFFFFFFF0) == 0)
{
consumer = (C) _fastAccessConsumers[id];
_fastAccessConsumers[id] = null;
}
else
{
consumer = _slowAccessConsumers.remove(id);
}
return consumer;
}
public Collection values()
{
ArrayList values = new ArrayList();
for (int i = 0; i < 16; i++)
{
if (_fastAccessConsumers[i] != null)
{
values.add((C) _fastAccessConsumers[i]);
}
}
values.addAll(_slowAccessConsumers.values());
return values;
}
public void clear()
{
_slowAccessConsumers.clear();
for (int i = 0; i < 16; i++)
{
_fastAccessConsumers[i] = null;
}
}
}
private static final class FlowControlIndicator
{
private volatile boolean _flowControl = true;
public synchronized void setFlowControl(boolean flowControl)
{
_flowControl = flowControl;
notify();
}
public boolean getFlowControl()
{
return _flowControl;
}
}
/**
* Creates a new session on a connection.
*
* @param con The connection on which to create the session.
* @param channelId The unique identifier for the session.
* @param transacted Indicates whether or not the session is transactional.
* @param acknowledgeMode The acknowledgement mode for the session.
* @param messageFactoryRegistry The message factory factory for the session.
* @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
*/
protected AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
_useAMQPEncodedMapMessage = con == null ? true : !con.isUseLegacyMapMessageFormat();
_strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
_strictAMQPFATAL =
Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
_immediatePrefetch =
_strictAMQP
|| Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
_connection = con;
_transacted = transacted;
if (transacted)
{
_acknowledgeMode = javax.jms.Session.SESSION_TRANSACTED;
}
else
{
_acknowledgeMode = acknowledgeMode;
}
_channelId = channelId;
_messageFactoryRegistry = messageFactoryRegistry;
_prefetchHighMark = defaultPrefetchHighMark;
_prefetchLowMark = defaultPrefetchLowMark;
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_queue =
new FlowControllingBlockingQueue(_prefetchHighMark, _prefetchLowMark,
new FlowControllingBlockingQueue.ThresholdListener()
{
private final AtomicBoolean _suspendState = new AtomicBoolean();
public void aboveThreshold(int currentValue)
{
// If the session has been closed don't waste time creating a thread to do
// flow control
if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
{
// Only execute change if previous state
// was False
if (!_suspendState.getAndSet(true))
{
if (_logger.isDebugEnabled())
{
_logger.debug(
"Above threshold(" + _prefetchHighMark
+ ") so suspending channel. Current value is " + currentValue);
}
try
{
Threading.getThreadFactory().createThread(new SuspenderRunner(_suspendState)).start();
}
catch (Exception e)
{
throw new RuntimeException("Failed to create thread", e);
}
}
}
}
public void underThreshold(int currentValue)
{
// If the session has been closed don't waste time creating a thread to do
// flow control
if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
{
// Only execute change if previous state
// was true
if (_suspendState.getAndSet(false))
{
if (_logger.isDebugEnabled())
{
_logger.debug(
"Below threshold(" + _prefetchLowMark
+ ") so unsuspending channel. Current value is " + currentValue);
}
try
{
Threading.getThreadFactory().createThread(new SuspenderRunner(_suspendState)).start();
}
catch (Exception e)
{
throw new RuntimeException("Failed to create thread", e);
}
}
}
}
});
}
else
{
_queue = new FlowControllingBlockingQueue(_prefetchHighMark, null);
}
// Add creation logging to tie in with the existing close logging
if (_logger.isDebugEnabled())
{
_logger.debug("Created session:" + this);
}
}
/**
* Creates a new session on a connection with the default message factory factory.
*
* @param con The connection on which to create the session.
* @param channelId The unique identifier for the session.
* @param transacted Indicates whether or not the session is transactional.
* @param acknowledgeMode The acknowledgement mode for the session.
* @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
*/
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
int defaultPrefetchLow)
{
this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh,
defaultPrefetchLow);
}
// ===== JMS Session methods.
/**
* Closes the session with no timeout.
*
* @throws JMSException If the JMS provider fails to close the session due to some internal error.
*/
public void close() throws JMSException
{
close(-1);
}
public abstract AMQException getLastException();
public void checkNotClosed() throws JMSException
{
try
{
super.checkNotClosed();
}
catch (IllegalStateException ise)
{
AMQException ex = getLastException();
if (ex != null)
{
IllegalStateException ssnClosed = new IllegalStateException(
"Session has been closed", ex.getErrorCode().toString());
ssnClosed.setLinkedException(ex);
ssnClosed.initCause(ex);
throw ssnClosed;
}
else
{
throw ise;
}
}
}
public BytesMessage createBytesMessage() throws JMSException
{
checkNotClosed();
JMSBytesMessage msg = new JMSBytesMessage(getMessageDelegateFactory());
msg.setAMQSession(this);
return msg;
}
/**
* Acknowledges all unacknowledged messages on the session, for all message consumers on the session.
*
* @throws IllegalStateException If the session is closed.
* @throws JMSException if there is a problem during acknowledge process.
*/
public void acknowledge() throws IllegalStateException, JMSException
{
if (isClosed())
{
throw new IllegalStateException("Session is already closed");
}
else if (hasFailedOverDirty())
{
//perform an implicit recover in this scenario
recover();
//notify the consumer
throw new IllegalStateException("has failed over");
}
try
{
acknowledgeImpl();
markClean();
}
catch (TransportException e)
{
throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e);
}
}
protected abstract void acknowledgeImpl() throws JMSException;
/**
* Acknowledge one or many messages.
*
* @param deliveryTag The tag of the last message to be acknowledged.
* @param multiple true to acknowledge all messages up to and including the one specified by the
* delivery tag, false to just acknowledge that message.
*
* @todo Be aware of possible changes to parameter order as versions change.
*/
public abstract void acknowledgeMessage(long deliveryTag, boolean multiple);
public MethodRegistry getMethodRegistry()
{
MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
return methodRegistry;
}
/**
* Binds the named queue, with the specified routing key, to the named exchange.
*
* Note that this operation automatically retries in the event of fail-over.
*
* @param queueName The name of the queue to bind.
* @param routingKey The routing key to bind the queue with.
* @param arguments Additional arguments.
* @param exchangeName The exchange to bind the queue on.
*
* @throws AMQException If the queue cannot be bound for any reason.
* @todo Be aware of possible changes to parameter order as versions change.
* @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
*/
public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
final AMQShortString exchangeName, final AMQDestination destination) throws AMQException
{
bindQueue(queueName, routingKey, arguments, exchangeName, destination, false);
}
public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
final AMQShortString exchangeName, final AMQDestination destination,
final boolean nowait) throws AMQException
{
/*new FailoverRetrySupport