summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java1485
1 files changed, 1485 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
new file mode 100644
index 0000000000..94a55ef52c
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -0,0 +1,1485 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.ConnectException;
+import java.net.UnknownHostException;
+import java.nio.channels.UnresolvedAddressException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.StringRefAddr;
+
+import org.apache.qpid.AMQConnectionFailureException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQProtocolException;
+import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.AMQDisconnectedException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicQosBody;
+import org.apache.qpid.framing.BasicQosOkBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.TxSelectBody;
+import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.Connection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.FailoverPolicy;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.url.URLSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
+{
+ private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
+
+
+ /**
+ * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
+ * held by any child objects of this connection such as the session, producers and consumers.
+ */
+ private final Object _failoverMutex = new Object();
+
+ private final Object _sessionCreationLock = new Object();
+
+ /**
+ * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
+ * and we must prevent the client from opening too many.
+ */
+ private long _maximumChannelCount;
+
+ /** The maximum size of frame supported by the server */
+ private long _maximumFrameSize;
+
+ /**
+ * The protocol handler dispatches protocol events for this connection. For example, when the connection is dropped
+ * the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate
+ * handler.
+ */
+ protected AMQProtocolHandler _protocolHandler;
+
+ /** Maps from session id (Integer) to AMQSession instance */
+ private final ChannelToSessionMap _sessions = new ChannelToSessionMap();
+
+ private final String _clientName;
+
+ /** The user name to use for authentication */
+ private String _username;
+
+ /** The password to use for authentication */
+ private String _password;
+
+ /** The virtual path to connect to on the AMQ server */
+ private String _virtualHost;
+
+ protected ExceptionListener _exceptionListener;
+
+ private ConnectionListener _connectionListener;
+
+ private final ConnectionURL _connectionURL;
+
+ /**
+ * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message
+ * publication.
+ */
+ protected volatile boolean _started;
+
+ /** Policy dictating how to failover */
+ protected FailoverPolicy _failoverPolicy;
+
+ /*
+ * _Connected should be refactored with a suitable wait object.
+ */
+ protected boolean _connected;
+
+ /*
+ * The connection meta data
+ */
+ private QpidConnectionMetaData _connectionMetaData;
+
+ /** Configuration info for SSL */
+ private SSLConfiguration _sslConfiguration;
+
+ private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
+ private AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+ private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
+ private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+
+ /** Thread Pool for executing connection level processes. Such as returning bounced messages. */
+ private final ExecutorService _taskPool = Executors.newCachedThreadPool();
+ private static final long DEFAULT_TIMEOUT = 1000 * 30;
+
+ protected AMQConnectionDelegate _delegate;
+
+ // this connection maximum number of prefetched messages
+ private int _maxPrefetch;
+
+ //Indicates whether persistent messages are synchronized
+ private boolean _syncPersistence;
+
+ //Indicates whether we need to sync on every message ack
+ private boolean _syncAck;
+
+ //Indicates the sync publish options (persistent|all)
+ //By default it's async publish
+ private String _syncPublish = "";
+
+ // Indicates whether to use the old map message format or the
+ // new amqp-0-10 encoded format.
+ private boolean _useLegacyMapMessageFormat;
+
+ /**
+ * @param broker brokerdetails
+ * @param username username
+ * @param password password
+ * @param clientName clientid
+ * @param virtualHost virtualhost
+ *
+ * @throws AMQException
+ * @throws URLSyntaxException
+ */
+ public AMQConnection(String broker, String username, String password, String clientName, String virtualHost)
+ throws AMQException, URLSyntaxException
+ {
+ this(new AMQConnectionURL(
+ ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
+ + AMQBrokerDetails.checkTransport(broker) + "'"), null);
+ }
+
+ /**
+ * @param broker brokerdetails
+ * @param username username
+ * @param password password
+ * @param clientName clientid
+ * @param virtualHost virtualhost
+ *
+ * @throws AMQException
+ * @throws URLSyntaxException
+ */
+ public AMQConnection(String broker, String username, String password, String clientName, String virtualHost,
+ SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
+ {
+ this(new AMQConnectionURL(
+ ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
+ + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig);
+ }
+
+ public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost)
+ throws AMQException, URLSyntaxException
+ {
+ this(host, port, false, username, password, clientName, virtualHost, null);
+ }
+
+ public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost,
+ SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
+ {
+ this(host, port, false, username, password, clientName, virtualHost, sslConfig);
+ }
+
+ public AMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName,
+ String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
+ {
+ this(new AMQConnectionURL(
+ useSSL
+ ? (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
+ + "'" + "," + BrokerDetails.OPTIONS_SSL + "='true'")
+ : (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
+ + "'" + "," + BrokerDetails.OPTIONS_SSL + "='false'")), sslConfig);
+ }
+
+ public AMQConnection(String connection) throws AMQException, URLSyntaxException
+ {
+ this(new AMQConnectionURL(connection), null);
+ }
+
+ public AMQConnection(String connection, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
+ {
+ this(new AMQConnectionURL(connection), sslConfig);
+ }
+
+ /**
+ * @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception
+ * was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon.
+ */
+ public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
+ {
+ if (connectionURL == null)
+ {
+ throw new IllegalArgumentException("Connection must be specified");
+ }
+
+ // set this connection maxPrefetch
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null)
+ {
+ _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH));
+ }
+ else
+ {
+ // use the default value set for all connections
+ _maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
+ ClientProperties.MAX_PREFETCH_DEFAULT));
+ }
+
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null)
+ {
+ _syncPersistence =
+ Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE));
+ _logger.warn("sync_persistence is a deprecated property, " +
+ "please use sync_publish={persistent|all} instead");
+ }
+ else
+ {
+ // use the default value set for all connections
+ _syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME);
+ if (_syncPersistence)
+ {
+ _logger.warn("sync_persistence is a deprecated property, " +
+ "please use sync_publish={persistent|all} instead");
+ }
+ }
+
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK) != null)
+ {
+ _syncAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK));
+ }
+ else
+ {
+ // use the default value set for all connections
+ _syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME);
+ }
+
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH) != null)
+ {
+ _syncPublish = connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH);
+ }
+ else
+ {
+ // use the default value set for all connections
+ _syncPublish = System.getProperty((ClientProperties.SYNC_PUBLISH_PROP_NAME),_syncPublish);
+ }
+
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT) != null)
+ {
+ _useLegacyMapMessageFormat = Boolean.parseBoolean(
+ connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT));
+ }
+ else
+ {
+ // use the default value set for all connections
+ _useLegacyMapMessageFormat = Boolean.getBoolean(ClientProperties.USE_LEGACY_MAP_MESSAGE_FORMAT);
+ }
+
+ String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
+ _logger.debug("AMQP version " + amqpVersion);
+
+ _failoverPolicy = new FailoverPolicy(connectionURL, this);
+ BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails();
+ if (brokerDetails.getTransport().equals(BrokerDetails.VM) || "0-8".equals(amqpVersion))
+ {
+ _delegate = new AMQConnectionDelegate_8_0(this);
+ }
+ else if ("0-9".equals(amqpVersion))
+ {
+ _delegate = new AMQConnectionDelegate_0_9(this);
+ }
+ else if ("0-91".equals(amqpVersion) || "0-9-1".equals(amqpVersion))
+ {
+ _delegate = new AMQConnectionDelegate_9_1(this);
+ }
+ else
+ {
+ _delegate = new AMQConnectionDelegate_0_10(this);
+ }
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Connection:" + connectionURL);
+ }
+
+ _sslConfiguration = sslConfig;
+ _connectionURL = connectionURL;
+
+ _clientName = connectionURL.getClientName();
+ _username = connectionURL.getUsername();
+ _password = connectionURL.getPassword();
+
+ setVirtualHost(connectionURL.getVirtualHost());
+
+ if (connectionURL.getDefaultQueueExchangeName() != null)
+ {
+ _defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName();
+ }
+
+ if (connectionURL.getDefaultTopicExchangeName() != null)
+ {
+ _defaultTopicExchangeName = connectionURL.getDefaultTopicExchangeName();
+ }
+
+ if (connectionURL.getTemporaryQueueExchangeName() != null)
+ {
+ _temporaryQueueExchangeName = connectionURL.getTemporaryQueueExchangeName();
+ }
+
+ if (connectionURL.getTemporaryTopicExchangeName() != null)
+ {
+ _temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName();
+ }
+
+ _protocolHandler = new AMQProtocolHandler(this);
+
+ _logger.info("Connecting with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
+
+ // We are not currently connected
+ _connected = false;
+
+ boolean retryAllowed = true;
+ Exception connectionException = null;
+ while (!_connected && retryAllowed && brokerDetails != null)
+ {
+ ProtocolVersion pe = null;
+ try
+ {
+ pe = makeBrokerConnection(brokerDetails);
+ }
+ catch (Exception e)
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Unable to connect to broker at " +
+ _failoverPolicy.getCurrentBrokerDetails(),
+ e);
+ }
+ connectionException = e;
+ }
+
+ if (pe != null)
+ {
+ // reset the delegate to the version returned by the
+ // broker
+ initDelegate(pe);
+ }
+ else if (!_connected)
+ {
+ retryAllowed = _failoverPolicy.failoverAllowed();
+ brokerDetails = _failoverPolicy.getNextBrokerDetails();
+ }
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Are we connected:" + _connected);
+ }
+
+ if (!_connected)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Last attempted ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
+ }
+
+ String message = null;
+
+ if (connectionException != null)
+ {
+ if (connectionException.getCause() != null)
+ {
+ message = connectionException.getCause().getMessage();
+ }
+ else
+ {
+ message = connectionException.getMessage();
+ }
+ }
+
+ if ((message == null) || message.equals(""))
+ {
+ if (message == null)
+ {
+ message = "Unable to Connect";
+ }
+ else // can only be "" if getMessage() returned it therfore lastException != null
+ {
+ message = "Unable to Connect:" + connectionException.getClass();
+ }
+ }
+
+ for (Throwable th = connectionException; th != null; th = th.getCause())
+ {
+ if (th instanceof UnresolvedAddressException ||
+ th instanceof UnknownHostException)
+ {
+ throw new AMQUnresolvedAddressException
+ (message,
+ _failoverPolicy.getCurrentBrokerDetails().toString(),
+ connectionException);
+ }
+ }
+
+ throw new AMQConnectionFailureException(message, connectionException);
+ }
+
+ _logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
+
+ _sessions.setMaxChannelID(_delegate.getMaxChannelID());
+ _sessions.setMinChannelID(_delegate.getMinChannelID());
+
+ _connectionMetaData = new QpidConnectionMetaData(this);
+ }
+
+ protected boolean checkException(Throwable thrown)
+ {
+ Throwable cause = thrown.getCause();
+
+ if (cause == null)
+ {
+ cause = thrown;
+ }
+
+ return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
+ }
+
+ private void initDelegate(ProtocolVersion pe) throws AMQProtocolException
+ {
+ try
+ {
+ String delegateClassName = String.format
+ ("org.apache.qpid.client.AMQConnectionDelegate_%s_%s",
+ pe.getMajorVersion(), pe.getMinorVersion());
+ _logger.info("Looking up delegate '" + delegateClassName + "' Based on PE:" + pe);
+ Class c = Class.forName(delegateClassName);
+ Class partypes[] = new Class[1];
+ partypes[0] = AMQConnection.class;
+ _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
+ //Update our session to use this new protocol version
+ _protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion());
+
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new AMQProtocolException
+ (AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR,
+ String.format("Protocol: %s.%s is rquired by the broker but is not " +
+ "currently supported by this client library implementation",
+ pe.getMajorVersion(), pe.getMinorVersion()),
+ e);
+ }
+ catch (NoSuchMethodException e)
+ {
+ throw new RuntimeException("unable to locate constructor for delegate", e);
+ }
+ catch (InstantiationException e)
+ {
+ throw new RuntimeException("error instantiating delegate", e);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new RuntimeException("error accessing delegate", e);
+ }
+ catch (InvocationTargetException e)
+ {
+ throw new RuntimeException("error invoking delegate", e);
+ }
+ }
+
+ private void setVirtualHost(String virtualHost)
+ {
+ if (virtualHost != null && virtualHost.startsWith("/"))
+ {
+ virtualHost = virtualHost.substring(1);
+ }
+
+ _virtualHost = virtualHost;
+ }
+
+ public boolean attemptReconnection(String host, int port)
+ {
+ BrokerDetails bd = new AMQBrokerDetails(host, port, _sslConfiguration);
+
+ _failoverPolicy.setBroker(bd);
+
+ try
+ {
+ makeBrokerConnection(bd);
+
+ return true;
+ }
+ catch (Exception e)
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Unable to connect to broker at " + bd);
+ }
+
+ attemptReconnection();
+ }
+
+ return false;
+ }
+
+ public boolean attemptReconnection()
+ {
+ BrokerDetails broker = null;
+ while (_failoverPolicy.failoverAllowed() && (broker = _failoverPolicy.getNextBrokerDetails()) != null)
+ {
+ try
+ {
+ makeBrokerConnection(broker);
+ return true;
+ }
+ catch (Exception e)
+ {
+ if (!(e instanceof AMQException))
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e);
+ }
+ }
+ else
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info(e.getMessage() + ":Unable to connect to broker at "
+ + _failoverPolicy.getCurrentBrokerDetails());
+ }
+ }
+ }
+ }
+
+ // connection unsuccessful
+ return false;
+ }
+
+ public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
+ {
+ return _delegate.makeBrokerConnection(brokerDetail);
+ }
+
+ public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
+ {
+ return _delegate.executeRetrySupport(operation);
+ }
+
+ /**
+ * Get the details of the currently active broker
+ *
+ * @return null if no broker is active (i.e. no successful connection has been made, or the BrokerDetail instance
+ * otherwise
+ */
+ public BrokerDetails getActiveBrokerDetails()
+ {
+ return _failoverPolicy.getCurrentBrokerDetails();
+ }
+
+ public boolean failoverAllowed()
+ {
+ if (!_connected)
+ {
+ return false;
+ }
+ else
+ {
+ return _failoverPolicy.failoverAllowed();
+ }
+ }
+
+ public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+ {
+ return createSession(transacted, acknowledgeMode, _maxPrefetch);
+ }
+
+ public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
+ throws JMSException
+ {
+ return createSession(transacted, acknowledgeMode, prefetch, prefetch);
+ }
+
+ public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
+ final int prefetchHigh, final int prefetchLow) throws JMSException
+ {
+ synchronized (_sessionCreationLock)
+ {
+ checkNotClosed();
+ return _delegate.createSession(transacted, acknowledgeMode, prefetchHigh, prefetchLow);
+ }
+ }
+
+ private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
+ throws AMQException, FailoverException
+ {
+
+ ChannelOpenBody channelOpenBody = getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
+
+ // TODO: Be aware of possible changes to parameter order as versions change.
+
+ _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class);
+
+ BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0, prefetchHigh, false);
+
+ // todo send low water mark when protocol allows.
+ // todo Be aware of possible changes to parameter order as versions change.
+ _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId), BasicQosOkBody.class);
+
+ if (transacted)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Issuing TxSelect for " + channelId);
+ }
+
+ TxSelectBody body = getProtocolHandler().getMethodRegistry().createTxSelectBody();
+
+ // TODO: Be aware of possible changes to parameter order as versions change.
+ _protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
+ }
+ }
+
+ public void setFailoverPolicy(FailoverPolicy policy)
+ {
+ _failoverPolicy = policy;
+ }
+
+ public FailoverPolicy getFailoverPolicy()
+ {
+ return _failoverPolicy;
+ }
+
+ /**
+ * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions where specified in
+ * the JMS spec
+ *
+ * @param transacted
+ * @param acknowledgeMode
+ *
+ * @return QueueSession
+ *
+ * @throws JMSException
+ */
+ public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException
+ {
+ return new AMQQueueSessionAdaptor(createSession(transacted, acknowledgeMode));
+ }
+
+ /**
+ * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions where specified in
+ * the JMS spec
+ *
+ * @param transacted
+ * @param acknowledgeMode
+ *
+ * @return TopicSession
+ *
+ * @throws JMSException
+ */
+ public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException
+ {
+ return new AMQTopicSessionAdaptor(createSession(transacted, acknowledgeMode));
+ }
+
+ public boolean channelLimitReached()
+ {
+ return _sessions.size() >= _maximumChannelCount;
+ }
+
+ public String getClientID() throws JMSException
+ {
+ checkNotClosed();
+
+ return _clientName;
+ }
+
+ public void setClientID(String clientID) throws JMSException
+ {
+ checkNotClosed();
+ // in AMQP it is not possible to change the client ID. If one is not specified
+ // upon connection construction, an id is generated automatically. Therefore
+ // we can always throw an exception.
+ if (!Boolean.getBoolean(ClientProperties.IGNORE_SET_CLIENTID_PROP_NAME))
+ {
+ throw new IllegalStateException("Client name cannot be changed after being set");
+ }
+ else
+ {
+ _logger.info("Operation setClientID is ignored using ID: " + getClientID());
+ }
+ }
+
+ public ConnectionMetaData getMetaData() throws JMSException
+ {
+ checkNotClosed();
+
+ return _connectionMetaData;
+
+ }
+
+ public ExceptionListener getExceptionListener() throws JMSException
+ {
+ checkNotClosed();
+
+ return _exceptionListener;
+ }
+
+ public void setExceptionListener(ExceptionListener listener) throws JMSException
+ {
+ checkNotClosed();
+ _exceptionListener = listener;
+ }
+
+ /**
+ * Start the connection, i.e. start flowing messages. Note that this method must be called only from a single thread
+ * and is not thread safe (which is legal according to the JMS specification).
+ *
+ * @throws JMSException
+ */
+ public void start() throws JMSException
+ {
+ checkNotClosed();
+ if (!_started)
+ {
+ _started = true;
+ final Iterator it = _sessions.values().iterator();
+ while (it.hasNext())
+ {
+ final AMQSession s = (AMQSession) (it.next());
+ try
+ {
+ s.start();
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ }
+
+ }
+ }
+
+ public void stop() throws JMSException
+ {
+ checkNotClosed();
+ if (_started)
+ {
+ for (Iterator i = _sessions.values().iterator(); i.hasNext();)
+ {
+ try
+ {
+ ((AMQSession) i.next()).stop();
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ }
+
+ _started = false;
+ }
+ }
+
+ public void close() throws JMSException
+ {
+ close(DEFAULT_TIMEOUT);
+ }
+
+ public void close(long timeout) throws JMSException
+ {
+ close(new ArrayList<AMQSession>(_sessions.values()), timeout);
+ }
+
+ public void close(List<AMQSession> sessions, long timeout) throws JMSException
+ {
+ if (!_closed.getAndSet(true))
+ {
+ _closing.set(true);
+ try{
+ doClose(sessions, timeout);
+ }finally{
+ _closing.set(false);
+ }
+ }
+ }
+
+ private void doClose(List<AMQSession> sessions, long timeout) throws JMSException
+ {
+ synchronized (_sessionCreationLock)
+ {
+ if (!sessions.isEmpty())
+ {
+ AMQSession session = sessions.remove(0);
+ synchronized (session.getMessageDeliveryLock())
+ {
+ doClose(sessions, timeout);
+ }
+ }
+ else
+ {
+ synchronized (getFailoverMutex())
+ {
+ try
+ {
+ long startCloseTime = System.currentTimeMillis();
+
+ closeAllSessions(null, timeout, startCloseTime);
+
+ //This MUST occur after we have successfully closed all Channels/Sessions
+ _taskPool.shutdown();
+
+ if (!_taskPool.isTerminated())
+ {
+ try
+ {
+ // adjust timeout
+ long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
+
+ _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info("Interrupted while shutting down connection thread pool.");
+ }
+ }
+
+ // adjust timeout
+ timeout = adjustTimeout(timeout, startCloseTime);
+ _delegate.closeConnection(timeout);
+
+ //If the taskpool hasn't shutdown by now then give it shutdownNow.
+ // This will interupt any running tasks.
+ if (!_taskPool.isTerminated())
+ {
+ List<Runnable> tasks = _taskPool.shutdownNow();
+ for (Runnable r : tasks)
+ {
+ _logger.warn("Connection close forced taskpool to prevent execution:" + r);
+ }
+ }
+ }
+ catch (AMQException e)
+ {
+ _logger.error("error:", e);
+ JMSException jmse = new JMSException("Error closing connection: " + e);
+ jmse.setLinkedException(e);
+ jmse.initCause(e);
+ throw jmse;
+ }
+ }
+ }
+ }
+ }
+
+ private long adjustTimeout(long timeout, long startTime)
+ {
+ long now = System.currentTimeMillis();
+ timeout -= now - startTime;
+ if (timeout < 0)
+ {
+ timeout = 0;
+ }
+
+ return timeout;
+ }
+
+ /**
+ * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
+ * mark objects "visible" in userland as closed after failover or other significant event that impacts the
+ * connection. <p/> The caller must hold the failover mutex before calling this method.
+ */
+ private void markAllSessionsClosed()
+ {
+ final LinkedList sessionCopy = new LinkedList(_sessions.values());
+ final Iterator it = sessionCopy.iterator();
+ while (it.hasNext())
+ {
+ final AMQSession session = (AMQSession) it.next();
+
+ session.markClosed();
+ }
+
+ _sessions.clear();
+ }
+
+ /**
+ * Close all the sessions, either due to normal connection closure or due to an error occurring.
+ *
+ * @param cause if not null, the error that is causing this shutdown <p/> The caller must hold the failover mutex
+ * before calling this method.
+ */
+ private void closeAllSessions(Throwable cause, long timeout, long starttime) throws JMSException
+ {
+ final LinkedList sessionCopy = new LinkedList(_sessions.values());
+ final Iterator it = sessionCopy.iterator();
+ JMSException sessionException = null;
+ while (it.hasNext())
+ {
+ final AMQSession session = (AMQSession) it.next();
+ if (cause != null)
+ {
+ session.closed(cause);
+ }
+ else
+ {
+ try
+ {
+ if (starttime != -1)
+ {
+ timeout = adjustTimeout(timeout, starttime);
+ }
+
+ session.close(timeout);
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error closing session: " + e);
+ sessionException = e;
+ }
+ }
+ }
+
+ _sessions.clear();
+ if (sessionException != null)
+ {
+ throw sessionException;
+ }
+ }
+
+ public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
+ ServerSessionPool sessionPool, int maxMessages) throws JMSException
+ {
+ checkNotClosed();
+
+ return null;
+ }
+
+ public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool,
+ int maxMessages) throws JMSException
+ {
+ checkNotClosed();
+
+ return null;
+ }
+
+ public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool,
+ int maxMessages) throws JMSException
+ {
+ checkNotClosed();
+
+ return null;
+ }
+
+ public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector,
+ ServerSessionPool sessionPool, int maxMessages) throws JMSException
+ {
+ // TODO Auto-generated method stub
+ checkNotClosed();
+
+ return null;
+ }
+
+ public long getMaximumChannelCount() throws JMSException
+ {
+ checkNotClosed();
+
+ return _maximumChannelCount;
+ }
+
+ public void setConnectionListener(ConnectionListener listener)
+ {
+ _connectionListener = listener;
+ }
+
+ public ConnectionListener getConnectionListener()
+ {
+ return _connectionListener;
+ }
+
+ public void setMaximumChannelCount(long maximumChannelCount)
+ {
+ _maximumChannelCount = maximumChannelCount;
+ }
+
+ public void setMaximumFrameSize(long frameMax)
+ {
+ _maximumFrameSize = frameMax;
+ }
+
+ public long getMaximumFrameSize()
+ {
+ return _maximumFrameSize;
+ }
+
+ public ChannelToSessionMap getSessions()
+ {
+ return _sessions;
+ }
+
+ public String getUsername()
+ {
+ return _username;
+ }
+
+ public void setUsername(String id)
+ {
+ _username = id;
+ }
+
+ public String getPassword()
+ {
+ return _password;
+ }
+
+ public String getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public AMQProtocolHandler getProtocolHandler()
+ {
+ return _protocolHandler;
+ }
+
+ public boolean started()
+ {
+ return _started;
+ }
+
+ public void bytesSent(long writtenBytes)
+ {
+ if (_connectionListener != null)
+ {
+ _connectionListener.bytesSent(writtenBytes);
+ }
+ }
+
+ public void bytesReceived(long receivedBytes)
+ {
+ if (_connectionListener != null)
+ {
+ _connectionListener.bytesReceived(receivedBytes);
+ }
+ }
+
+ /**
+ * Fire the preFailover event to the registered connection listener (if any)
+ *
+ * @param redirect true if this is the result of a redirect request rather than a connection error
+ *
+ * @return true if no listener or listener does not veto change
+ */
+ public boolean firePreFailover(boolean redirect)
+ {
+ boolean proceed = true;
+ if (_connectionListener != null)
+ {
+ proceed = _connectionListener.preFailover(redirect);
+ }
+
+ return proceed;
+ }
+
+ /**
+ * Fire the preResubscribe event to the registered connection listener (if any). If the listener vetoes
+ * resubscription then all the sessions are closed.
+ *
+ * @return true if no listener or listener does not veto resubscription.
+ *
+ * @throws JMSException
+ */
+ public boolean firePreResubscribe() throws JMSException
+ {
+ if (_connectionListener != null)
+ {
+ boolean resubscribe = _connectionListener.preResubscribe();
+ if (!resubscribe)
+ {
+ markAllSessionsClosed();
+ }
+
+ return resubscribe;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ /** Fires a failover complete event to the registered connection listener (if any). */
+ public void fireFailoverComplete()
+ {
+ if (_connectionListener != null)
+ {
+ _connectionListener.failoverComplete();
+ }
+ }
+
+ /**
+ * In order to protect the consistency of the connection and its child sessions, consumers and producers, the
+ * "failover mutex" must be held when doing any operations that could be corrupted during failover.
+ *
+ * @return a mutex. Guaranteed never to change for the lifetime of this connection even if failover occurs.
+ */
+ public final Object getFailoverMutex()
+ {
+ return _failoverMutex;
+ }
+
+ public void failoverPrep()
+ {
+ _delegate.failoverPrep();
+ }
+
+ public void resubscribeSessions() throws JMSException, AMQException, FailoverException
+ {
+ _delegate.resubscribeSessions();
+ }
+
+ /**
+ * If failover is taking place this will block until it has completed. If failover is not taking place it will
+ * return immediately.
+ *
+ * @throws InterruptedException
+ */
+ public void blockUntilNotFailingOver() throws InterruptedException
+ {
+ _protocolHandler.blockUntilNotFailingOver();
+ }
+
+ /**
+ * Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception
+ * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will
+ * propagate to consumers. This allows synchronous consumers to have exceptions thrown to them.
+ *
+ * @param cause the exception
+ */
+ public void exceptionReceived(Throwable cause)
+ {
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("exceptionReceived done by:" + Thread.currentThread().getName(), cause);
+ }
+
+ final JMSException je;
+ if (cause instanceof JMSException)
+ {
+ je = (JMSException) cause;
+ }
+ else
+ {
+ AMQConstant code = null;
+
+ if (cause instanceof AMQException)
+ {
+ code = ((AMQException) cause).getErrorCode();
+ }
+
+ if (code != null)
+ {
+ je = new JMSException(Integer.toString(code.getCode()), "Exception thrown against " + toString() + ": " + cause);
+ }
+ else
+ {
+ //Should never get here as all AMQEs are required to have an ErrorCode!
+ // Other than AMQDisconnectedEx!
+
+ if (cause instanceof AMQDisconnectedException)
+ {
+ Exception last = _protocolHandler.getStateManager().getLastException();
+ if (last != null)
+ {
+ _logger.info("StateManager had an exception for us to use a cause of our Disconnected Exception");
+ cause = last;
+ }
+ }
+ je = new JMSException("Exception thrown against " + toString() + ": " + cause);
+ }
+
+ if (cause instanceof Exception)
+ {
+ je.setLinkedException((Exception) cause);
+ }
+
+ je.initCause(cause);
+ }
+
+ boolean closer = false;
+
+ // in the case of an IOException, MINA has closed the protocol session so we set _closed to true
+ // so that any generic client code that tries to close the connection will not mess up this error
+ // handling sequence
+ if (cause instanceof IOException || cause instanceof AMQDisconnectedException)
+ {
+ // If we have an IOE/AMQDisconnect there is no connection to close on.
+ _closing.set(false);
+ closer = !_closed.getAndSet(true);
+
+ _protocolHandler.getProtocolSession().notifyError(je);
+ }
+
+ // get the failover mutex before trying to close
+ synchronized (getFailoverMutex())
+ {
+ // decide if we are going to close the session
+ if (hardError(cause))
+ {
+ closer = (!_closed.getAndSet(true)) || closer;
+ {
+ _logger.info("Closing AMQConnection due to :" + cause);
+ }
+ }
+ else
+ {
+ _logger.info("Not a hard-error connection not closing: " + cause);
+ }
+
+ // deliver the exception if there is a listener
+ if (_exceptionListener != null)
+ {
+ _exceptionListener.onException(je);
+ }
+ else
+ {
+ _logger.error("Throwable Received but no listener set: " + cause);
+ }
+
+ // if we are closing the connection, close sessions first
+ if (closer)
+ {
+ try
+ {
+ closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error closing all sessions: " + e, e);
+ }
+ }
+ }
+ }
+
+ private boolean hardError(Throwable cause)
+ {
+ if (cause instanceof AMQException)
+ {
+ return ((AMQException) cause).isHardError();
+ }
+
+ return true;
+ }
+
+ void registerSession(int channelId, AMQSession session)
+ {
+ _sessions.put(channelId, session);
+ }
+
+ public void deregisterSession(int channelId)
+ {
+ _sessions.remove(channelId);
+ }
+
+ public String toString()
+ {
+ StringBuffer buf = new StringBuffer("AMQConnection:\n");
+ if (_failoverPolicy.getCurrentBrokerDetails() == null)
+ {
+ buf.append("No active broker connection");
+ }
+ else
+ {
+ BrokerDetails bd = _failoverPolicy.getCurrentBrokerDetails();
+ buf.append("Host: ").append(String.valueOf(bd.getHost()));
+ buf.append("\nPort: ").append(String.valueOf(bd.getPort()));
+ }
+
+ buf.append("\nVirtual Host: ").append(String.valueOf(_virtualHost));
+ buf.append("\nClient ID: ").append(String.valueOf(_clientName));
+ buf.append("\nActive session count: ").append((_sessions == null) ? 0 : _sessions.size());
+
+ return buf.toString();
+ }
+
+ /**
+ * Returns connection url.
+ * @return connection url
+ */
+ public ConnectionURL getConnectionURL()
+ {
+ return _connectionURL;
+ }
+
+ /**
+ * Returns stringified connection url. This url is suitable only for display
+ * as {@link AMQConnectionURL#toString()} converts any password to asterisks.
+ * @return connection url
+ */
+ public String toURL()
+ {
+ return _connectionURL.toString();
+ }
+
+ public Reference getReference() throws NamingException
+ {
+ return new Reference(AMQConnection.class.getName(), new StringRefAddr(AMQConnection.class.getName(), toURL()),
+ AMQConnectionFactory.class.getName(), null); // factory location
+ }
+
+ public SSLConfiguration getSSLConfiguration()
+ {
+ return _sslConfiguration;
+ }
+
+ public AMQShortString getDefaultTopicExchangeName()
+ {
+ return _defaultTopicExchangeName;
+ }
+
+ public void setDefaultTopicExchangeName(AMQShortString defaultTopicExchangeName)
+ {
+ _defaultTopicExchangeName = defaultTopicExchangeName;
+ }
+
+ public AMQShortString getDefaultQueueExchangeName()
+ {
+ return _defaultQueueExchangeName;
+ }
+
+ public void setDefaultQueueExchangeName(AMQShortString defaultQueueExchangeName)
+ {
+ _defaultQueueExchangeName = defaultQueueExchangeName;
+ }
+
+ public AMQShortString getTemporaryTopicExchangeName()
+ {
+ return _temporaryTopicExchangeName;
+ }
+
+ public AMQShortString getTemporaryQueueExchangeName()
+ {
+ return _temporaryQueueExchangeName; // To change body of created methods use File | Settings | File Templates.
+ }
+
+ public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName)
+ {
+ _temporaryTopicExchangeName = temporaryTopicExchangeName;
+ }
+
+ public void setTemporaryQueueExchangeName(AMQShortString temporaryQueueExchangeName)
+ {
+ _temporaryQueueExchangeName = temporaryQueueExchangeName;
+ }
+
+ public void performConnectionTask(Runnable task)
+ {
+ _taskPool.execute(task);
+ }
+
+ public AMQSession getSession(int channelId)
+ {
+ return _sessions.get(channelId);
+ }
+
+ public ProtocolVersion getProtocolVersion()
+ {
+ return _delegate.getProtocolVersion();
+ }
+
+ public boolean isFailingOver()
+ {
+ return (_protocolHandler.getFailoverLatch() != null);
+ }
+
+ /**
+ * Get the maximum number of messages that this connection can pre-fetch.
+ *
+ * @return The maximum number of messages that this connection can pre-fetch.
+ */
+ public long getMaxPrefetch()
+ {
+ return _maxPrefetch;
+ }
+
+ /**
+ * Indicates whether persistent messages are synchronized
+ *
+ * @return true if persistent messages are synchronized false otherwise
+ */
+ public boolean getSyncPersistence()
+ {
+ return _syncPersistence;
+ }
+
+ /**
+ * Indicates whether we need to sync on every message ack
+ */
+ public boolean getSyncAck()
+ {
+ return _syncAck;
+ }
+
+ public String getSyncPublish()
+ {
+ return _syncPublish;
+ }
+
+ public int getNextChannelID()
+ {
+ return _sessions.getNextChannelId();
+ }
+
+ public boolean isUseLegacyMapMessageFormat()
+ {
+ return _useLegacyMapMessageFormat;
+ }
+}