/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.client; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.net.ConnectException; import java.net.UnknownHostException; import java.nio.channels.UnresolvedAddressException; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import javax.jms.ConnectionConsumer; import javax.jms.ConnectionMetaData; import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueSession; import javax.jms.ServerSessionPool; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicSession; import javax.naming.NamingException; import javax.naming.Reference; import javax.naming.Referenceable; import javax.naming.StringRefAddr; import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQProtocolException; import 
org.apache.qpid.AMQUnresolvedAddressException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.BasicQosOkBody; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxSelectOkBody; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Connection; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); /** * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be * held by any child objects of this connection such as the session, producers and consumers. */ private final Object _failoverMutex = new Object(); private final Object _sessionCreationLock = new Object(); /** * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session * and we must prevent the client from opening too many. 
*/ private long _maximumChannelCount; /** The maximum size of frame supported by the server */ private long _maximumFrameSize; /** * The protocol handler dispatches protocol events for this connection. For example, when the connection is dropped * the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate * handler. */ protected AMQProtocolHandler _protocolHandler; /** Maps from session id (Integer) to AMQSession instance */ private final ChannelToSessionMap _sessions = new ChannelToSessionMap(); private String _clientName; /** The user name to use for authentication */ private String _username; /** The password to use for authentication */ private String _password; /** The virtual path to connect to on the AMQ server */ private String _virtualHost; protected ExceptionListener _exceptionListener; private ConnectionListener _connectionListener; private ConnectionURL _connectionURL; /** * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message * publication. */ protected volatile boolean _started; /** Policy dictating how to failover */ protected FailoverPolicy _failoverPolicy; /* * _Connected should be refactored with a suitable wait object. */ protected boolean _connected; /* * The connection meta data */ private QpidConnectionMetaData _connectionMetaData; /** Configuration info for SSL */ private SSLConfiguration _sslConfiguration; private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; private AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; /** Thread Pool for executing connection level processes. Such as returning bounced messages. 
*/ private final ExecutorService _taskPool = Executors.newCachedThreadPool(); private static final long DEFAULT_TIMEOUT = 1000 * 30; protected AMQConnectionDelegate _delegate; // this connection maximum number of prefetched messages private int _maxPrefetch; //Indicates whether persistent messages are synchronized private boolean _syncPersistence; //Indicates whether we need to sync on every message ack private boolean _syncAck; //Indicates the sync publish options (persistent|all) //By default it's async publish private String _syncPublish = ""; // Indicates whether to use the old map message format or the // new amqp-0-10 encoded format. private boolean _useLegacyMapMessageFormat; /** * @param broker brokerdetails * @param username username * @param password password * @param clientName clientid * @param virtualHost virtualhost * * @throws AMQException * @throws URLSyntaxException */ public AMQConnection(String broker, String username, String password, String clientName, String virtualHost) throws AMQException, URLSyntaxException { this(new AMQConnectionURL( ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), null); } /** * @param broker brokerdetails * @param username username * @param password password * @param clientName clientid * @param virtualHost virtualhost * * @throws AMQException * @throws URLSyntaxException */ public AMQConnection(String broker, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { this(new AMQConnectionURL( ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + ((clientName == null) ? 
"" : clientName) + "/" + virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig); } public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost) throws AMQException, URLSyntaxException { this(host, port, false, username, password, clientName, virtualHost, null); } public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { this(host, port, false, username, password, clientName, virtualHost, sslConfig); } public AMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { this(new AMQConnectionURL( useSSL ? (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" + "," + BrokerDetails.OPTIONS_SSL + "='true'") : (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" + "," + BrokerDetails.OPTIONS_SSL + "='false'")), sslConfig); } public AMQConnection(String connection) throws AMQException, URLSyntaxException { this(new AMQConnectionURL(connection), null); } public AMQConnection(String connection, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException { this(new AMQConnectionURL(connection), sslConfig); } /** * @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception * was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon. 
*/ public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { // set this connection maxPrefetch if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null) { _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH)); } else { // use the defaul value set for all connections _maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, ClientProperties.MAX_PREFETCH_DEFAULT)); } if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null) { _syncPersistence = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE)); _logger.warn("sync_persistence is a deprecated property, " + "please use sync_publish={persistent|all} instead"); } else { // use the defaul value set for all connections _syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME); if (_syncPersistence) { _logger.warn("sync_persistence is a deprecated property, " + "please use sync_publish={persistent|all} instead"); } } if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK) != null) { _syncAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK)); } else { // use the defaul value set for all connections _syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME); } if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH) != null) { _syncPublish = connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH); } else { // use the default value set for all connections _syncPublish = System.getProperty((ClientProperties.SYNC_PUBLISH_PROP_NAME),_syncPublish); } if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT) != null) { _useLegacyMapMessageFormat = Boolean.parseBoolean( connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT)); } else { // use the default value set for all connections _useLegacyMapMessageFormat = 
Boolean.getBoolean(ClientProperties.USE_LEGACY_MAP_MESSAGE_FORMAT); } String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10"); _logger.debug("AMQP version " + amqpVersion); _failoverPolicy = new FailoverPolicy(connectionURL, this); BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails(); if (brokerDetails.getTransport().equals(BrokerDetails.VM) || "0-8".equals(amqpVersion)) { _delegate = new AMQConnectionDelegate_8_0(this); } else if ("0-9".equals(amqpVersion)) { _delegate = new AMQConnectionDelegate_0_9(this); } else if ("0-91".equals(amqpVersion) || "0-9-1".equals(amqpVersion)) { _delegate = new AMQConnectionDelegate_9_1(this); } else { _delegate = new AMQConnectionDelegate_0_10(this); } if (_logger.isInfoEnabled()) { _logger.info("Connection:" + connectionURL); } _sslConfiguration = sslConfig; if (connectionURL == null) { throw new IllegalArgumentException("Connection must be specified"); } _connectionURL = connectionURL; _clientName = connectionURL.getClientName(); _username = connectionURL.getUsername(); _password = connectionURL.getPassword(); setVirtualHost(connectionURL.getVirtualHost()); if (connectionURL.getDefaultQueueExchangeName() != null) { _defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName(); } if (connectionURL.getDefaultTopicExchangeName() != null) { _defaultTopicExchangeName = connectionURL.getDefaultTopicExchangeName(); } if (connectionURL.getTemporaryQueueExchangeName() != null) { _temporaryQueueExchangeName = connectionURL.getTemporaryQueueExchangeName(); } if (connectionURL.getTemporaryTopicExchangeName() != null) { _temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName(); } _protocolHandler = new AMQProtocolHandler(this); _logger.info("Connecting with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); // We are not currently connected _connected = false; boolean retryAllowed = true; Exception connectionException = null; while (!_connected && 
retryAllowed && brokerDetails != null) { ProtocolVersion pe = null; try { pe = makeBrokerConnection(brokerDetails); } catch (Exception e) { if (_logger.isInfoEnabled()) { _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e); } connectionException = e; } if (pe != null) { // reset the delegate to the version returned by the // broker initDelegate(pe); } else if (!_connected) { retryAllowed = _failoverPolicy.failoverAllowed(); brokerDetails = _failoverPolicy.getNextBrokerDetails(); } } if (_logger.isDebugEnabled()) { _logger.debug("Are we connected:" + _connected); } if (!_connected) { if (_logger.isDebugEnabled()) { _logger.debug("Last attempted ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); } String message = null; if (connectionException != null) { if (connectionException.getCause() != null) { message = connectionException.getCause().getMessage(); } else { message = connectionException.getMessage(); } } if ((message == null) || message.equals("")) { if (message == null) { message = "Unable to Connect"; } else // can only be "" if getMessage() returned it therfore lastException != null { message = "Unable to Connect:" + connectionException.getClass(); } } for (Throwable th = connectionException; th != null; th = th.getCause()) { if (th instanceof UnresolvedAddressException || th instanceof UnknownHostException) { throw new AMQUnresolvedAddressException (message, _failoverPolicy.getCurrentBrokerDetails().toString(), connectionException); } } throw new AMQConnectionFailureException(message, connectionException); } _logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); _sessions.setMaxChannelID(_delegate.getMaxChannelID()); _sessions.setMinChannelID(_delegate.getMinChannelID()); _connectionMetaData = new QpidConnectionMetaData(this); } protected boolean checkException(Throwable thrown) { Throwable cause = thrown.getCause(); if (cause == null) { cause = thrown; } return 
((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException)); } private void initDelegate(ProtocolVersion pe) throws AMQProtocolException { try { String delegateClassName = String.format ("org.apache.qpid.client.AMQConnectionDelegate_%s_%s", pe.getMajorVersion(), pe.getMinorVersion()); _logger.info("Looking up delegate '" + delegateClassName + "' Based on PE:" + pe); Class c = Class.forName(delegateClassName); Class partypes[] = new Class[1]; partypes[0] = AMQConnection.class; _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this); //Update our session to use this new protocol version _protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion()); } catch (ClassNotFoundException e) { throw new AMQProtocolException (AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR, String.format("Protocol: %s.%s is rquired by the broker but is not " + "currently supported by this client library implementation", pe.getMajorVersion(), pe.getMinorVersion()), e); } catch (NoSuchMethodException e) { throw new RuntimeException("unable to locate constructor for delegate", e); } catch (InstantiationException e) { throw new RuntimeException("error instantiating delegate", e); } catch (IllegalAccessException e) { throw new RuntimeException("error accessing delegate", e); } catch (InvocationTargetException e) { throw new RuntimeException("error invoking delegate", e); } } protected AMQConnection(String username, String password, String clientName, String virtualHost) { _clientName = clientName; _username = username; _password = password; setVirtualHost(virtualHost); } private void setVirtualHost(String virtualHost) { if (virtualHost != null && virtualHost.startsWith("/")) { virtualHost = virtualHost.substring(1); } _virtualHost = virtualHost; } public boolean attemptReconnection(String host, int port) { BrokerDetails bd = new AMQBrokerDetails(host, port, _sslConfiguration); _failoverPolicy.setBroker(bd); try { 
makeBrokerConnection(bd); return true; } catch (Exception e) { if (_logger.isInfoEnabled()) { _logger.info("Unable to connect to broker at " + bd); } attemptReconnection(); } return false; } public boolean attemptReconnection() { BrokerDetails broker = null; while (_failoverPolicy.failoverAllowed() && (broker = _failoverPolicy.getNextBrokerDetails()) != null) { try { makeBrokerConnection(broker); return true; } catch (Exception e) { if (!(e instanceof AMQException)) { if (_logger.isInfoEnabled()) { _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e); } } else { if (_logger.isInfoEnabled()) { _logger.info(e.getMessage() + ":Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails()); } } } } // connection unsuccessful return false; } public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException { return _delegate.makeBrokerConnection(brokerDetail); } public T executeRetrySupport(FailoverProtectedOperation operation) throws E { return _delegate.executeRetrySupport(operation); } /** * Get the details of the currently active broker * * @return null if no broker is active (i.e. 
no successful connection has been made, or the BrokerDetail instance * otherwise */ public BrokerDetails getActiveBrokerDetails() { return _failoverPolicy.getCurrentBrokerDetails(); } public boolean failoverAllowed() { if (!_connected) { return false; } else { return _failoverPolicy.failoverAllowed(); } } public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException { return createSession(transacted, acknowledgeMode, _maxPrefetch); } public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch) throws JMSException { return createSession(transacted, acknowledgeMode, prefetch, prefetch); } public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetchHigh, final int prefetchLow) throws JMSException { synchronized (_sessionCreationLock) { checkNotClosed(); return _delegate.createSession(transacted, acknowledgeMode, prefetchHigh, prefetchLow); } } private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException, FailoverException { ChannelOpenBody channelOpenBody = getProtocolHandler().getMethodRegistry().createChannelOpenBody(null); // TODO: Be aware of possible changes to parameter order as versions change. _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class); BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0, prefetchHigh, false); // todo send low water mark when protocol allows. // todo Be aware of possible changes to parameter order as versions change. 
_protocolHandler.syncWrite(basicQosBody.generateFrame(channelId), BasicQosOkBody.class); if (transacted) { if (_logger.isDebugEnabled()) { _logger.debug("Issuing TxSelect for " + channelId); } TxSelectBody body = getProtocolHandler().getMethodRegistry().createTxSelectBody(); // TODO: Be aware of possible changes to parameter order as versions change. _protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class); } } private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException, FailoverException { try { createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); } catch (AMQException e) { deregisterSession(channelId); throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e); } } public void setFailoverPolicy(FailoverPolicy policy) { _failoverPolicy = policy; } public FailoverPolicy getFailoverPolicy() { return _failoverPolicy; } /** * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions where specified in * the JMS spec * * @param transacted * @param acknowledgeMode * * @return QueueSession * * @throws JMSException */ public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { return new AMQQueueSessionAdaptor(createSession(transacted, acknowledgeMode)); } /** * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions where specified in * the JMS spec * * @param transacted * @param acknowledgeMode * * @return TopicSession * * @throws JMSException */ public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { return new AMQTopicSessionAdaptor(createSession(transacted, acknowledgeMode)); } public boolean channelLimitReached() { return _sessions.size() >= _maximumChannelCount; } public String getClientID() throws JMSException { checkNotClosed(); return _clientName; } public void 
setClientID(String clientID) throws JMSException { checkNotClosed(); // in AMQP it is not possible to change the client ID. If one is not specified // upon connection construction, an id is generated automatically. Therefore // we can always throw an exception. if (!Boolean.getBoolean(ClientProperties.IGNORE_SET_CLIENTID_PROP_NAME)) { throw new IllegalStateException("Client name cannot be changed after being set"); } else { _logger.info("Operation setClientID is ignored using ID: " + getClientID()); } } public ConnectionMetaData getMetaData() throws JMSException { checkNotClosed(); return _connectionMetaData; } public ExceptionListener getExceptionListener() throws JMSException { checkNotClosed(); return _exceptionListener; } public void setExceptionListener(ExceptionListener listener) throws JMSException { checkNotClosed(); _exceptionListener = listener; } /** * Start the connection, i.e. start flowing messages. Note that this method must be called only from a single thread * and is not thread safe (which is legal according to the JMS specification). 
*
     * @throws JMSException
     */
    public void start() throws JMSException {
        checkNotClosed();
        if (!_started) {
            // Flag the connection as started before the sessions are started.
            _started = true;
            final Iterator it = _sessions.values().iterator();
            while (it.hasNext()) {
                final AMQSession s = (AMQSession) (it.next());
                try {
                    s.start();
                } catch (AMQException e) {
                    throw new JMSAMQException(e);
                }
            }
        }
    }

    /**
     * Stops every registered session and then marks the connection as not started. If a session fails to stop, the
     * failure is rethrown and the connection remains marked as started.
     *
     * @throws JMSException if this connection is closed or a session cannot be stopped
     */
    public void stop() throws JMSException {
        checkNotClosed();
        if (_started) {
            for (Iterator i = _sessions.values().iterator(); i.hasNext();) {
                try {
                    ((AMQSession) i.next()).stop();
                } catch (AMQException e) {
                    throw new JMSAMQException(e);
                }
            }

            _started = false;
        }
    }

    /** Closes this connection using the default timeout. */
    public void close() throws JMSException {
        close(DEFAULT_TIMEOUT);
    }

    /**
     * Closes this connection, passing a snapshot of the currently registered sessions to
     * {@link #close(List, long)}.
     *
     * @param timeout the overall time budget for closing, in milliseconds
     */
    public void close(long timeout) throws JMSException {
        close(new ArrayList(_sessions.values()), timeout);
    }

    /**
     * Closes this connection unless it has already been marked closed. The closing flag is set for the duration of
     * the close and cleared afterwards, even if closing fails.
     *
     * @param sessions the sessions whose message delivery locks are taken before closing; emptied by this call
     * @param timeout  the overall time budget for closing, in milliseconds
     *
     * @throws JMSException if closing the sessions or the connection fails
     */
    public void close(List sessions, long timeout) throws JMSException {
        if (!_closed.getAndSet(true)) {
            _closing.set(true);
            try {
                doClose(sessions, timeout);
            } finally {
                _closing.set(false);
            }
        }
    }

    /**
     * Recursively acquires the message delivery lock of each listed session and, once the list is exhausted, takes
     * the failover mutex and closes all sessions, the task pool and finally the connection itself.
     */
    private void doClose(List sessions, long timeout) throws JMSException {
        synchronized (_sessionCreationLock) {
            if (!sessions.isEmpty()) {
                // The list is declared raw, so the element must be cast back to a session.
                AMQSession session = (AMQSession) sessions.remove(0);
                synchronized (session.getMessageDeliveryLock()) {
                    doClose(sessions, timeout);
                }
            } else {
                synchronized (getFailoverMutex()) {
                    // An interrupt is remembered and restored only after the close sequence has finished, so
                    // the remaining shutdown steps still run exactly as they did before.
                    boolean interrupted = false;
                    try {
                        long startCloseTime = System.currentTimeMillis();

                        closeAllSessions(null, timeout, startCloseTime);

                        // This MUST occur after we have successfully closed all Channels/Sessions
                        _taskPool.shutdown();

                        if (!_taskPool.isTerminated()) {
                            try {
                                // adjust timeout
                                long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);

                                _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
                            } catch (InterruptedException e) {
                                interrupted = true;
                                _logger.info("Interrupted while shutting down connection thread pool.");
                            }
                        }

                        // adjust timeout
                        timeout = adjustTimeout(timeout, startCloseTime);
                        _delegate.closeConnection(timeout);

                        // If the taskpool hasn't shutdown by now then give it shutdownNow.
                        // This will interrupt any running tasks.
                        if (!_taskPool.isTerminated()) {
                            List<Runnable> tasks = _taskPool.shutdownNow();
                            for (Runnable r : tasks) {
                                _logger.warn("Connection close forced taskpool to prevent execution:" + r);
                            }
                        }
                    } catch (AMQException e) {
                        _logger.error("error:", e);
                        JMSException jmse = new JMSException("Error closing connection: " + e);
                        jmse.setLinkedException(e);
                        jmse.initCause(e);
                        throw jmse;
                    } finally {
                        if (interrupted) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }
        }
    }

    /**
     * Computes how much of a timeout is left.
     *
     * @param timeout   the time budget in milliseconds, measured from startTime
     * @param startTime the {@link System#currentTimeMillis()} value at which the budget started
     *
     * @return the remaining milliseconds, never less than zero
     */
    private long adjustTimeout(long timeout, long startTime) {
        long elapsed = System.currentTimeMillis() - startTime;
        long remaining = timeout - elapsed;

        return (remaining < 0) ? 0 : remaining;
    }

    /**
     * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
     * mark objects "visible" in userland as closed after failover or other significant event that impacts the
     * connection.

The caller must hold the failover mutex before calling this method. */ private void markAllSessionsClosed() { final LinkedList sessionCopy = new LinkedList(_sessions.values()); final Iterator it = sessionCopy.iterator(); while (it.hasNext()) { final AMQSession session = (AMQSession) it.next(); session.markClosed(); } _sessions.clear(); } /** * Close all the sessions, either due to normal connection closure or due to an error occurring. * * @param cause if not null, the error that is causing this shutdown

The caller must hold the failover mutex * before calling this method. */ private void closeAllSessions(Throwable cause, long timeout, long starttime) throws JMSException { final LinkedList sessionCopy = new LinkedList(_sessions.values()); final Iterator it = sessionCopy.iterator(); JMSException sessionException = null; while (it.hasNext()) { final AMQSession session = (AMQSession) it.next(); if (cause != null) { session.closed(cause); } else { try { if (starttime != -1) { timeout = adjustTimeout(timeout, starttime); } session.close(timeout); } catch (JMSException e) { _logger.error("Error closing session: " + e); sessionException = e; } } } _sessions.clear(); if (sessionException != null) { throw sessionException; } } public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { checkNotClosed(); return null; } public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { checkNotClosed(); return null; } public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { checkNotClosed(); return null; } public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { // TODO Auto-generated method stub checkNotClosed(); return null; } public long getMaximumChannelCount() throws JMSException { checkNotClosed(); return _maximumChannelCount; } public void setConnectionListener(ConnectionListener listener) { _connectionListener = listener; } public ConnectionListener getConnectionListener() { return _connectionListener; } public void setMaximumChannelCount(long maximumChannelCount) { _maximumChannelCount = maximumChannelCount; } public void 
setMaximumFrameSize(long frameMax) { _maximumFrameSize = frameMax; } public long getMaximumFrameSize() { return _maximumFrameSize; } public ChannelToSessionMap getSessions() { return _sessions; } public String getUsername() { return _username; } public void setUsername(String id) { _username = id; } public String getPassword() { return _password; } public String getVirtualHost() { return _virtualHost; } public AMQProtocolHandler getProtocolHandler() { return _protocolHandler; } public boolean started() { return _started; } public void bytesSent(long writtenBytes) { if (_connectionListener != null) { _connectionListener.bytesSent(writtenBytes); } } public void bytesReceived(long receivedBytes) { if (_connectionListener != null) { _connectionListener.bytesReceived(receivedBytes); } } /** * Fire the preFailover event to the registered connection listener (if any) * * @param redirect true if this is the result of a redirect request rather than a connection error * * @return true if no listener or listener does not veto change */ public boolean firePreFailover(boolean redirect) { boolean proceed = true; if (_connectionListener != null) { proceed = _connectionListener.preFailover(redirect); } return proceed; } /** * Fire the preResubscribe event to the registered connection listener (if any). If the listener vetoes * resubscription then all the sessions are closed. * * @return true if no listener or listener does not veto resubscription. * * @throws JMSException */ public boolean firePreResubscribe() throws JMSException { if (_connectionListener != null) { boolean resubscribe = _connectionListener.preResubscribe(); if (!resubscribe) { markAllSessionsClosed(); } return resubscribe; } else { return true; } } /** Fires a failover complete event to the registered connection listener (if any). 
*/ public void fireFailoverComplete() { if (_connectionListener != null) { _connectionListener.failoverComplete(); } } /** * In order to protect the consistency of the connection and its child sessions, consumers and producers, the * "failover mutex" must be held when doing any operations that could be corrupted during failover. * * @return a mutex. Guaranteed never to change for the lifetime of this connection even if failover occurs. */ public final Object getFailoverMutex() { return _failoverMutex; } public void failoverPrep() { _delegate.failoverPrep(); } public void resubscribeSessions() throws JMSException, AMQException, FailoverException { _delegate.resubscribeSessions(); } /** * If failover is taking place this will block until it has completed. If failover is not taking place it will * return immediately. * * @throws InterruptedException */ public void blockUntilNotFailingOver() throws InterruptedException { _protocolHandler.blockUntilNotFailingOver(); } /** * Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will * propagate to consumers. This allows synchronous consumers to have exceptions thrown to them. * * @param cause the exception */ public void exceptionReceived(Throwable cause) { if (_logger.isDebugEnabled()) { _logger.debug("exceptionReceived done by:" + Thread.currentThread().getName(), cause); } final JMSException je; if (cause instanceof JMSException) { je = (JMSException) cause; } else { AMQConstant code = null; if (cause instanceof AMQException) { code = ((AMQException) cause).getErrorCode(); } if (code != null) { je = new JMSException(Integer.toString(code.getCode()), "Exception thrown against " + toString() + ": " + cause); } else { //Should never get here as all AMQEs are required to have an ErrorCode! // Other than AMQDisconnectedEx! 
if (cause instanceof AMQDisconnectedException) { Exception last = _protocolHandler.getStateManager().getLastException(); if (last != null) { _logger.info("StateManager had an exception for us to use a cause of our Disconnected Exception"); cause = last; } } je = new JMSException("Exception thrown against " + toString() + ": " + cause); } if (cause instanceof Exception) { je.setLinkedException((Exception) cause); } je.initCause(cause); } boolean closer = false; // in the case of an IOException, MINA has closed the protocol session so we set _closed to true // so that any generic client code that tries to close the connection will not mess up this error // handling sequence if (cause instanceof IOException || cause instanceof AMQDisconnectedException) { // If we have an IOE/AMQDisconnect there is no connection to close on. _closing.set(false); closer = !_closed.getAndSet(true); _protocolHandler.getProtocolSession().notifyError(je); } // get the failover mutex before trying to close synchronized (getFailoverMutex()) { // decide if we are going to close the session if (hardError(cause)) { closer = (!_closed.getAndSet(true)) || closer; { _logger.info("Closing AMQConnection due to :" + cause); } } else { _logger.info("Not a hard-error connection not closing: " + cause); } // deliver the exception if there is a listener if (_exceptionListener != null) { _exceptionListener.onException(je); } else { _logger.error("Throwable Received but no listener set: " + cause); } // if we are closing the connection, close sessions first if (closer) { try { closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. 
} catch (JMSException e) { _logger.error("Error closing all sessions: " + e, e); } } } } private boolean hardError(Throwable cause) { if (cause instanceof AMQException) { return ((AMQException) cause).isHardError(); } return true; } void registerSession(int channelId, AMQSession session) { _sessions.put(channelId, session); } public void deregisterSession(int channelId) { _sessions.remove(channelId); } public String toString() { StringBuffer buf = new StringBuffer("AMQConnection:\n"); if (_failoverPolicy.getCurrentBrokerDetails() == null) { buf.append("No active broker connection"); } else { BrokerDetails bd = _failoverPolicy.getCurrentBrokerDetails(); buf.append("Host: ").append(String.valueOf(bd.getHost())); buf.append("\nPort: ").append(String.valueOf(bd.getPort())); } buf.append("\nVirtual Host: ").append(String.valueOf(_virtualHost)); buf.append("\nClient ID: ").append(String.valueOf(_clientName)); buf.append("\nActive session count: ").append((_sessions == null) ? 0 : _sessions.size()); return buf.toString(); } public String toURL() { return _connectionURL.toString(); } public Reference getReference() throws NamingException { return new Reference(AMQConnection.class.getName(), new StringRefAddr(AMQConnection.class.getName(), toURL()), AMQConnectionFactory.class.getName(), null); // factory location } public SSLConfiguration getSSLConfiguration() { return _sslConfiguration; } public AMQShortString getDefaultTopicExchangeName() { return _defaultTopicExchangeName; } public void setDefaultTopicExchangeName(AMQShortString defaultTopicExchangeName) { _defaultTopicExchangeName = defaultTopicExchangeName; } public AMQShortString getDefaultQueueExchangeName() { return _defaultQueueExchangeName; } public void setDefaultQueueExchangeName(AMQShortString defaultQueueExchangeName) { _defaultQueueExchangeName = defaultQueueExchangeName; } public AMQShortString getTemporaryTopicExchangeName() { return _temporaryTopicExchangeName; } public AMQShortString 
getTemporaryQueueExchangeName() { return _temporaryQueueExchangeName; // To change body of created methods use File | Settings | File Templates. } public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName) { _temporaryTopicExchangeName = temporaryTopicExchangeName; } public void setTemporaryQueueExchangeName(AMQShortString temporaryQueueExchangeName) { _temporaryQueueExchangeName = temporaryQueueExchangeName; } public void performConnectionTask(Runnable task) { _taskPool.execute(task); } public AMQSession getSession(int channelId) { return _sessions.get(channelId); } public ProtocolVersion getProtocolVersion() { return _delegate.getProtocolVersion(); } public boolean isFailingOver() { return (_protocolHandler.getFailoverLatch() != null); } /** * Get the maximum number of messages that this connection can pre-fetch. * * @return The maximum number of messages that this connection can pre-fetch. */ public long getMaxPrefetch() { return _maxPrefetch; } /** * Indicates whether persistent messages are synchronized * * @return true if persistent messages are synchronized false otherwise */ public boolean getSyncPersistence() { return _syncPersistence; } /** * Indicates whether we need to sync on every message ack */ public boolean getSyncAck() { return _syncAck; } public String getSyncPublish() { return _syncPublish; } public int getNextChannelID() { return _sessions.getNextChannelId(); } public boolean isUseLegacyMapMessageFormat() { return _useLegacyMapMessageFormat; } }