diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java | 1485 |
1 files changed, 1485 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java new file mode 100644 index 0000000000..94a55ef52c --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -0,0 +1,1485 @@ +/* +* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.ConnectException; +import java.net.UnknownHostException; +import java.nio.channels.UnresolvedAddressException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.jms.ConnectionConsumer; +import javax.jms.ConnectionMetaData; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.ServerSessionPool; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicSession; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; + +import org.apache.qpid.AMQConnectionFailureException; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQProtocolException; +import org.apache.qpid.AMQUnresolvedAddressException; +import org.apache.qpid.AMQDisconnectedException; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverProtectedOperation; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicQosBody; +import org.apache.qpid.framing.BasicQosOkBody; +import org.apache.qpid.framing.ChannelOpenBody; +import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.framing.TxSelectBody; +import org.apache.qpid.framing.TxSelectOkBody; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.Connection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.jms.FailoverPolicy; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.url.URLSyntaxException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable +{ + private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); + + + /** + * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be + * held by any child objects of this connection such as the session, producers and consumers. + */ + private final Object _failoverMutex = new Object(); + + private final Object _sessionCreationLock = new Object(); + + /** + * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session + * and we must prevent the client from opening too many. + */ + private long _maximumChannelCount; + + /** The maximum size of frame supported by the server */ + private long _maximumFrameSize; + + /** + * The protocol handler dispatches protocol events for this connection. For example, when the connection is dropped + * the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate + * handler. + */ + protected AMQProtocolHandler _protocolHandler; + + /** Maps from session id (Integer) to AMQSession instance */ + private final ChannelToSessionMap _sessions = new ChannelToSessionMap(); + + private final String _clientName; + + /** The user name to use for authentication */ + private String _username; + + /** The password to use for authentication */ + private String _password; + + /** The virtual path to connect to on the AMQ server */ + private String _virtualHost; + + protected ExceptionListener _exceptionListener; + + private ConnectionListener _connectionListener; + + private final ConnectionURL _connectionURL; + + /** + * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message + * publication. + */ + protected volatile boolean _started; + + /** Policy dictating how to failover */ + protected FailoverPolicy _failoverPolicy; + + /* + * _Connected should be refactored with a suitable wait object. + */ + protected boolean _connected; + + /* + * The connection meta data + */ + private QpidConnectionMetaData _connectionMetaData; + + /** Configuration info for SSL */ + private SSLConfiguration _sslConfiguration; + + private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; + private AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; + private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + + /** Thread Pool for executing connection level processes. Such as returning bounced messages. */ + private final ExecutorService _taskPool = Executors.newCachedThreadPool(); + private static final long DEFAULT_TIMEOUT = 1000 * 30; + + protected AMQConnectionDelegate _delegate; + + // this connection maximum number of prefetched messages + private int _maxPrefetch; + + //Indicates whether persistent messages are synchronized + private boolean _syncPersistence; + + //Indicates whether we need to sync on every message ack + private boolean _syncAck; + + //Indicates the sync publish options (persistent|all) + //By default it's async publish + private String _syncPublish = ""; + + // Indicates whether to use the old map message format or the + // new amqp-0-10 encoded format. + private boolean _useLegacyMapMessageFormat; + + /** + * @param broker brokerdetails + * @param username username + * @param password password + * @param clientName clientid + * @param virtualHost virtualhost + * + * @throws AMQException + * @throws URLSyntaxException + */ + public AMQConnection(String broker, String username, String password, String clientName, String virtualHost) + throws AMQException, URLSyntaxException + { + this(new AMQConnectionURL( + ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='" + + AMQBrokerDetails.checkTransport(broker) + "'"), null); + } + + /** + * @param broker brokerdetails + * @param username username + * @param password password + * @param clientName clientid + * @param virtualHost virtualhost + * + * @throws AMQException + * @throws URLSyntaxException + */ + public AMQConnection(String broker, String username, String password, String clientName, String virtualHost, + SSLConfiguration sslConfig) throws AMQException, URLSyntaxException + { + this(new AMQConnectionURL( + ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='" + + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig); + } + + public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost) + throws AMQException, URLSyntaxException + { + this(host, port, false, username, password, clientName, virtualHost, null); + } + + public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost, + SSLConfiguration sslConfig) throws AMQException, URLSyntaxException + { + this(host, port, false, username, password, clientName, virtualHost, sslConfig); + } + + public AMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName, + String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException + { + this(new AMQConnectionURL( + useSSL + ? (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + + "'" + "," + BrokerDetails.OPTIONS_SSL + "='true'") + : (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + + "'" + "," + BrokerDetails.OPTIONS_SSL + "='false'")), sslConfig); + } + + public AMQConnection(String connection) throws AMQException, URLSyntaxException + { + this(new AMQConnectionURL(connection), null); + } + + public AMQConnection(String connection, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException + { + this(new AMQConnectionURL(connection), sslConfig); + } + + /** + * @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception + * was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon. + */ + public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException + { + if (connectionURL == null) + { + throw new IllegalArgumentException("Connection must be specified"); + } + + // set this connection maxPrefetch + if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null) + { + _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH)); + } + else + { + // use the default value set for all connections + _maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, + ClientProperties.MAX_PREFETCH_DEFAULT)); + } + + if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null) + { + _syncPersistence = + Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE)); + _logger.warn("sync_persistence is a deprecated property, " + + "please use sync_publish={persistent|all} instead"); + } + else + { + // use the default value set for all connections + _syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME); + if (_syncPersistence) + { + _logger.warn("sync_persistence is a deprecated property, " + + "please use sync_publish={persistent|all} instead"); + } + } + + if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK) != null) + { + _syncAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK)); + } + else + { + // use the default value set for all connections + _syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME); + } + + if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH) != null) + { + _syncPublish = connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH); + } + else + { + // use the default value set for all connections + _syncPublish = System.getProperty((ClientProperties.SYNC_PUBLISH_PROP_NAME),_syncPublish); + } + + if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT) != null) + { + _useLegacyMapMessageFormat = Boolean.parseBoolean( + connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT)); + } + else + { + // use the default value set for all connections + _useLegacyMapMessageFormat = Boolean.getBoolean(ClientProperties.USE_LEGACY_MAP_MESSAGE_FORMAT); + } + + String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10"); + _logger.debug("AMQP version " + amqpVersion); + + _failoverPolicy = new FailoverPolicy(connectionURL, this); + BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails(); + if (brokerDetails.getTransport().equals(BrokerDetails.VM) || "0-8".equals(amqpVersion)) + { + _delegate = new AMQConnectionDelegate_8_0(this); + } + else if ("0-9".equals(amqpVersion)) + { + _delegate = new AMQConnectionDelegate_0_9(this); + } + else if ("0-91".equals(amqpVersion) || "0-9-1".equals(amqpVersion)) + { + _delegate = new AMQConnectionDelegate_9_1(this); + } + else + { + _delegate = new AMQConnectionDelegate_0_10(this); + } + + if (_logger.isInfoEnabled()) + { + _logger.info("Connection:" + connectionURL); + } + + _sslConfiguration = sslConfig; + _connectionURL = connectionURL; + + _clientName = connectionURL.getClientName(); + _username = connectionURL.getUsername(); + _password = connectionURL.getPassword(); + + setVirtualHost(connectionURL.getVirtualHost()); + + if (connectionURL.getDefaultQueueExchangeName() != null) + { + _defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName(); + } + + if (connectionURL.getDefaultTopicExchangeName() != null) + { + _defaultTopicExchangeName = connectionURL.getDefaultTopicExchangeName(); + } + + if (connectionURL.getTemporaryQueueExchangeName() != null) + { + _temporaryQueueExchangeName = connectionURL.getTemporaryQueueExchangeName(); + } + + if (connectionURL.getTemporaryTopicExchangeName() != null) + { + _temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName(); + } + + _protocolHandler = new AMQProtocolHandler(this); + + _logger.info("Connecting with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + + // We are not currently connected + _connected = false; + + boolean retryAllowed = true; + Exception connectionException = null; + while (!_connected && retryAllowed && brokerDetails != null) + { + ProtocolVersion pe = null; + try + { + pe = makeBrokerConnection(brokerDetails); + } + catch (Exception e) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Unable to connect to broker at " + + _failoverPolicy.getCurrentBrokerDetails(), + e); + } + connectionException = e; + } + + if (pe != null) + { + // reset the delegate to the version returned by the + // broker + initDelegate(pe); + } + else if (!_connected) + { + retryAllowed = _failoverPolicy.failoverAllowed(); + brokerDetails = _failoverPolicy.getNextBrokerDetails(); + } + } + + if (_logger.isDebugEnabled()) + { + _logger.debug("Are we connected:" + _connected); + } + + if (!_connected) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Last attempted ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + } + + String message = null; + + if (connectionException != null) + { + if (connectionException.getCause() != null) + { + message = connectionException.getCause().getMessage(); + } + else + { + message = connectionException.getMessage(); + } + } + + if ((message == null) || message.equals("")) + { + if (message == null) + { + message = "Unable to Connect"; + } + else // can only be "" if getMessage() returned it therfore lastException != null + { + message = "Unable to Connect:" + connectionException.getClass(); + } + } + + for (Throwable th = connectionException; th != null; th = th.getCause()) + { + if (th instanceof UnresolvedAddressException || + th instanceof UnknownHostException) + { + throw new AMQUnresolvedAddressException + (message, + _failoverPolicy.getCurrentBrokerDetails().toString(), + connectionException); + } + } + + throw new AMQConnectionFailureException(message, connectionException); + } + + _logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + + _sessions.setMaxChannelID(_delegate.getMaxChannelID()); + _sessions.setMinChannelID(_delegate.getMinChannelID()); + + _connectionMetaData = new QpidConnectionMetaData(this); + } + + protected boolean checkException(Throwable thrown) + { + Throwable cause = thrown.getCause(); + + if (cause == null) + { + cause = thrown; + } + + return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException)); + } + + private void initDelegate(ProtocolVersion pe) throws AMQProtocolException + { + try + { + String delegateClassName = String.format + ("org.apache.qpid.client.AMQConnectionDelegate_%s_%s", + pe.getMajorVersion(), pe.getMinorVersion()); + _logger.info("Looking up delegate '" + delegateClassName + "' Based on PE:" + pe); + Class c = Class.forName(delegateClassName); + Class partypes[] = new Class[1]; + partypes[0] = AMQConnection.class; + _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this); + //Update our session to use this new protocol version + _protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion()); + + } + catch (ClassNotFoundException e) + { + throw new AMQProtocolException + (AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR, + String.format("Protocol: %s.%s is rquired by the broker but is not " + + "currently supported by this client library implementation", + pe.getMajorVersion(), pe.getMinorVersion()), + e); + } + catch (NoSuchMethodException e) + { + throw new RuntimeException("unable to locate constructor for delegate", e); + } + catch (InstantiationException e) + { + throw new RuntimeException("error instantiating delegate", e); + } + catch (IllegalAccessException e) + { + throw new RuntimeException("error accessing delegate", e); + } + catch (InvocationTargetException e) + { + throw new RuntimeException("error invoking delegate", e); + } + } + + private void setVirtualHost(String virtualHost) + { + if (virtualHost != null && virtualHost.startsWith("/")) + { + virtualHost = virtualHost.substring(1); + } + + _virtualHost = virtualHost; + } + + public boolean attemptReconnection(String host, int port) + { + BrokerDetails bd = new AMQBrokerDetails(host, port, _sslConfiguration); + + _failoverPolicy.setBroker(bd); + + try + { + makeBrokerConnection(bd); + + return true; + } + catch (Exception e) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Unable to connect to broker at " + bd); + } + + attemptReconnection(); + } + + return false; + } + + public boolean attemptReconnection() + { + BrokerDetails broker = null; + while (_failoverPolicy.failoverAllowed() && (broker = _failoverPolicy.getNextBrokerDetails()) != null) + { + try + { + makeBrokerConnection(broker); + return true; + } + catch (Exception e) + { + if (!(e instanceof AMQException)) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e); + } + } + else + { + if (_logger.isInfoEnabled()) + { + _logger.info(e.getMessage() + ":Unable to connect to broker at " + + _failoverPolicy.getCurrentBrokerDetails()); + } + } + } + } + + // connection unsuccessful + return false; + } + + public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException + { + return _delegate.makeBrokerConnection(brokerDetail); + } + + public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E + { + return _delegate.executeRetrySupport(operation); + } + + /** + * Get the details of the currently active broker + * + * @return null if no broker is active (i.e. no successful connection has been made, or the BrokerDetail instance + * otherwise + */ + public BrokerDetails getActiveBrokerDetails() + { + return _failoverPolicy.getCurrentBrokerDetails(); + } + + public boolean failoverAllowed() + { + if (!_connected) + { + return false; + } + else + { + return _failoverPolicy.failoverAllowed(); + } + } + + public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException + { + return createSession(transacted, acknowledgeMode, _maxPrefetch); + } + + public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch) + throws JMSException + { + return createSession(transacted, acknowledgeMode, prefetch, prefetch); + } + + public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, + final int prefetchHigh, final int prefetchLow) throws JMSException + { + synchronized (_sessionCreationLock) + { + checkNotClosed(); + return _delegate.createSession(transacted, acknowledgeMode, prefetchHigh, prefetchLow); + } + } + + private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) + throws AMQException, FailoverException + { + + ChannelOpenBody channelOpenBody = getProtocolHandler().getMethodRegistry().createChannelOpenBody(null); + + // TODO: Be aware of possible changes to parameter order as versions change. + + _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class); + + BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0, prefetchHigh, false); + + // todo send low water mark when protocol allows. + // todo Be aware of possible changes to parameter order as versions change. + _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId), BasicQosOkBody.class); + + if (transacted) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Issuing TxSelect for " + channelId); + } + + TxSelectBody body = getProtocolHandler().getMethodRegistry().createTxSelectBody(); + + // TODO: Be aware of possible changes to parameter order as versions change. + _protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class); + } + } + + public void setFailoverPolicy(FailoverPolicy policy) + { + _failoverPolicy = policy; + } + + public FailoverPolicy getFailoverPolicy() + { + return _failoverPolicy; + } + + /** + * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions where specified in + * the JMS spec + * + * @param transacted + * @param acknowledgeMode + * + * @return QueueSession + * + * @throws JMSException + */ + public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException + { + return new AMQQueueSessionAdaptor(createSession(transacted, acknowledgeMode)); + } + + /** + * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions where specified in + * the JMS spec + * + * @param transacted + * @param acknowledgeMode + * + * @return TopicSession + * + * @throws JMSException + */ + public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException + { + return new AMQTopicSessionAdaptor(createSession(transacted, acknowledgeMode)); + } + + public boolean channelLimitReached() + { + return _sessions.size() >= _maximumChannelCount; + } + + public String getClientID() throws JMSException + { + checkNotClosed(); + + return _clientName; + } + + public void setClientID(String clientID) throws JMSException + { + checkNotClosed(); + // in AMQP it is not possible to change the client ID. If one is not specified + // upon connection construction, an id is generated automatically. Therefore + // we can always throw an exception. + if (!Boolean.getBoolean(ClientProperties.IGNORE_SET_CLIENTID_PROP_NAME)) + { + throw new IllegalStateException("Client name cannot be changed after being set"); + } + else + { + _logger.info("Operation setClientID is ignored using ID: " + getClientID()); + } + } + + public ConnectionMetaData getMetaData() throws JMSException + { + checkNotClosed(); + + return _connectionMetaData; + + } + + public ExceptionListener getExceptionListener() throws JMSException + { + checkNotClosed(); + + return _exceptionListener; + } + + public void setExceptionListener(ExceptionListener listener) throws JMSException + { + checkNotClosed(); + _exceptionListener = listener; + } + + /** + * Start the connection, i.e. start flowing messages. Note that this method must be called only from a single thread + * and is not thread safe (which is legal according to the JMS specification). + * + * @throws JMSException + */ + public void start() throws JMSException + { + checkNotClosed(); + if (!_started) + { + _started = true; + final Iterator it = _sessions.values().iterator(); + while (it.hasNext()) + { + final AMQSession s = (AMQSession) (it.next()); + try + { + s.start(); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + } + + } + } + + public void stop() throws JMSException + { + checkNotClosed(); + if (_started) + { + for (Iterator i = _sessions.values().iterator(); i.hasNext();) + { + try + { + ((AMQSession) i.next()).stop(); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + } + + _started = false; + } + } + + public void close() throws JMSException + { + close(DEFAULT_TIMEOUT); + } + + public void close(long timeout) throws JMSException + { + close(new ArrayList<AMQSession>(_sessions.values()), timeout); + } + + public void close(List<AMQSession> sessions, long timeout) throws JMSException + { + if (!_closed.getAndSet(true)) + { + _closing.set(true); + try{ + doClose(sessions, timeout); + }finally{ + _closing.set(false); + } + } + } + + private void doClose(List<AMQSession> sessions, long timeout) throws JMSException + { + synchronized (_sessionCreationLock) + { + if (!sessions.isEmpty()) + { + AMQSession session = sessions.remove(0); + synchronized (session.getMessageDeliveryLock()) + { + doClose(sessions, timeout); + } + } + else + { + synchronized (getFailoverMutex()) + { + try + { + long startCloseTime = System.currentTimeMillis(); + + closeAllSessions(null, timeout, startCloseTime); + + //This MUST occur after we have successfully closed all Channels/Sessions + _taskPool.shutdown(); + + if (!_taskPool.isTerminated()) + { + try + { + // adjust timeout + long taskPoolTimeout = adjustTimeout(timeout, startCloseTime); + + _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + _logger.info("Interrupted while shutting down connection thread pool."); + } + } + + // adjust timeout + timeout = adjustTimeout(timeout, startCloseTime); + _delegate.closeConnection(timeout); + + //If the taskpool hasn't shutdown by now then give it shutdownNow. + // This will interupt any running tasks. + if (!_taskPool.isTerminated()) + { + List<Runnable> tasks = _taskPool.shutdownNow(); + for (Runnable r : tasks) + { + _logger.warn("Connection close forced taskpool to prevent execution:" + r); + } + } + } + catch (AMQException e) + { + _logger.error("error:", e); + JMSException jmse = new JMSException("Error closing connection: " + e); + jmse.setLinkedException(e); + jmse.initCause(e); + throw jmse; + } + } + } + } + } + + private long adjustTimeout(long timeout, long startTime) + { + long now = System.currentTimeMillis(); + timeout -= now - startTime; + if (timeout < 0) + { + timeout = 0; + } + + return timeout; + } + + /** + * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to + * mark objects "visible" in userland as closed after failover or other significant event that impacts the + * connection. <p/> The caller must hold the failover mutex before calling this method. + */ + private void markAllSessionsClosed() + { + final LinkedList sessionCopy = new LinkedList(_sessions.values()); + final Iterator it = sessionCopy.iterator(); + while (it.hasNext()) + { + final AMQSession session = (AMQSession) it.next(); + + session.markClosed(); + } + + _sessions.clear(); + } + + /** + * Close all the sessions, either due to normal connection closure or due to an error occurring. + * + * @param cause if not null, the error that is causing this shutdown <p/> The caller must hold the failover mutex + * before calling this method. + */ + private void closeAllSessions(Throwable cause, long timeout, long starttime) throws JMSException + { + final LinkedList sessionCopy = new LinkedList(_sessions.values()); + final Iterator it = sessionCopy.iterator(); + JMSException sessionException = null; + while (it.hasNext()) + { + final AMQSession session = (AMQSession) it.next(); + if (cause != null) + { + session.closed(cause); + } + else + { + try + { + if (starttime != -1) + { + timeout = adjustTimeout(timeout, starttime); + } + + session.close(timeout); + } + catch (JMSException e) + { + _logger.error("Error closing session: " + e); + sessionException = e; + } + } + } + + _sessions.clear(); + if (sessionException != null) + { + throw sessionException; + } + } + + public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, + ServerSessionPool sessionPool, int maxMessages) throws JMSException + { + checkNotClosed(); + + return null; + } + + public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, + int maxMessages) throws JMSException + { + checkNotClosed(); + + return null; + } + + public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, + int maxMessages) throws JMSException + { + checkNotClosed(); + + return null; + } + + public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, + ServerSessionPool sessionPool, int maxMessages) throws JMSException + { + // TODO Auto-generated method stub + checkNotClosed(); + + return null; + } + + public long getMaximumChannelCount() throws JMSException + { + checkNotClosed(); + + return _maximumChannelCount; + } + + public void setConnectionListener(ConnectionListener listener) + { + _connectionListener = listener; + } + + public ConnectionListener getConnectionListener() + { + return _connectionListener; + } + + public void setMaximumChannelCount(long maximumChannelCount) + { + _maximumChannelCount = maximumChannelCount; + } + + public void setMaximumFrameSize(long frameMax) + { + _maximumFrameSize = frameMax; + } + + public long getMaximumFrameSize() + { + return _maximumFrameSize; + } + + public ChannelToSessionMap getSessions() + { + return _sessions; + } + + public String getUsername() + { + return _username; + } + + public void setUsername(String id) + { + _username = id; + } + + public String getPassword() + { + return _password; + } + + public String getVirtualHost() + { + return _virtualHost; + } + + public AMQProtocolHandler getProtocolHandler() + { + return _protocolHandler; + } + + public boolean started() + { + return _started; + } + + public void bytesSent(long writtenBytes) + { + if (_connectionListener != null) + { + _connectionListener.bytesSent(writtenBytes); + } + } + + public void bytesReceived(long receivedBytes) + { + if (_connectionListener != null) + { + _connectionListener.bytesReceived(receivedBytes); + } + } + + /** + * Fire the preFailover event to the registered connection listener (if any) + * + * @param redirect true if this is the result of a redirect request rather than a connection error + * + * @return true if no listener or listener does not veto change + */ + public boolean firePreFailover(boolean redirect) + { + boolean proceed = true; + if (_connectionListener != null) + { + proceed = _connectionListener.preFailover(redirect); + } + + return proceed; + } + + /** + * Fire the preResubscribe event to the registered connection listener (if any). If the listener vetoes + * resubscription then all the sessions are closed. + * + * @return true if no listener or listener does not veto resubscription. + * + * @throws JMSException + */ + public boolean firePreResubscribe() throws JMSException + { + if (_connectionListener != null) + { + boolean resubscribe = _connectionListener.preResubscribe(); + if (!resubscribe) + { + markAllSessionsClosed(); + } + + return resubscribe; + } + else + { + return true; + } + } + + /** Fires a failover complete event to the registered connection listener (if any). */ + public void fireFailoverComplete() + { + if (_connectionListener != null) + { + _connectionListener.failoverComplete(); + } + } + + /** + * In order to protect the consistency of the connection and its child sessions, consumers and producers, the + * "failover mutex" must be held when doing any operations that could be corrupted during failover. + * + * @return a mutex. Guaranteed never to change for the lifetime of this connection even if failover occurs. + */ + public final Object getFailoverMutex() + { + return _failoverMutex; + } + + public void failoverPrep() + { + _delegate.failoverPrep(); + } + + public void resubscribeSessions() throws JMSException, AMQException, FailoverException + { + _delegate.resubscribeSessions(); + } + + /** + * If failover is taking place this will block until it has completed. If failover is not taking place it will + * return immediately. + * + * @throws InterruptedException + */ + public void blockUntilNotFailingOver() throws InterruptedException + { + _protocolHandler.blockUntilNotFailingOver(); + } + + /** + * Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception + * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will + * propagate to consumers. This allows synchronous consumers to have exceptions thrown to them. + * + * @param cause the exception + */ + public void exceptionReceived(Throwable cause) + { + + if (_logger.isDebugEnabled()) + { + _logger.debug("exceptionReceived done by:" + Thread.currentThread().getName(), cause); + } + + final JMSException je; + if (cause instanceof JMSException) + { + je = (JMSException) cause; + } + else + { + AMQConstant code = null; + + if (cause instanceof AMQException) + { + code = ((AMQException) cause).getErrorCode(); + } + + if (code != null) + { + je = new JMSException(Integer.toString(code.getCode()), "Exception thrown against " + toString() + ": " + cause); + } + else + { + //Should never get here as all AMQEs are required to have an ErrorCode! + // Other than AMQDisconnectedEx! + + if (cause instanceof AMQDisconnectedException) + { + Exception last = _protocolHandler.getStateManager().getLastException(); + if (last != null) + { + _logger.info("StateManager had an exception for us to use a cause of our Disconnected Exception"); + cause = last; + } + } + je = new JMSException("Exception thrown against " + toString() + ": " + cause); + } + + if (cause instanceof Exception) + { + je.setLinkedException((Exception) cause); + } + + je.initCause(cause); + } + + boolean closer = false; + + // in the case of an IOException, MINA has closed the protocol session so we set _closed to true + // so that any generic client code that tries to close the connection will not mess up this error + // handling sequence + if (cause instanceof IOException || cause instanceof AMQDisconnectedException) + { + // If we have an IOE/AMQDisconnect there is no connection to close on. + _closing.set(false); + closer = !_closed.getAndSet(true); + + _protocolHandler.getProtocolSession().notifyError(je); + } + + // get the failover mutex before trying to close + synchronized (getFailoverMutex()) + { + // decide if we are going to close the session + if (hardError(cause)) + { + closer = (!_closed.getAndSet(true)) || closer; + { + _logger.info("Closing AMQConnection due to :" + cause); + } + } + else + { + _logger.info("Not a hard-error connection not closing: " + cause); + } + + // deliver the exception if there is a listener + if (_exceptionListener != null) + { + _exceptionListener.onException(je); + } + else + { + _logger.error("Throwable Received but no listener set: " + cause); + } + + // if we are closing the connection, close sessions first + if (closer) + { + try + { + closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + } + catch (JMSException e) + { + _logger.error("Error closing all sessions: " + e, e); + } + } + } + } + + private boolean hardError(Throwable cause) + { + if (cause instanceof AMQException) + { + return ((AMQException) cause).isHardError(); + } + + return true; + } + + void registerSession(int channelId, AMQSession session) + { + _sessions.put(channelId, session); + } + + public void deregisterSession(int channelId) + { + _sessions.remove(channelId); + } + + public String toString() + { + StringBuffer buf = new StringBuffer("AMQConnection:\n"); + if (_failoverPolicy.getCurrentBrokerDetails() == null) + { + buf.append("No active broker connection"); + } + else + { + BrokerDetails bd = _failoverPolicy.getCurrentBrokerDetails(); + buf.append("Host: ").append(String.valueOf(bd.getHost())); + buf.append("\nPort: ").append(String.valueOf(bd.getPort())); + } + + buf.append("\nVirtual Host: ").append(String.valueOf(_virtualHost)); + buf.append("\nClient ID: ").append(String.valueOf(_clientName)); + buf.append("\nActive session count: ").append((_sessions == null) ? 0 : _sessions.size()); + + return buf.toString(); + } + + /** + * Returns connection url. + * @return connection url + */ + public ConnectionURL getConnectionURL() + { + return _connectionURL; + } + + /** + * Returns stringified connection url. This url is suitable only for display + * as {@link AMQConnectionURL#toString()} converts any password to asterisks. + * @return connection url + */ + public String toURL() + { + return _connectionURL.toString(); + } + + public Reference getReference() throws NamingException + { + return new Reference(AMQConnection.class.getName(), new StringRefAddr(AMQConnection.class.getName(), toURL()), + AMQConnectionFactory.class.getName(), null); // factory location + } + + public SSLConfiguration getSSLConfiguration() + { + return _sslConfiguration; + } + + public AMQShortString getDefaultTopicExchangeName() + { + return _defaultTopicExchangeName; + } + + public void setDefaultTopicExchangeName(AMQShortString defaultTopicExchangeName) + { + _defaultTopicExchangeName = defaultTopicExchangeName; + } + + public AMQShortString getDefaultQueueExchangeName() + { + return _defaultQueueExchangeName; + } + + public void setDefaultQueueExchangeName(AMQShortString defaultQueueExchangeName) + { + _defaultQueueExchangeName = defaultQueueExchangeName; + } + + public AMQShortString getTemporaryTopicExchangeName() + { + return _temporaryTopicExchangeName; + } + + public AMQShortString getTemporaryQueueExchangeName() + { + return _temporaryQueueExchangeName; // To change body of created methods use File | Settings | File Templates. + } + + public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName) + { + _temporaryTopicExchangeName = temporaryTopicExchangeName; + } + + public void setTemporaryQueueExchangeName(AMQShortString temporaryQueueExchangeName) + { + _temporaryQueueExchangeName = temporaryQueueExchangeName; + } + + public void performConnectionTask(Runnable task) + { + _taskPool.execute(task); + } + + public AMQSession getSession(int channelId) + { + return _sessions.get(channelId); + } + + public ProtocolVersion getProtocolVersion() + { + return _delegate.getProtocolVersion(); + } + + public boolean isFailingOver() + { + return (_protocolHandler.getFailoverLatch() != null); + } + + /** + * Get the maximum number of messages that this connection can pre-fetch. + * + * @return The maximum number of messages that this connection can pre-fetch. + */ + public long getMaxPrefetch() + { + return _maxPrefetch; + } + + /** + * Indicates whether persistent messages are synchronized + * + * @return true if persistent messages are synchronized false otherwise + */ + public boolean getSyncPersistence() + { + return _syncPersistence; + } + + /** + * Indicates whether we need to sync on every message ack + */ + public boolean getSyncAck() + { + return _syncAck; + } + + public String getSyncPublish() + { + return _syncPublish; + } + + public int getNextChannelID() + { + return _sessions.getNextChannelId(); + } + + public boolean isUseLegacyMapMessageFormat() + { + return _useLegacyMapMessageFormat; + } +} |