/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.client; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.net.ConnectException; import java.net.SocketAddress; import java.net.UnknownHostException; import java.nio.channels.UnresolvedAddressException; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import javax.jms.ConnectionConsumer; import javax.jms.ConnectionMetaData; import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueSession; import javax.jms.ServerSessionPool; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicSession; import javax.naming.NamingException; import javax.naming.Reference; import javax.naming.Referenceable; import javax.naming.StringRefAddr; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQProtocolException; import org.apache.qpid.AMQUnresolvedAddressException; import org.apache.qpid.client.failover.ConnectionRedirectException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.security.CallbackHandlerRegistry; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Connection; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.URLSyntaxException; public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong(); private static final long DEFAULT_CLOSE_TIMEOUT = Long.getLong(ClientProperties.QPID_CLOSE_TIMEOUT, ClientProperties.DEFAULT_CLOSE_TIMEOUT); private final long _connectionNumber; /** * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be * held by any child objects of this connection such as the session, producers and consumers. 
*/ private final Object _failoverMutex = new Object(); private final Object _sessionCreationLock = new Object(); /** * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session * and we must prevent the client from opening too many. */ private long _maximumChannelCount; /** The maximum size of frame supported by the server */ private long _maximumFrameSize; /** * The protocol handler dispatches protocol events for this connection. For example, when the connection is dropped * the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate * handler. */ private AMQProtocolHandler _protocolHandler; /** Maps from session id (Integer) to AMQSession instance */ private final ChannelToSessionMap _sessions = new ChannelToSessionMap(); private final String _clientName; /** The user name to use for authentication */ private String _username; /** The password to use for authentication */ private String _password; /** The virtual path to connect to on the AMQ server */ private String _virtualHost; /** The exception listener for this connection object. */ private volatile ExceptionListener _exceptionListener; private ConnectionListener _connectionListener; private final ConnectionURL _connectionURL; /** * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message * publication. */ private volatile boolean _started; /** Policy dictating how to failover */ private FailoverPolicy _failoverPolicy; /* * _Connected should be refactored with a suitable wait object. 
*/ private boolean _connected; /* * The connection meta data */ private QpidConnectionMetaData _connectionMetaData; private AMQShortString _defaultTopicExchangeName = AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME); private AMQShortString _defaultQueueExchangeName = AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_NAME); private AMQShortString _temporaryTopicExchangeName = AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME); private AMQShortString _temporaryQueueExchangeName = AMQShortString.valueOf(ExchangeDefaults.DIRECT_EXCHANGE_NAME); /** Thread Pool for executing connection level processes. Such as returning bounced messages. */ private final ExecutorService _taskPool = Executors.newCachedThreadPool(); private AMQConnectionDelegate _delegate; // this connection maximum number of prefetched messages private int _maxPrefetch; //Indicates whether persistent messages are synchronized private boolean _syncPersistence; //Indicates whether we need to sync on every message ack private boolean _syncAck; //Indicates the sync publish options (persistent|all) //By default it's async publish private String _syncPublish = ""; // Indicates whether to use the old map message format or the // new amqp-0-10 encoded format. private boolean _useLegacyMapMessageFormat; // Indicates whether to use the old stream message format or the // new amqp-0-10 list encoded format. 
private boolean _useLegacyStreamMessageFormat;

// When sending to a Queue destination for the first time, check that the queue is bound
private final boolean _validateQueueOnSend;

// Used to track the last failover time for address resolution purposes.
// Refreshed with System.currentTimeMillis() in firePreFailover().
private volatile long _lastFailoverTime = 0;

// Value of the compress-messages option (connection URL option first, system property otherwise).
private boolean _compressMessages;

// Message compression threshold option; the constructor maps non-positive values to Integer.MAX_VALUE.
private int _messageCompressionThresholdSize;

static {
    if (_logger.isDebugEnabled()) {
        _logger.debug("Qpid version : " + QpidProperties.getVersionString());
    }

    // The registering of any additional SASL mechanisms with the Java Security API requires
    // SecurityManager permissions. In execution environments such as web containers,
    // this may require adjustments to the Java security.policy.
    CallbackHandlerRegistry registry = CallbackHandlerRegistry.getInstance();

    if (_logger.isDebugEnabled()) {
        _logger.debug("Loaded mechanisms " + registry.getMechanisms());
    }
}

/**
 * Builds a connection URL from the individual parts and delegates to {@link #AMQConnection(ConnectionURL)}.
 *
 * <p>The URL has the form
 * {@code AMQ_PROTOCOL://username:password@clientName/virtualHost?brokerlist='broker'}, where the broker
 * string is first passed through {@code AMQBrokerDetails.checkTransport}.
 *
 * @param broker      broker details string
 * @param username    user name placed in the URL
 * @param password    password placed in the URL
 * @param clientName  client id; {@code null} is replaced by the empty string
 * @param virtualHost virtual host, appended after a "/" separator
 *
 * @throws AMQException       propagated from the delegated constructor
 * @throws URLSyntaxException declared for the URL construction
 */
public AMQConnection(String broker, String username, String password, String clientName, String virtualHost)
        throws AMQException, URLSyntaxException {
    this(new AMQConnectionURL(
            ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
            + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
            + AMQBrokerDetails.checkTransport(broker) + "'"));
}

/**
 * Builds a connection URL for a single {@code tcp://host:port} broker and delegates to
 * {@link #AMQConnection(ConnectionURL)}.
 *
 * <p>NOTE(review): unlike the broker-string constructor, no "/" is inserted between the client name and
 * the virtual host here, so {@code virtualHost} presumably has to carry its own leading "/" - confirm
 * against callers.
 *
 * @param host        broker host name
 * @param port        broker port
 * @param username    user name placed in the URL
 * @param password    password placed in the URL
 * @param clientName  client id; {@code null} is replaced by the empty string
 * @param virtualHost virtual host, appended directly after the client name
 *
 * @throws AMQException       propagated from the delegated constructor
 * @throws URLSyntaxException declared for the URL construction
 */
public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost)
        throws AMQException, URLSyntaxException {
    this(new AMQConnectionURL(
            ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
            + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"));
}

/**
 * Parses the given connection URL string and delegates to {@link #AMQConnection(ConnectionURL)}.
 *
 * @param connection connection URL in string form
 *
 * @throws AMQException       propagated from the delegated constructor
 * @throws URLSyntaxException declared for the URL parsing
 */
public AMQConnection(String connection) throws AMQException, URLSyntaxException {
    this(new AMQConnectionURL(connection));
}

/**
 * Creates the connection from a connection URL and immediately tries to connect.
 *
 * <p>In order, the constructor:
 * <ol>
 *   <li>reads the per-connection options; each is taken from the URL when present and from the
 *       corresponding {@code ClientProperties} system property otherwise;</li>
 *   <li>picks the initial delegate from the {@code ClientProperties.AMQP_VERSION} system property
 *       (falling back to the 0-10 delegate);</li>
 *   <li>copies client name, credentials, virtual host and exchange names from the URL;</li>
 *   <li>loops over {@link #makeBrokerConnection(BrokerDetails)} until connected, retrying is no longer
 *       allowed, or there are no more broker details;</li>
 *   <li>calls {@code verifyClientID()} and, if still not connected, throws
 *       {@code AMQUnresolvedAddressException} (when the failure chain contains an
 *       {@code UnresolvedAddressException} or {@code UnknownHostException}) or
 *       {@code AMQConnectionFailureException}.</li>
 * </ol>
 *
 * TODO Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception
 * was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon.
 *
 * @param connectionURL the connection URL; must not be {@code null}
 *
 * @throws IllegalArgumentException if {@code connectionURL} is {@code null}
 * @throws AMQException             declared for connection failures (see step 5 above)
 */
public AMQConnection(ConnectionURL connectionURL) throws AMQException {
    if (connectionURL == null) {
        throw new IllegalArgumentException("Connection must be specified");
    }

    _connectionNumber = CONN_NUMBER_GENERATOR.incrementAndGet();

    if (_logger.isDebugEnabled()) {
        _logger.debug("Connection(" + _connectionNumber + "):" + connectionURL);
    }

    // set this connection maxPrefetch
    if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null) {
        _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH));
    } else {
        // use the default value set for all connections
        _maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
                ClientProperties.MAX_PREFETCH_DEFAULT));
    }

    // sync_persistence is deprecated: warn whenever it ends up being set, from either source
    if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null) {
        _syncPersistence = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE));
        _logger.warn("sync_persistence is a deprecated property, "
                     + "please use sync_publish={persistent|all} instead");
    } else {
        // use the default value set for all connections
        _syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME);
        if (_syncPersistence) {
            _logger.warn("sync_persistence is a deprecated property, "
                         + "please use sync_publish={persistent|all} instead");
        }
    }

    if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK) != null) {
        _syncAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK));
    } else {
        // use the default value set for all connections
        _syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME);
    }

    if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH) != null) {
        _syncPublish = connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH);
    } else {
        // use the default value set for all connections
        // (the field's current value is the fallback when the property is absent)
        _syncPublish = System.getProperty((ClientProperties.SYNC_PUBLISH_PROP_NAME),_syncPublish);
    }

    if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT) != null) {
        _useLegacyMapMessageFormat = Boolean.parseBoolean(
                connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT));
    } else {
        // use the default value set for all connections
        _useLegacyMapMessageFormat = Boolean.getBoolean(ClientProperties.USE_LEGACY_MAP_MESSAGE_FORMAT);
    }

    if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT) != null) {
        _useLegacyStreamMessageFormat = Boolean.parseBoolean(
                connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT));
    } else {
        // use the default value set for all connections
        // (unlike the map format, the legacy stream format defaults to true when the property is unset)
        _useLegacyStreamMessageFormat = System.getProperty(ClientProperties.USE_LEGACY_STREAM_MESSAGE_FORMAT) == null
                ? true : Boolean.getBoolean(ClientProperties.USE_LEGACY_STREAM_MESSAGE_FORMAT);
    }

    if (connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND) != null) {
        _validateQueueOnSend = Boolean.parseBoolean(
                connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND));
    } else {
        _validateQueueOnSend =
                Boolean.parseBoolean(System.getProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false"));
    }

    if (connectionURL.getOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES) != null) {
        _compressMessages = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES));
    } else {
        _compressMessages =
                Boolean.parseBoolean(System.getProperty(ClientProperties.CONNECTION_OPTION_COMPRESS_MESSAGES,
                        String.valueOf(ClientProperties.DEFAULT_CONNECTION_OPTION_COMPRESS_MESSAGES)));
    }

    if (connectionURL.getOption(ConnectionURL.OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE) != null) {
        _messageCompressionThresholdSize =
                Integer.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE));
    } else {
        _messageCompressionThresholdSize =
                Integer.getInteger(ClientProperties.CONNECTION_OPTION_MESSAGE_COMPRESSION_THRESHOLD_SIZE,
                        ClientProperties.DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE);
    }
    // a non-positive threshold is replaced by the largest possible value
    if (_messageCompressionThresholdSize <= 0) {
        _messageCompressionThresholdSize = Integer.MAX_VALUE;
    }

    String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
    if (_logger.isDebugEnabled()) {
        _logger.debug("AMQP version " + amqpVersion);
    }

    _failoverPolicy = new FailoverPolicy(connectionURL, this);
    BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails();

    // Initial delegate chosen from the system property; it may be replaced via initDelegate() in the
    // connection loop below when makeBrokerConnection() returns a protocol version.
    if ("0-8".equals(amqpVersion)) {
        _delegate = new AMQConnectionDelegate_8_0(this);
    } else if ("0-9".equals(amqpVersion)) {
        _delegate = new AMQConnectionDelegate_0_9(this);
    } else if ("0-91".equals(amqpVersion) || "0-9-1".equals(amqpVersion)) {
        _delegate = new AMQConnectionDelegate_9_1(this);
    } else {
        _delegate = new AMQConnectionDelegate_0_10(this);
    }

    _connectionURL = connectionURL;

    _clientName = connectionURL.getClientName();
    _username = connectionURL.getUsername();
    _password = connectionURL.getPassword();

    setVirtualHost(connectionURL.getVirtualHost());

    // Exchange names from the URL override the field defaults only when present
    if (connectionURL.getDefaultQueueExchangeName() != null) {
        _defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName();
    }

    if (connectionURL.getDefaultTopicExchangeName() != null) {
        _defaultTopicExchangeName = connectionURL.getDefaultTopicExchangeName();
    }

    if (connectionURL.getTemporaryQueueExchangeName() != null) {
        _temporaryQueueExchangeName = connectionURL.getTemporaryQueueExchangeName();
    }

    if (connectionURL.getTemporaryTopicExchangeName() != null) {
        _temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName();
    }

    _protocolHandler = new AMQProtocolHandler(this);

    if (_logger.isDebugEnabled()) {
        _logger.debug("Connecting with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
    }

    // We are not currently connected
    setConnected(false);

    boolean retryAllowed = true;
    // Only ever assigned in the catch below, so it holds the most recent failure across iterations.
    Exception connectionException = null;
    while (!isConnected() && retryAllowed && brokerDetails != null) {
        ProtocolVersion pe = null;
        try {
            pe = makeBrokerConnection(brokerDetails);
        } catch (Exception e) {
            if (_logger.isInfoEnabled()) {
                _logger.info("Unable to connect to broker at " +
                             _failoverPolicy.getCurrentBrokerDetails(),
                             e);
            }
            connectionException = e;
        }

        if (pe != null) {
            // reset the delegate to the version returned by the
            // broker
            initDelegate(pe);
        } else if (!isConnected()) {
            if (connectionException instanceof ConnectionRedirectException) {
                // Redirect: retry against a copy of the current broker details pointed at the new host/port
                ConnectionRedirectException redirect = (ConnectionRedirectException) connectionException;
                retryAllowed = true;
                brokerDetails = new AMQBrokerDetails(brokerDetails);
                brokerDetails.setHost(redirect.getHost());
                brokerDetails.setPort(redirect.getPort());
                _protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession()));
            } else {
                // Ordinary failure: ask the failover policy whether, and where, to try next
                retryAllowed = _failoverPolicy.failoverAllowed();
                brokerDetails = _failoverPolicy.getNextBrokerDetails();
                _protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession()));
            }
        }
    }
    verifyClientID();

    if (_logger.isDebugEnabled()) {
        _logger.debug("Are we connected:" + isConnected());
    }

    if (!isConnected()) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("Last attempted ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
        }

        // Prefer the message of the failure's direct cause, then the failure's own message
        String message = null;

        if (connectionException != null) {
            if (connectionException.getCause() != null) {
                message = connectionException.getCause().getMessage();
            } else {
                message = connectionException.getMessage();
            }
        }

        if (message == null) {
            message = "Unable to Connect";
        } else if ("".equals(message)) {
            message = "Unable to Connect:" + connectionException.getClass();
        }

        // Walk the whole cause chain looking for an address-resolution failure
        for (Throwable th = connectionException; th != null; th = th.getCause()) {
            if (th instanceof UnresolvedAddressException ||
                th instanceof UnknownHostException) {
                throw new AMQUnresolvedAddressException
                    (message,
                     _failoverPolicy.getCurrentBrokerDetails().toString(),
                     connectionException);
            }
        }

        throw new AMQConnectionFailureException(message, connectionException);
    }

    if (_logger.isDebugEnabled()) {
        _logger.debug("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
    }

    _sessions.setMaxChannelID(_delegate.getMaxChannelID());
    _sessions.setMinChannelID(_delegate.getMinChannelID());

    _connectionMetaData = new QpidConnectionMetaData(this);
}

/**
 * Reports whether the given throwable, or its direct cause when it has one, is a
 * {@link ConnectException} or an {@link UnresolvedAddressException}.
 *
 * @param thrown the throwable to inspect
 *
 * @return true if the inspected throwable is one of the two types above
 */
protected boolean checkException(Throwable thrown) {
    Throwable cause = thrown.getCause();

    if (cause == null) {
        cause = thrown;
    }

    return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
}

/**
 * Replaces the connection delegate with the one for the given protocol version. The delegate class is
 * looked up reflectively by the name
 * {@code org.apache.qpid.client.AMQConnectionDelegate_{major}_{minor}} and constructed with this
 * connection as its only argument.
 *
 * @param pe the protocol version to switch to
 *
 * @throws AMQProtocolException if no delegate class exists for that version
 */
private void initDelegate(ProtocolVersion pe) throws AMQProtocolException {
    try {
        String delegateClassName = String.format
                                ("org.apache.qpid.client.AMQConnectionDelegate_%s_%s",
                                 pe.getMajorVersion(), pe.getMinorVersion());
        if (_logger.isDebugEnabled()) {
            _logger.debug("Looking up delegate '" + delegateClassName + "' Based on PE:" + pe);
        }
Class c = Class.forName(delegateClassName); Class partypes[] = new Class[1]; partypes[0] = AMQConnection.class; _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this); //Update our session to use this new protocol version _protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion()); } catch (ClassNotFoundException e) { throw new AMQProtocolException (AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR, String.format("Protocol: %s.%s is rquired by the broker but is not " + "currently supported by this client library implementation", pe.getMajorVersion(), pe.getMinorVersion()), e); } catch (NoSuchMethodException e) { throw new RuntimeException("unable to locate constructor for delegate", e); } catch (InstantiationException e) { throw new RuntimeException("error instantiating delegate", e); } catch (IllegalAccessException e) { throw new RuntimeException("error accessing delegate", e); } catch (InvocationTargetException e) { throw new RuntimeException("error invoking delegate", e); } } private void setVirtualHost(String virtualHost) { if (virtualHost != null && virtualHost.startsWith("/")) { virtualHost = virtualHost.substring(1); } _virtualHost = virtualHost; } public boolean attemptReconnection(String host, int port, final boolean useFailoverConfigOnFailure) { BrokerDetails bd = new AMQBrokerDetails(_failoverPolicy.getCurrentBrokerDetails()); bd.setHost(host); bd.setPort(port); _failoverPolicy.setBroker(bd); try { makeBrokerConnection(bd); return true; } catch (Exception e) { if (_logger.isInfoEnabled()) { _logger.info("Unable to connect to broker at " + bd); } return useFailoverConfigOnFailure && attemptReconnection(); } } public boolean attemptReconnection() { BrokerDetails broker = null; while (_failoverPolicy.failoverAllowed() && (broker = _failoverPolicy.getNextBrokerDetails()) != null) { if (attemptConnection(broker)) { return true; } } // connection unsuccessful return false; } private boolean 
attemptConnection(final BrokerDetails broker) { try { makeBrokerConnection(broker); return true; } catch (Exception e) { if (!(e instanceof AMQException)) { if (_logger.isInfoEnabled()) { _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e); } } else { if (_logger.isInfoEnabled()) { _logger.info(e.getMessage() + ":Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails()); } } } return false; } public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException { resetClosedFlag(); return _delegate.makeBrokerConnection(brokerDetail); } public T executeRetrySupport(FailoverProtectedOperation operation) throws E { return _delegate.executeRetrySupport(operation); } /** * Get the details of the currently active broker * * @return null if no broker is active (i.e. no successful connection has been made, or the BrokerDetail instance * otherwise */ public BrokerDetails getActiveBrokerDetails() { return _failoverPolicy.getCurrentBrokerDetails(); } public boolean failoverAllowed() { if (!isConnected()) { return false; } else { return _failoverPolicy.failoverAllowed(); } } public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException { return createSession(transacted, acknowledgeMode, _maxPrefetch); } public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch) throws JMSException { return createSession(transacted, acknowledgeMode, prefetch, prefetch); } public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetchHigh, final int prefetchLow) throws JMSException { synchronized (_sessionCreationLock) { checkNotClosed(); return _delegate.createSession(transacted, acknowledgeMode, prefetchHigh, prefetchLow); } } public void setFailoverPolicy(FailoverPolicy policy) { _failoverPolicy = policy; } public 
FailoverPolicy getFailoverPolicy() { return _failoverPolicy; } /** * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions where specified in * the JMS spec * * @param transacted * @param acknowledgeMode * * @return QueueSession * * @throws JMSException */ public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { return new AMQQueueSessionAdaptor(createSession(transacted, acknowledgeMode)); } /** * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions where specified in * the JMS spec * * @param transacted * @param acknowledgeMode * * @return TopicSession * * @throws JMSException */ public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { return new AMQTopicSessionAdaptor(createSession(transacted, acknowledgeMode)); } public boolean channelLimitReached() { return _sessions.size() >= _maximumChannelCount; } public String getClientID() throws JMSException { checkNotClosed(); return _clientName; } public void setClientID(String clientID) throws JMSException { checkNotClosed(); // in AMQP it is not possible to change the client ID. If one is not specified // upon connection construction, an id is generated automatically. Therefore // we can always throw an exception. 
if (!Boolean.getBoolean(ClientProperties.IGNORE_SET_CLIENTID_PROP_NAME)) { throw new IllegalStateException("Client name cannot be changed after being set"); } else { _logger.info("Operation setClientID is ignored using ID: " + getClientID()); } } public ConnectionMetaData getMetaData() throws JMSException { checkNotClosed(); return _connectionMetaData; } protected final ExceptionListener getExceptionListenerNoCheck() { return _exceptionListener; } public ExceptionListener getExceptionListener() throws JMSException { checkNotClosed(); return getExceptionListenerNoCheck(); } public void setExceptionListener(ExceptionListener listener) throws JMSException { checkNotClosed(); _exceptionListener = listener; } /** * Start the connection, i.e. start flowing messages. Note that this method must be called only from a single thread * and is not thread safe (which is legal according to the JMS specification). * * @throws JMSException */ public void start() throws JMSException { checkNotClosed(); if (!_started) { _started = true; final Iterator it = _sessions.values().iterator(); while (it.hasNext()) { final AMQSession s = (AMQSession) (it.next()); try { s.start(); } catch (AMQException e) { throw new JMSAMQException(e); } } } } public void stop() throws JMSException { checkNotClosed(); if (_started) { for (Iterator i = _sessions.values().iterator(); i.hasNext();) { try { ((AMQSession) i.next()).stop(); } catch (AMQException e) { throw new JMSAMQException(e); } } _started = false; } } public void close() throws JMSException { close(DEFAULT_CLOSE_TIMEOUT); } public void close(long timeout) throws JMSException { boolean closed; synchronized (_sessionCreationLock) { closed = setClosed(); } if (!closed) { List sessions = new ArrayList<>(_sessions.values()); setClosing(true); try { doClose(sessions, timeout); } finally { setClosing(false); } } } private void doClose(List sessions, long timeout) throws JMSException { if (!sessions.isEmpty()) { AMQSession session = 
sessions.remove(0); synchronized (session.getMessageDeliveryLock()) { doClose(sessions, timeout); } } else { synchronized (getFailoverMutex()) { try { try { closeAllSessions(null, timeout); } finally { //This MUST occur after we have successfully closed all Channels/Sessions shutdownTaskPool(timeout); } } catch (JMSException e) { _logger.error("Error closing connection", e); JMSException jmse = new JMSException("Error closing connection: " + e); jmse.setLinkedException(e); jmse.initCause(e); throw jmse; } finally { try { _delegate.closeConnection(timeout); } catch (Exception e) { _logger.warn("Error closing underlying protocol connection", e); } } } } } private void shutdownTaskPool(final long timeout) { _taskPool.shutdown(); if (!_taskPool.isTerminated()) { try { _taskPool.awaitTermination(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { _logger.info("Interrupted while shutting down connection thread pool."); } } //If the taskpool hasn't shutdown by now then give it shutdownNow. // This will interrupt any running tasks. if (!_taskPool.isTerminated()) { List tasks = _taskPool.shutdownNow(); for (Runnable r : tasks) { _logger.warn("Connection close forced taskpool to prevent execution:" + r); } } } /** * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to * mark objects "visible" in userland as closed after failover or other significant event that impacts the * connection.

The caller must hold the failover mutex before calling this method.
 */
private void markAllSessionsClosed() {
    // Iterate over a copy of the registered sessions rather than the live collection.
    final List<Object> snapshot = new ArrayList<Object>(_sessions.values());
    for (Object entry : snapshot) {
        ((AMQSession) entry).markClosed();
    }

    // Every session has been flagged; forget them all.
    _sessions.clear();
}

/**
 * Close all the sessions, either due to normal connection closure or due to an error occurring.
 *
 * @param cause if not null, the error that is causing this shutdown

The caller must hold the failover mutex * before calling this method. */ private void closeAllSessions(Throwable cause, long timeout) throws JMSException { final LinkedList sessionCopy = new LinkedList(_sessions.values()); final Iterator it = sessionCopy.iterator(); JMSException sessionException = null; while (it.hasNext()) { final AMQSession session = (AMQSession) it.next(); if (cause != null) { session.closed(cause); } else { try { session.close(timeout); } catch (JMSException e) { _logger.error("Error closing session: " + e); sessionException = e; } } } _sessions.clear(); if (sessionException != null) { throw sessionException; } } public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { checkNotClosed(); throw new JmsNotImplementedException(); } public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { checkNotClosed(); throw new JmsNotImplementedException(); } public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { checkNotClosed(); throw new JmsNotImplementedException(); } public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { checkNotClosed(); throw new JmsNotImplementedException(); } public long getMaximumChannelCount() throws JMSException { checkNotClosed(); return _maximumChannelCount; } public void setConnectionListener(ConnectionListener listener) { _connectionListener = listener; } public ConnectionListener getConnectionListener() { return _connectionListener; } public void setMaximumChannelCount(long maximumChannelCount) { _maximumChannelCount = maximumChannelCount; } public void setMaximumFrameSize(long 
frameMax) { _maximumFrameSize = frameMax; } public long getMaximumFrameSize() { return _maximumFrameSize; } public ChannelToSessionMap getSessions() { return _sessions; } public String getUsername() { return _username; } public void setUsername(String id) { _username = id; } public String getPassword() { return _password; } public String getVirtualHost() { return _virtualHost; } public final AMQProtocolHandler getProtocolHandler() { return _protocolHandler; } public final boolean started() { return _started; } public final boolean isConnected() { return _connected; } protected final void setConnected(boolean connected) { _connected = connected; } public void bytesSent(long writtenBytes) { if (_connectionListener != null) { _connectionListener.bytesSent(writtenBytes); } } public void bytesReceived(long receivedBytes) { if (_connectionListener != null) { _connectionListener.bytesReceived(receivedBytes); } } /** * Fire the preFailover event to the registered connection listener (if any) * * @param redirect true if this is the result of a redirect request rather than a connection error * * @return true if no listener or listener does not veto change */ public boolean firePreFailover(boolean redirect) { _lastFailoverTime = System.currentTimeMillis(); boolean proceed = true; if (_connectionListener != null) { proceed = _connectionListener.preFailover(redirect); } return proceed; } /** * Fire the preResubscribe event to the registered connection listener (if any). If the listener vetoes * resubscription then all the sessions are closed. * * @return true if no listener or listener does not veto resubscription. * * @throws JMSException */ public boolean firePreResubscribe() throws JMSException { if (_connectionListener != null) { boolean resubscribe = _connectionListener.preResubscribe(); if (!resubscribe) { markAllSessionsClosed(); } return resubscribe; } else { return true; } } /** Fires a failover complete event to the registered connection listener (if any). 
*/
public void fireFailoverComplete() {
    // No-op when no connection listener has been registered
    if (_connectionListener != null) {
        _connectionListener.failoverComplete();
    }
}

/**
 * In order to protect the consistency of the connection and its child sessions, consumers and producers, the
 * "failover mutex" must be held when doing any operations that could be corrupted during failover.
 *
 * @return a mutex. Guaranteed never to change for the lifetime of this connection even if failover occurs.
 */
public final Object getFailoverMutex() {
    return _failoverMutex;
}

/**
 * Asks the delegate to resubscribe the sessions.
 *
 * @throws JMSException      propagated from the delegate
 * @throws AMQException      propagated from the delegate
 * @throws FailoverException propagated from the delegate
 */
public void resubscribeSessions() throws JMSException, AMQException, FailoverException {
    _delegate.resubscribeSessions();
}

/**
 * If failover is taking place this will block until it has completed. If failover is not taking place it will
 * return immediately.
 *
 * @throws InterruptedException propagated from the protocol handler
 */
public void blockUntilNotFailingOver() throws InterruptedException {
    _protocolHandler.blockUntilNotFailingOver();
}

/**
 * Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception
 * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will
 * propagate to consumers. This allows synchronous consumers to have exceptions thrown to them.
 *
 * <p>The exception handed to the listener is {@code cause} itself when it already is a JMSException;
 * otherwise a new JMSException is built around it, carrying the AMQ error code when one is available.
 *
 * @param cause the exception
 */
public void exceptionReceived(Throwable cause) {

    if (_logger.isDebugEnabled()) {
        _logger.debug("exceptionReceived done by:" + Thread.currentThread().getName(), cause);
    }

    final JMSException je;
    if (cause instanceof JMSException) {
        je = (JMSException) cause;
    } else {
        AMQConstant code = null;

        if (cause instanceof AMQException) {
            code = ((AMQException) cause).getErrorCode();
        }

        if (code != null) {
            je = new JMSException("Exception thrown against " + toString() + ": " + cause, Integer.toString(code.getCode()));
        } else {
            //Should never get here as all AMQEs are required to have an ErrorCode!
            // Other than AMQDisconnectedEx!

            if (cause instanceof AMQDisconnectedException) {
                // Prefer the state manager's last recorded exception as the reported cause.
                // NOTE(review): this replaces `cause`, so the instanceof checks further down run
                // against the replacement, not the original AMQDisconnectedException - confirm intended.
                Exception last = _protocolHandler.getStateManager().getLastException();
                if (last != null) {
                    _logger.info("StateManager had an exception for us to use a cause of our Disconnected Exception");
                    cause = last;
                }
            }
            je = new JMSException("Exception thrown against " + toString() + ": " + cause);
        }

        if (cause instanceof Exception) {
            je.setLinkedException((Exception) cause);
        }

        je.initCause(cause);
    }

    // Set when this call should go on to close the sessions.
    boolean closer = false;

    // in the case of an IOException, MINA has closed the protocol session so we set _closed to true
    // so that any generic client code that tries to close the connection will not mess up this error
    // handling sequence
    if (cause instanceof IOException || cause instanceof AMQDisconnectedException) {
        // If we have an IOE/AMQDisconnect there is no connection to close on.
        setClosing(false);
        // presumably setClosed() returns the previous closed state (close(long) uses it that way) - verify
        closer = !setClosed();

        _protocolHandler.getProtocolSession().notifyError(je);
    }

    try {
        // decide if we are going to close the session
        if (hardError(cause)) {
            closer = (!setClosed()) || closer;
            {
                _logger.info("Closing AMQConnection due to :" + cause);
            }
        } else {
            _logger.info("Not a hard-error connection not closing: " + cause);
        }

        // if we are closing the connection, close sessions first
        if (closer) {
            // get the failover mutex before trying to close
            synchronized (getFailoverMutex()) {
                try {
                    // a non-null cause makes closeAllSessions notify the sessions instead of closing them
                    closeAllSessions(cause, -1);
                } catch (JMSException e) {
                    _logger.error("Error closing all sessions: " + e, e);
                }
            }
        }
    } finally {
        // The listener is notified even if the handling above throws.
        deliverJMSExceptionToExceptionListenerOrLog(je, cause);
    }
}

private void deliverJMSExceptionToExceptionListenerOrLog(final JMSException je, final Throwable cause) {
    // Listener is read without the closed check
    final ExceptionListener exceptionListener = getExceptionListenerNoCheck();
    if (exceptionListener != null) {
        // Delivery is handed to performConnectionTask as a Runnable rather than invoked inline
        performConnectionTask(new Runnable() {
            @Override
            public void run() {
                // deliver the exception if there is a listener
                try {
                    exceptionListener.onException(je);
                } catch (RuntimeException e) {
                    // A misbehaving listener is logged rather than allowed to propagate
                    _logger.error("Exception occurred in ExceptionListener", e);
                }
            }
}); } else { _logger.error("Throwable Received but no listener set: " + cause); } } private boolean hardError(Throwable cause) { if (cause instanceof AMQException) { return ((AMQException) cause).isHardError(); } return true; } void registerSession(int channelId, AMQSession session) { _sessions.put(channelId, session); } public void deregisterSession(int channelId) { _sessions.remove(channelId); } public String toString() { StringBuffer buf = new StringBuffer("AMQConnection:\n"); if (_failoverPolicy.getCurrentBrokerDetails() == null) { buf.append("No active broker connection"); } else { BrokerDetails bd = _failoverPolicy.getCurrentBrokerDetails(); buf.append("Host: ").append(String.valueOf(bd.getHost())); buf.append("\nPort: ").append(String.valueOf(bd.getPort())); } buf.append("\nVirtual Host: ").append(String.valueOf(_virtualHost)); buf.append("\nClient ID: ").append(String.valueOf(_clientName)); buf.append("\nActive session count: ").append((_sessions == null) ? 0 : _sessions.size()); return buf.toString(); } /** * Returns connection url. * @return connection url */ public ConnectionURL getConnectionURL() { return _connectionURL; } /** * Returns stringified connection url. This url is suitable only for display * as {@link AMQConnectionURL#toString()} converts any password to asterisks. 
* @return connection url */ public String toURL() { return _connectionURL.toString(); } public Reference getReference() throws NamingException { return new Reference(AMQConnection.class.getName(), new StringRefAddr(AMQConnection.class.getName(), toURL()), AMQConnectionFactory.class.getName(), null); // factory location } public AMQShortString getDefaultTopicExchangeName() { return _defaultTopicExchangeName; } public void setDefaultTopicExchangeName(AMQShortString defaultTopicExchangeName) { _defaultTopicExchangeName = defaultTopicExchangeName; } public AMQShortString getDefaultQueueExchangeName() { return _defaultQueueExchangeName; } public void setDefaultQueueExchangeName(AMQShortString defaultQueueExchangeName) { _defaultQueueExchangeName = defaultQueueExchangeName; } public AMQShortString getTemporaryTopicExchangeName() { return _temporaryTopicExchangeName; } public AMQShortString getTemporaryQueueExchangeName() { return _temporaryQueueExchangeName; } public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName) { _temporaryTopicExchangeName = temporaryTopicExchangeName; } public void setTemporaryQueueExchangeName(AMQShortString temporaryQueueExchangeName) { _temporaryQueueExchangeName = temporaryQueueExchangeName; } public void performConnectionTask(Runnable task) { try { _taskPool.execute(task); } catch (RejectedExecutionException e) { if(!(isClosed() || isClosing())) { throw e; } } } public AMQSession getSession(int channelId) { return _sessions.get(channelId); } public ProtocolVersion getProtocolVersion() { return _delegate.getProtocolVersion(); } public String getBrokerUUID() { if(getProtocolVersion().equals(ProtocolVersion.v0_10)) { return ((AMQConnectionDelegate_0_10)_delegate).getUUID(); } else { return null; } } /** * Tests whether the Broker has advertised support for the named feature. * * @param featureName * * @return true if the feature is supported, or false otherwise. 
*/
    boolean isSupportedServerFeature(final String featureName)
    {
        return _delegate.isSupportedServerFeature(featureName);
    }

    /** @return true while the protocol handler holds a non-null failover latch */
    public boolean isFailingOver()
    {
        return (_protocolHandler.getFailoverLatch() != null);
    }

    /**
     * Get the maximum number of messages that this connection can pre-fetch.
     *
     * @return The maximum number of messages that this connection can pre-fetch.
     */
    public long getMaxPrefetch()
    {
        return _maxPrefetch;
    }

    /**
     * Indicates whether persistent messages are synchronized
     *
     * @return true if persistent messages are synchronized false otherwise
     */
    public boolean getSyncPersistence()
    {
        return _syncPersistence;
    }

    /**
     * Indicates whether we need to sync on every message ack
     *
     * @return the value of the sync-ack setting
     */
    public boolean getSyncAck()
    {
        return _syncAck;
    }

    /** @return the sync-publish setting held by this connection */
    public String getSyncPublish()
    {
        return _syncPublish;
    }

    /** @return the value of the message compression setting */
    public boolean isMessageCompressionDesired()
    {
        return _compressMessages;
    }

    /** @return the next channel id as provided by the session map */
    public int getNextChannelID()
    {
        return _sessions.getNextChannelId();
    }

    /** @return the value of the legacy map message format setting */
    public boolean isUseLegacyMapMessageFormat()
    {
        return _useLegacyMapMessageFormat;
    }

    /** @return the value of the legacy stream message format setting */
    public boolean isUseLegacyStreamMessageFormat()
    {
        return _useLegacyStreamMessageFormat;
    }

    /**
     * Verifies the client id through the delegate, but only when the boolean system property named by
     * {@code ClientProperties.QPID_VERIFY_CLIENT_ID} is set to true.
     *
     * @throws AMQException with {@code AMQConstant.ALREADY_EXISTS} when the delegate reports the id as not
     *                      valid, or wrapping a JMSException raised by the delegate
     */
    private void verifyClientID() throws AMQException
    {
        if (Boolean.getBoolean(ClientProperties.QPID_VERIFY_CLIENT_ID))
        {
            try
            {
                if (!_delegate.verifyClientID())
                {
                    throw new AMQException(AMQConstant.ALREADY_EXISTS, "ClientID must be unique");
                }
            }
            catch (JMSException e)
            {
                // keep the original exception as the cause
                throw new AMQException(e.getMessage(), e);
            }
        }
    }

    /** @return the timestamp recorded as the last failover time */
    public long getLastFailoverTime()
    {
        return _lastFailoverTime;
    }

    /** @return the delegate used by this connection */
    protected AMQConnectionDelegate getDelegate()
    {
        return _delegate;
    }

    /** @return the number assigned to this connection */
    public Long getConnectionNumber()
    {
        return _connectionNumber;
    }

    /**
     * Logs, at info level, that this connection is now connected between the given addresses.
     *
     * @param localAddress  the local address included in the message
     * @param remoteAddress the remote address included in the message
     */
    protected void logConnected(SocketAddress localAddress, SocketAddress remoteAddress)
    {
        if (_logger.isInfoEnabled())
        {
            _logger.info("Connection " + _connectionNumber + " now connected from "
                         + localAddress + " to " + remoteAddress);
        }
    }

    /**
     * Passes the heartbeat listener on to the delegate.
     *
     * @param listener the listener to install
     */
    void setHeartbeatListener(HeartbeatListener listener)
    {
        _delegate.setHeartbeatListener(listener);
    }

    /** @return the value of the validate-queue-on-send setting */
    public boolean validateQueueOnSend()
    {
        return _validateQueueOnSend;
    }

    /**
     * Delegates to the superclass implementation without adding any behaviour.
     *
     * @return the value returned by the superclass
     */
    @Override
    protected boolean setClosed()
    {
        return super.setClosed();
    }

    /** @return the message compression threshold size held by this connection */
    public int getMessageCompressionThresholdSize()
    {
        return _messageCompressionThresholdSize;
    }

    /**
     * Runs the given task while holding the locks of every session returned by {@code _sessions.values()}
     * as well as the failover mutex.
     *
     * @param r the task to run
     */
    void doWithAllLocks(Runnable r)
    {
        // NOTE(review): the helper below removes elements from the list it is given, so this assumes
        // values() hands out a modifiable copy rather than a live view - confirm against ChannelToSessionMap.
        doWithAllLocks(r, _sessions.values());
    }

    /**
     * Recursive helper: takes the first session off the list, acquires its dispatcher lock (when it has one)
     * and its message delivery lock, and recurses for the remaining sessions while still holding them. Once
     * the list is empty the failover mutex is acquired and the task is run.
     *
     * <p>The element type is declared explicitly: with a raw {@code List}, {@code remove(0)} would yield an
     * {@code Object} that cannot be assigned to an {@code AMQSession}.
     *
     * @param r        the task to run once all locks are held
     * @param sessions the sessions still to be locked; emptied as a side effect
     */
    private void doWithAllLocks(final Runnable r, final List<AMQSession> sessions)
    {
        if (!sessions.isEmpty())
        {
            AMQSession session = sessions.remove(0);

            final Object dispatcherLock = session.getDispatcherLock();
            if (dispatcherLock != null)
            {
                // dispatcher lock first, then the message delivery lock
                synchronized (dispatcherLock)
                {
                    synchronized (session.getMessageDeliveryLock())
                    {
                        doWithAllLocks(r, sessions);
                    }
                }
            }
            else
            {
                synchronized (session.getMessageDeliveryLock())
                {
                    doWithAllLocks(r, sessions);
                }
            }
        }
        else
        {
            // all session locks are held by the enclosing recursion frames
            synchronized (getFailoverMutex())
            {
                r.run();
            }
        }
    }
}