summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQConnection.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java1493
1 files changed, 0 insertions, 1493 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
deleted file mode 100644
index ab59fee020..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ /dev/null
@@ -1,1493 +0,0 @@
-/*
-*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.net.ConnectException;
-import java.net.UnknownHostException;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.ConnectionConsumer;
-import javax.jms.ConnectionMetaData;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-import javax.naming.NamingException;
-import javax.naming.Reference;
-import javax.naming.Referenceable;
-import javax.naming.StringRefAddr;
-
-import org.apache.qpid.AMQConnectionFailureException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQProtocolException;
-import org.apache.qpid.AMQUnresolvedAddressException;
-import org.apache.qpid.AMQDisconnectedException;
-import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.failover.FailoverProtectedOperation;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicQosOkBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ChannelOpenOkBody;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.framing.TxSelectBody;
-import org.apache.qpid.framing.TxSelectOkBody;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.Connection;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.jms.FailoverPolicy;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.url.URLSyntaxException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
-{
- private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
-
-
- /**
- * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
- * held by any child objects of this connection such as the session, producers and consumers.
- */
- private final Object _failoverMutex = new Object();
-
- private final Object _sessionCreationLock = new Object();
-
- /**
- * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
- * and we must prevent the client from opening too many.
- */
- private long _maximumChannelCount;
-
- /** The maximum size of frame supported by the server */
- private long _maximumFrameSize;
-
- /**
- * The protocol handler dispatches protocol events for this connection. For example, when the connection is dropped
- * the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate
- * handler.
- */
- protected AMQProtocolHandler _protocolHandler;
-
- /** Maps from session id (Integer) to AMQSession instance */
- private final ChannelToSessionMap _sessions = new ChannelToSessionMap();
-
- private String _clientName;
-
- /** The user name to use for authentication */
- private String _username;
-
- /** The password to use for authentication */
- private String _password;
-
- /** The virtual path to connect to on the AMQ server */
- private String _virtualHost;
-
- protected ExceptionListener _exceptionListener;
-
- private ConnectionListener _connectionListener;
-
- private ConnectionURL _connectionURL;
-
- /**
- * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message
- * publication.
- */
- protected volatile boolean _started;
-
- /** Policy dictating how to failover */
- protected FailoverPolicy _failoverPolicy;
-
- /*
- * _Connected should be refactored with a suitable wait object.
- */
- protected boolean _connected;
-
- /*
- * The connection meta data
- */
- private QpidConnectionMetaData _connectionMetaData;
-
- /** Configuration info for SSL */
- private SSLConfiguration _sslConfiguration;
-
- private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
- private AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
- private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
- private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
-
- /** Thread Pool for executing connection level processes. Such as returning bounced messages. */
- private final ExecutorService _taskPool = Executors.newCachedThreadPool();
- private static final long DEFAULT_TIMEOUT = 1000 * 30;
-
- protected AMQConnectionDelegate _delegate;
-
- // this connection maximum number of prefetched messages
- private int _maxPrefetch;
-
- //Indicates whether persistent messages are synchronized
- private boolean _syncPersistence;
-
- //Indicates whether we need to sync on every message ack
- private boolean _syncAck;
-
- //Indicates the sync publish options (persistent|all)
- //By default it's async publish
- private String _syncPublish = "";
-
- // Indicates whether to use the old map message format or the
- // new amqp-0-10 encoded format.
- private boolean _useLegacyMapMessageFormat;
-
- /**
- * @param broker brokerdetails
- * @param username username
- * @param password password
- * @param clientName clientid
- * @param virtualHost virtualhost
- *
- * @throws AMQException
- * @throws URLSyntaxException
- */
- public AMQConnection(String broker, String username, String password, String clientName, String virtualHost)
- throws AMQException, URLSyntaxException
- {
- this(new AMQConnectionURL(
- ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
- + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
- + AMQBrokerDetails.checkTransport(broker) + "'"), null);
- }
-
- /**
- * @param broker brokerdetails
- * @param username username
- * @param password password
- * @param clientName clientid
- * @param virtualHost virtualhost
- *
- * @throws AMQException
- * @throws URLSyntaxException
- */
- public AMQConnection(String broker, String username, String password, String clientName, String virtualHost,
- SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
- {
- this(new AMQConnectionURL(
- ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
- + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
- + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig);
- }
-
- public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost)
- throws AMQException, URLSyntaxException
- {
- this(host, port, false, username, password, clientName, virtualHost, null);
- }
-
- public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost,
- SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
- {
- this(host, port, false, username, password, clientName, virtualHost, sslConfig);
- }
-
- public AMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName,
- String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
- {
- this(new AMQConnectionURL(
- useSSL
- ? (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
- + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
- + "'" + "," + BrokerDetails.OPTIONS_SSL + "='true'")
- : (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
- + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
- + "'" + "," + BrokerDetails.OPTIONS_SSL + "='false'")), sslConfig);
- }
-
- public AMQConnection(String connection) throws AMQException, URLSyntaxException
- {
- this(new AMQConnectionURL(connection), null);
- }
-
- public AMQConnection(String connection, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
- {
- this(new AMQConnectionURL(connection), sslConfig);
- }
-
- /**
- * @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception
- * was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon.
- */
- public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
- {
- // set this connection maxPrefetch
- if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null)
- {
- _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH));
- }
- else
- {
- // use the defaul value set for all connections
- _maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
- ClientProperties.MAX_PREFETCH_DEFAULT));
- }
-
- if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null)
- {
- _syncPersistence =
- Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE));
- _logger.warn("sync_persistence is a deprecated property, " +
- "please use sync_publish={persistent|all} instead");
- }
- else
- {
- // use the defaul value set for all connections
- _syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME);
- if (_syncPersistence)
- {
- _logger.warn("sync_persistence is a deprecated property, " +
- "please use sync_publish={persistent|all} instead");
- }
- }
-
- if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK) != null)
- {
- _syncAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK));
- }
- else
- {
- // use the defaul value set for all connections
- _syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME);
- }
-
- if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH) != null)
- {
- _syncPublish = connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH);
- }
- else
- {
- // use the default value set for all connections
- _syncPublish = System.getProperty((ClientProperties.SYNC_PUBLISH_PROP_NAME),_syncPublish);
- }
-
- if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT) != null)
- {
- _useLegacyMapMessageFormat = Boolean.parseBoolean(
- connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT));
- }
- else
- {
- // use the default value set for all connections
- _useLegacyMapMessageFormat = Boolean.getBoolean(ClientProperties.USE_LEGACY_MAP_MESSAGE_FORMAT);
- }
-
- String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
- _logger.debug("AMQP version " + amqpVersion);
-
- _failoverPolicy = new FailoverPolicy(connectionURL, this);
- BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails();
- if (brokerDetails.getTransport().equals(BrokerDetails.VM) || "0-8".equals(amqpVersion))
- {
- _delegate = new AMQConnectionDelegate_8_0(this);
- }
- else if ("0-9".equals(amqpVersion))
- {
- _delegate = new AMQConnectionDelegate_0_9(this);
- }
- else if ("0-91".equals(amqpVersion) || "0-9-1".equals(amqpVersion))
- {
- _delegate = new AMQConnectionDelegate_9_1(this);
- }
- else
- {
- _delegate = new AMQConnectionDelegate_0_10(this);
- }
-
- if (_logger.isInfoEnabled())
- {
- _logger.info("Connection:" + connectionURL);
- }
-
- _sslConfiguration = sslConfig;
- if (connectionURL == null)
- {
- throw new IllegalArgumentException("Connection must be specified");
- }
-
- _connectionURL = connectionURL;
-
- _clientName = connectionURL.getClientName();
- _username = connectionURL.getUsername();
- _password = connectionURL.getPassword();
-
- setVirtualHost(connectionURL.getVirtualHost());
-
- if (connectionURL.getDefaultQueueExchangeName() != null)
- {
- _defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName();
- }
-
- if (connectionURL.getDefaultTopicExchangeName() != null)
- {
- _defaultTopicExchangeName = connectionURL.getDefaultTopicExchangeName();
- }
-
- if (connectionURL.getTemporaryQueueExchangeName() != null)
- {
- _temporaryQueueExchangeName = connectionURL.getTemporaryQueueExchangeName();
- }
-
- if (connectionURL.getTemporaryTopicExchangeName() != null)
- {
- _temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName();
- }
-
- _protocolHandler = new AMQProtocolHandler(this);
-
- _logger.info("Connecting with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
-
- // We are not currently connected
- _connected = false;
-
- boolean retryAllowed = true;
- Exception connectionException = null;
- while (!_connected && retryAllowed && brokerDetails != null)
- {
- ProtocolVersion pe = null;
- try
- {
- pe = makeBrokerConnection(brokerDetails);
- }
- catch (Exception e)
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Unable to connect to broker at " +
- _failoverPolicy.getCurrentBrokerDetails(),
- e);
- }
- connectionException = e;
- }
-
- if (pe != null)
- {
- // reset the delegate to the version returned by the
- // broker
- initDelegate(pe);
- }
- else if (!_connected)
- {
- retryAllowed = _failoverPolicy.failoverAllowed();
- brokerDetails = _failoverPolicy.getNextBrokerDetails();
- }
- }
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Are we connected:" + _connected);
- }
-
- if (!_connected)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Last attempted ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
- }
-
- String message = null;
-
- if (connectionException != null)
- {
- if (connectionException.getCause() != null)
- {
- message = connectionException.getCause().getMessage();
- }
- else
- {
- message = connectionException.getMessage();
- }
- }
-
- if ((message == null) || message.equals(""))
- {
- if (message == null)
- {
- message = "Unable to Connect";
- }
- else // can only be "" if getMessage() returned it therfore lastException != null
- {
- message = "Unable to Connect:" + connectionException.getClass();
- }
- }
-
- for (Throwable th = connectionException; th != null; th = th.getCause())
- {
- if (th instanceof UnresolvedAddressException ||
- th instanceof UnknownHostException)
- {
- throw new AMQUnresolvedAddressException
- (message,
- _failoverPolicy.getCurrentBrokerDetails().toString(),
- connectionException);
- }
- }
-
- throw new AMQConnectionFailureException(message, connectionException);
- }
-
- _logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
-
- _sessions.setMaxChannelID(_delegate.getMaxChannelID());
- _sessions.setMinChannelID(_delegate.getMinChannelID());
-
- _connectionMetaData = new QpidConnectionMetaData(this);
- }
-
- protected boolean checkException(Throwable thrown)
- {
- Throwable cause = thrown.getCause();
-
- if (cause == null)
- {
- cause = thrown;
- }
-
- return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
- }
-
- private void initDelegate(ProtocolVersion pe) throws AMQProtocolException
- {
- try
- {
- String delegateClassName = String.format
- ("org.apache.qpid.client.AMQConnectionDelegate_%s_%s",
- pe.getMajorVersion(), pe.getMinorVersion());
- _logger.info("Looking up delegate '" + delegateClassName + "' Based on PE:" + pe);
- Class c = Class.forName(delegateClassName);
- Class partypes[] = new Class[1];
- partypes[0] = AMQConnection.class;
- _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
- //Update our session to use this new protocol version
- _protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion());
-
- }
- catch (ClassNotFoundException e)
- {
- throw new AMQProtocolException
- (AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR,
- String.format("Protocol: %s.%s is rquired by the broker but is not " +
- "currently supported by this client library implementation",
- pe.getMajorVersion(), pe.getMinorVersion()),
- e);
- }
- catch (NoSuchMethodException e)
- {
- throw new RuntimeException("unable to locate constructor for delegate", e);
- }
- catch (InstantiationException e)
- {
- throw new RuntimeException("error instantiating delegate", e);
- }
- catch (IllegalAccessException e)
- {
- throw new RuntimeException("error accessing delegate", e);
- }
- catch (InvocationTargetException e)
- {
- throw new RuntimeException("error invoking delegate", e);
- }
- }
-
- protected AMQConnection(String username, String password, String clientName, String virtualHost)
- {
- _clientName = clientName;
- _username = username;
- _password = password;
- setVirtualHost(virtualHost);
- }
-
- private void setVirtualHost(String virtualHost)
- {
- if (virtualHost != null && virtualHost.startsWith("/"))
- {
- virtualHost = virtualHost.substring(1);
- }
-
- _virtualHost = virtualHost;
- }
-
- public boolean attemptReconnection(String host, int port)
- {
- BrokerDetails bd = new AMQBrokerDetails(host, port, _sslConfiguration);
-
- _failoverPolicy.setBroker(bd);
-
- try
- {
- makeBrokerConnection(bd);
-
- return true;
- }
- catch (Exception e)
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Unable to connect to broker at " + bd);
- }
-
- attemptReconnection();
- }
-
- return false;
- }
-
- public boolean attemptReconnection()
- {
- BrokerDetails broker = null;
- while (_failoverPolicy.failoverAllowed() && (broker = _failoverPolicy.getNextBrokerDetails()) != null)
- {
- try
- {
- makeBrokerConnection(broker);
- return true;
- }
- catch (Exception e)
- {
- if (!(e instanceof AMQException))
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e);
- }
- }
- else
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info(e.getMessage() + ":Unable to connect to broker at "
- + _failoverPolicy.getCurrentBrokerDetails());
- }
- }
- }
- }
-
- // connection unsuccessful
- return false;
- }
-
- public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
- {
- return _delegate.makeBrokerConnection(brokerDetail);
- }
-
- public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
- {
- return _delegate.executeRetrySupport(operation);
- }
-
- /**
- * Get the details of the currently active broker
- *
- * @return null if no broker is active (i.e. no successful connection has been made, or the BrokerDetail instance
- * otherwise
- */
- public BrokerDetails getActiveBrokerDetails()
- {
- return _failoverPolicy.getCurrentBrokerDetails();
- }
-
- public boolean failoverAllowed()
- {
- if (!_connected)
- {
- return false;
- }
- else
- {
- return _failoverPolicy.failoverAllowed();
- }
- }
-
- public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
- {
- return createSession(transacted, acknowledgeMode, _maxPrefetch);
- }
-
- public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
- throws JMSException
- {
- return createSession(transacted, acknowledgeMode, prefetch, prefetch);
- }
-
- public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
- final int prefetchHigh, final int prefetchLow) throws JMSException
- {
- synchronized (_sessionCreationLock)
- {
- checkNotClosed();
- return _delegate.createSession(transacted, acknowledgeMode, prefetchHigh, prefetchLow);
- }
- }
-
- private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
- throws AMQException, FailoverException
- {
-
- ChannelOpenBody channelOpenBody = getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
-
- // TODO: Be aware of possible changes to parameter order as versions change.
-
- _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class);
-
- BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0, prefetchHigh, false);
-
- // todo send low water mark when protocol allows.
- // todo Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId), BasicQosOkBody.class);
-
- if (transacted)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Issuing TxSelect for " + channelId);
- }
-
- TxSelectBody body = getProtocolHandler().getMethodRegistry().createTxSelectBody();
-
- // TODO: Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
- }
- }
-
- private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
- throws AMQException, FailoverException
- {
- try
- {
- createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
- }
- catch (AMQException e)
- {
- deregisterSession(channelId);
- throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e);
- }
- }
-
- public void setFailoverPolicy(FailoverPolicy policy)
- {
- _failoverPolicy = policy;
- }
-
- public FailoverPolicy getFailoverPolicy()
- {
- return _failoverPolicy;
- }
-
- /**
- * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions where specified in
- * the JMS spec
- *
- * @param transacted
- * @param acknowledgeMode
- *
- * @return QueueSession
- *
- * @throws JMSException
- */
- public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException
- {
- return new AMQQueueSessionAdaptor(createSession(transacted, acknowledgeMode));
- }
-
- /**
- * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions where specified in
- * the JMS spec
- *
- * @param transacted
- * @param acknowledgeMode
- *
- * @return TopicSession
- *
- * @throws JMSException
- */
- public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException
- {
- return new AMQTopicSessionAdaptor(createSession(transacted, acknowledgeMode));
- }
-
- public boolean channelLimitReached()
- {
- return _sessions.size() >= _maximumChannelCount;
- }
-
- public String getClientID() throws JMSException
- {
- checkNotClosed();
-
- return _clientName;
- }
-
- public void setClientID(String clientID) throws JMSException
- {
- checkNotClosed();
- // in AMQP it is not possible to change the client ID. If one is not specified
- // upon connection construction, an id is generated automatically. Therefore
- // we can always throw an exception.
- if (!Boolean.getBoolean(ClientProperties.IGNORE_SET_CLIENTID_PROP_NAME))
- {
- throw new IllegalStateException("Client name cannot be changed after being set");
- }
- else
- {
- _logger.info("Operation setClientID is ignored using ID: " + getClientID());
- }
- }
-
- public ConnectionMetaData getMetaData() throws JMSException
- {
- checkNotClosed();
-
- return _connectionMetaData;
-
- }
-
- public ExceptionListener getExceptionListener() throws JMSException
- {
- checkNotClosed();
-
- return _exceptionListener;
- }
-
- public void setExceptionListener(ExceptionListener listener) throws JMSException
- {
- checkNotClosed();
- _exceptionListener = listener;
- }
-
- /**
- * Start the connection, i.e. start flowing messages. Note that this method must be called only from a single thread
- * and is not thread safe (which is legal according to the JMS specification).
- *
- * @throws JMSException
- */
- public void start() throws JMSException
- {
- checkNotClosed();
- if (!_started)
- {
- _started = true;
- final Iterator it = _sessions.values().iterator();
- while (it.hasNext())
- {
- final AMQSession s = (AMQSession) (it.next());
- try
- {
- s.start();
- }
- catch (AMQException e)
- {
- throw new JMSAMQException(e);
- }
- }
-
- }
- }
-
- public void stop() throws JMSException
- {
- checkNotClosed();
- if (_started)
- {
- for (Iterator i = _sessions.values().iterator(); i.hasNext();)
- {
- try
- {
- ((AMQSession) i.next()).stop();
- }
- catch (AMQException e)
- {
- throw new JMSAMQException(e);
- }
- }
-
- _started = false;
- }
- }
-
- public void close() throws JMSException
- {
- close(DEFAULT_TIMEOUT);
- }
-
- public void close(long timeout) throws JMSException
- {
- close(new ArrayList<AMQSession>(_sessions.values()), timeout);
- }
-
- public void close(List<AMQSession> sessions, long timeout) throws JMSException
- {
- if (!_closed.getAndSet(true))
- {
- _closing.set(true);
- try{
- doClose(sessions, timeout);
- }finally{
- _closing.set(false);
- }
- }
- }
-
- private void doClose(List<AMQSession> sessions, long timeout) throws JMSException
- {
- synchronized (_sessionCreationLock)
- {
- if (!sessions.isEmpty())
- {
- AMQSession session = sessions.remove(0);
- synchronized (session.getMessageDeliveryLock())
- {
- doClose(sessions, timeout);
- }
- }
- else
- {
- synchronized (getFailoverMutex())
- {
- try
- {
- long startCloseTime = System.currentTimeMillis();
-
- closeAllSessions(null, timeout, startCloseTime);
-
- //This MUST occur after we have successfully closed all Channels/Sessions
- _taskPool.shutdown();
-
- if (!_taskPool.isTerminated())
- {
- try
- {
- // adjust timeout
- long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
-
- _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- _logger.info("Interrupted while shutting down connection thread pool.");
- }
- }
-
- // adjust timeout
- timeout = adjustTimeout(timeout, startCloseTime);
- _delegate.closeConnection(timeout);
-
- //If the taskpool hasn't shutdown by now then give it shutdownNow.
- // This will interupt any running tasks.
- if (!_taskPool.isTerminated())
- {
- List<Runnable> tasks = _taskPool.shutdownNow();
- for (Runnable r : tasks)
- {
- _logger.warn("Connection close forced taskpool to prevent execution:" + r);
- }
- }
- }
- catch (AMQException e)
- {
- _logger.error("error:", e);
- JMSException jmse = new JMSException("Error closing connection: " + e);
- jmse.setLinkedException(e);
- jmse.initCause(e);
- throw jmse;
- }
- }
- }
- }
- }
-
- private long adjustTimeout(long timeout, long startTime)
- {
- long now = System.currentTimeMillis();
- timeout -= now - startTime;
- if (timeout < 0)
- {
- timeout = 0;
- }
-
- return timeout;
- }
-
- /**
- * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
- * mark objects "visible" in userland as closed after failover or other significant event that impacts the
- * connection. <p/> The caller must hold the failover mutex before calling this method.
- */
- private void markAllSessionsClosed()
- {
- final LinkedList sessionCopy = new LinkedList(_sessions.values());
- final Iterator it = sessionCopy.iterator();
- while (it.hasNext())
- {
- final AMQSession session = (AMQSession) it.next();
-
- session.markClosed();
- }
-
- _sessions.clear();
- }
-
- /**
- * Close all the sessions, either due to normal connection closure or due to an error occurring.
- *
- * @param cause if not null, the error that is causing this shutdown <p/> The caller must hold the failover mutex
- * before calling this method.
- */
- private void closeAllSessions(Throwable cause, long timeout, long starttime) throws JMSException
- {
- final LinkedList sessionCopy = new LinkedList(_sessions.values());
- final Iterator it = sessionCopy.iterator();
- JMSException sessionException = null;
- while (it.hasNext())
- {
- final AMQSession session = (AMQSession) it.next();
- if (cause != null)
- {
- session.closed(cause);
- }
- else
- {
- try
- {
- if (starttime != -1)
- {
- timeout = adjustTimeout(timeout, starttime);
- }
-
- session.close(timeout);
- }
- catch (JMSException e)
- {
- _logger.error("Error closing session: " + e);
- sessionException = e;
- }
- }
- }
-
- _sessions.clear();
- if (sessionException != null)
- {
- throw sessionException;
- }
- }
-
- public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
- ServerSessionPool sessionPool, int maxMessages) throws JMSException
- {
- checkNotClosed();
-
- return null;
- }
-
- public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool,
- int maxMessages) throws JMSException
- {
- checkNotClosed();
-
- return null;
- }
-
- public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool,
- int maxMessages) throws JMSException
- {
- checkNotClosed();
-
- return null;
- }
-
- public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector,
- ServerSessionPool sessionPool, int maxMessages) throws JMSException
- {
- // TODO Auto-generated method stub
- checkNotClosed();
-
- return null;
- }
-
- public long getMaximumChannelCount() throws JMSException
- {
- checkNotClosed();
-
- return _maximumChannelCount;
- }
-
- public void setConnectionListener(ConnectionListener listener)
- {
- _connectionListener = listener;
- }
-
- public ConnectionListener getConnectionListener()
- {
- return _connectionListener;
- }
-
- public void setMaximumChannelCount(long maximumChannelCount)
- {
- _maximumChannelCount = maximumChannelCount;
- }
-
- public void setMaximumFrameSize(long frameMax)
- {
- _maximumFrameSize = frameMax;
- }
-
- public long getMaximumFrameSize()
- {
- return _maximumFrameSize;
- }
-
- public ChannelToSessionMap getSessions()
- {
- return _sessions;
- }
-
- public String getUsername()
- {
- return _username;
- }
-
- public void setUsername(String id)
- {
- _username = id;
- }
-
- public String getPassword()
- {
- return _password;
- }
-
- public String getVirtualHost()
- {
- return _virtualHost;
- }
-
- public AMQProtocolHandler getProtocolHandler()
- {
- return _protocolHandler;
- }
-
- public boolean started()
- {
- return _started;
- }
-
- public void bytesSent(long writtenBytes)
- {
- if (_connectionListener != null)
- {
- _connectionListener.bytesSent(writtenBytes);
- }
- }
-
- public void bytesReceived(long receivedBytes)
- {
- if (_connectionListener != null)
- {
- _connectionListener.bytesReceived(receivedBytes);
- }
- }
-
- /**
- * Fire the preFailover event to the registered connection listener (if any)
- *
- * @param redirect true if this is the result of a redirect request rather than a connection error
- *
- * @return true if no listener or listener does not veto change
- */
- public boolean firePreFailover(boolean redirect)
- {
- boolean proceed = true;
- if (_connectionListener != null)
- {
- proceed = _connectionListener.preFailover(redirect);
- }
-
- return proceed;
- }
-
- /**
- * Fire the preResubscribe event to the registered connection listener (if any). If the listener vetoes
- * resubscription then all the sessions are closed.
- *
- * @return true if no listener or listener does not veto resubscription.
- *
- * @throws JMSException
- */
- public boolean firePreResubscribe() throws JMSException
- {
- if (_connectionListener != null)
- {
- boolean resubscribe = _connectionListener.preResubscribe();
- if (!resubscribe)
- {
- markAllSessionsClosed();
- }
-
- return resubscribe;
- }
- else
- {
- return true;
- }
- }
-
- /** Fires a failover complete event to the registered connection listener (if any). */
- public void fireFailoverComplete()
- {
- if (_connectionListener != null)
- {
- _connectionListener.failoverComplete();
- }
- }
-
- /**
- * In order to protect the consistency of the connection and its child sessions, consumers and producers, the
- * "failover mutex" must be held when doing any operations that could be corrupted during failover.
- *
- * @return a mutex. Guaranteed never to change for the lifetime of this connection even if failover occurs.
- */
- public final Object getFailoverMutex()
- {
- return _failoverMutex;
- }
-
- public void failoverPrep()
- {
- _delegate.failoverPrep();
- }
-
- public void resubscribeSessions() throws JMSException, AMQException, FailoverException
- {
- _delegate.resubscribeSessions();
- }
-
- /**
- * If failover is taking place this will block until it has completed. If failover is not taking place it will
- * return immediately.
- *
- * @throws InterruptedException
- */
- public void blockUntilNotFailingOver() throws InterruptedException
- {
- _protocolHandler.blockUntilNotFailingOver();
- }
-
- /**
- * Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception
- * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will
- * propagate to consumers. This allows synchronous consumers to have exceptions thrown to them.
- *
- * @param cause the exception
- */
- public void exceptionReceived(Throwable cause)
- {
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("exceptionReceived done by:" + Thread.currentThread().getName(), cause);
- }
-
- final JMSException je;
- if (cause instanceof JMSException)
- {
- je = (JMSException) cause;
- }
- else
- {
- AMQConstant code = null;
-
- if (cause instanceof AMQException)
- {
- code = ((AMQException) cause).getErrorCode();
- }
-
- if (code != null)
- {
- je = new JMSException(Integer.toString(code.getCode()), "Exception thrown against " + toString() + ": " + cause);
- }
- else
- {
- //Should never get here as all AMQEs are required to have an ErrorCode!
- // Other than AMQDisconnectedEx!
-
- if (cause instanceof AMQDisconnectedException)
- {
- Exception last = _protocolHandler.getStateManager().getLastException();
- if (last != null)
- {
- _logger.info("StateManager had an exception for us to use a cause of our Disconnected Exception");
- cause = last;
- }
- }
- je = new JMSException("Exception thrown against " + toString() + ": " + cause);
- }
-
- if (cause instanceof Exception)
- {
- je.setLinkedException((Exception) cause);
- }
-
- je.initCause(cause);
- }
-
- boolean closer = false;
-
- // in the case of an IOException, MINA has closed the protocol session so we set _closed to true
- // so that any generic client code that tries to close the connection will not mess up this error
- // handling sequence
- if (cause instanceof IOException || cause instanceof AMQDisconnectedException)
- {
- // If we have an IOE/AMQDisconnect there is no connection to close on.
- _closing.set(false);
- closer = !_closed.getAndSet(true);
-
- _protocolHandler.getProtocolSession().notifyError(je);
- }
-
- // get the failover mutex before trying to close
- synchronized (getFailoverMutex())
- {
- // decide if we are going to close the session
- if (hardError(cause))
- {
- closer = (!_closed.getAndSet(true)) || closer;
- {
- _logger.info("Closing AMQConnection due to :" + cause);
- }
- }
- else
- {
- _logger.info("Not a hard-error connection not closing: " + cause);
- }
-
- // deliver the exception if there is a listener
- if (_exceptionListener != null)
- {
- _exceptionListener.onException(je);
- }
- else
- {
- _logger.error("Throwable Received but no listener set: " + cause);
- }
-
- // if we are closing the connection, close sessions first
- if (closer)
- {
- try
- {
- closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
- }
- catch (JMSException e)
- {
- _logger.error("Error closing all sessions: " + e, e);
- }
- }
- }
- }
-
- private boolean hardError(Throwable cause)
- {
- if (cause instanceof AMQException)
- {
- return ((AMQException) cause).isHardError();
- }
-
- return true;
- }
-
- void registerSession(int channelId, AMQSession session)
- {
- _sessions.put(channelId, session);
- }
-
- public void deregisterSession(int channelId)
- {
- _sessions.remove(channelId);
- }
-
- public String toString()
- {
- StringBuffer buf = new StringBuffer("AMQConnection:\n");
- if (_failoverPolicy.getCurrentBrokerDetails() == null)
- {
- buf.append("No active broker connection");
- }
- else
- {
- BrokerDetails bd = _failoverPolicy.getCurrentBrokerDetails();
- buf.append("Host: ").append(String.valueOf(bd.getHost()));
- buf.append("\nPort: ").append(String.valueOf(bd.getPort()));
- }
-
- buf.append("\nVirtual Host: ").append(String.valueOf(_virtualHost));
- buf.append("\nClient ID: ").append(String.valueOf(_clientName));
- buf.append("\nActive session count: ").append((_sessions == null) ? 0 : _sessions.size());
-
- return buf.toString();
- }
-
- public String toURL()
- {
- return _connectionURL.toString();
- }
-
- public Reference getReference() throws NamingException
- {
- return new Reference(AMQConnection.class.getName(), new StringRefAddr(AMQConnection.class.getName(), toURL()),
- AMQConnectionFactory.class.getName(), null); // factory location
- }
-
- public SSLConfiguration getSSLConfiguration()
- {
- return _sslConfiguration;
- }
-
- public AMQShortString getDefaultTopicExchangeName()
- {
- return _defaultTopicExchangeName;
- }
-
- public void setDefaultTopicExchangeName(AMQShortString defaultTopicExchangeName)
- {
- _defaultTopicExchangeName = defaultTopicExchangeName;
- }
-
- public AMQShortString getDefaultQueueExchangeName()
- {
- return _defaultQueueExchangeName;
- }
-
- public void setDefaultQueueExchangeName(AMQShortString defaultQueueExchangeName)
- {
- _defaultQueueExchangeName = defaultQueueExchangeName;
- }
-
- public AMQShortString getTemporaryTopicExchangeName()
- {
- return _temporaryTopicExchangeName;
- }
-
- public AMQShortString getTemporaryQueueExchangeName()
- {
- return _temporaryQueueExchangeName; // To change body of created methods use File | Settings | File Templates.
- }
-
- public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName)
- {
- _temporaryTopicExchangeName = temporaryTopicExchangeName;
- }
-
- public void setTemporaryQueueExchangeName(AMQShortString temporaryQueueExchangeName)
- {
- _temporaryQueueExchangeName = temporaryQueueExchangeName;
- }
-
- public void performConnectionTask(Runnable task)
- {
- _taskPool.execute(task);
- }
-
- public AMQSession getSession(int channelId)
- {
- return _sessions.get(channelId);
- }
-
- public ProtocolVersion getProtocolVersion()
- {
- return _delegate.getProtocolVersion();
- }
-
- public boolean isFailingOver()
- {
- return (_protocolHandler.getFailoverLatch() != null);
- }
-
- /**
- * Get the maximum number of messages that this connection can pre-fetch.
- *
- * @return The maximum number of messages that this connection can pre-fetch.
- */
- public long getMaxPrefetch()
- {
- return _maxPrefetch;
- }
-
- /**
- * Indicates whether persistent messages are synchronized
- *
- * @return true if persistent messages are synchronized false otherwise
- */
- public boolean getSyncPersistence()
- {
- return _syncPersistence;
- }
-
- /**
- * Indicates whether we need to sync on every message ack
- */
- public boolean getSyncAck()
- {
- return _syncAck;
- }
-
- public String getSyncPublish()
- {
- return _syncPublish;
- }
-
- public int getNextChannelID()
- {
- return _sessions.getNextChannelId();
- }
-
- public boolean isUseLegacyMapMessageFormat()
- {
- return _useLegacyMapMessageFormat;
- }
-}