diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQConnection.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQConnection.java | 272 |
1 files changed, 213 insertions, 59 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 27294562e5..0b9be5951f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -20,24 +20,21 @@ */ package org.apache.qpid.client; -import org.apache.qpid.AMQConnectionFailureException; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQProtocolException; -import org.apache.qpid.AMQUnresolvedAddressException; -import org.apache.qpid.client.failover.FailoverException; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.configuration.ClientProperties; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.*; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.jms.Connection; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.jms.FailoverPolicy; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.url.URLSyntaxException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.ConnectException; +import java.net.UnknownHostException; +import java.nio.channels.UnresolvedAddressException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.ConnectionConsumer; import javax.jms.ConnectionMetaData; @@ -56,17 +53,34 @@ import javax.naming.NamingException; import javax.naming.Reference; import javax.naming.Referenceable; import javax.naming.StringRefAddr; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.net.ConnectException; -import java.net.UnknownHostException; -import java.nio.channels.UnresolvedAddressException; -import java.text.MessageFormat; -import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.qpid.AMQConnectionFailureException; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQProtocolException; +import org.apache.qpid.AMQUnresolvedAddressException; +import org.apache.qpid.AMQDisconnectedException; +import org.apache.qpid.client.configuration.ClientProperties; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverProtectedOperation; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicQosBody; +import org.apache.qpid.framing.BasicQosOkBody; +import org.apache.qpid.framing.ChannelOpenBody; +import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.framing.TxSelectBody; +import org.apache.qpid.framing.TxSelectOkBody; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.Connection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.jms.FailoverPolicy; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.url.URLSyntaxException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { @@ -76,6 +90,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>(); private int _size = 0; private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; + private AtomicInteger _idFactory = new AtomicInteger(0); + private int _maxChannelID; + private boolean _cycledIds; public AMQSession get(int channelId) { @@ -165,11 +182,57 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _fastAccessSessions[i] = null; } } + + /* + * Synchronized on whole method so that we don't need to consider the + * increment-then-reset path in too much detail + */ + public synchronized int getNextChannelId() + { + int id = 0; + if (!_cycledIds) + { + id = _idFactory.incrementAndGet(); + if (id == _maxChannelID) + { + _cycledIds = true; + _idFactory.set(0); // Go back to the start + } + } + else + { + boolean done = false; + while (!done) + { + // Needs to work second time through + id = _idFactory.incrementAndGet(); + if (id > _maxChannelID) + { + _idFactory.set(0); + id = _idFactory.incrementAndGet(); + } + if ((id & FAST_CHANNEL_ACCESS_MASK) == 0) + { + done = (_fastAccessSessions[id] == null); + } + else + { + done = (!_slowAccessSessions.keySet().contains(id)); + } + } + } + + return id; + } + + public void setMaxChannelID(int maxChannelID) + { + _maxChannelID = maxChannelID; + } } private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); - protected AtomicInteger _idFactory = new AtomicInteger(0); /** * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be @@ -245,16 +308,22 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** Thread Pool for executing connection level processes. Such as returning bounced messages. */ private final ExecutorService _taskPool = Executors.newCachedThreadPool(); private static final long DEFAULT_TIMEOUT = 1000 * 30; - private ProtocolVersion _protocolVersion = ProtocolVersion.v0_9; // FIXME TGM, shouldn't need this protected AMQConnectionDelegate _delegate; // this connection maximum number of prefetched messages - private long _maxPrefetch; + private int _maxPrefetch; //Indicates whether persistent messages are synchronized private boolean _syncPersistence; + //Indicates whether we need to sync on every message ack + private boolean _syncAck; + + //Indicates the sync publish options (persistent|all) + //By default it's async publish + private String _syncPublish = ""; + /** * @param broker brokerdetails * @param username username @@ -335,37 +404,76 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { // set this connection maxPrefetch - if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null) + if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null) { - _maxPrefetch = Long.parseLong(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH)); + _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH)); } else { // use the defaul value set for all connections - _maxPrefetch = Long.valueOf(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, - ClientProperties.MAX_PREFETCH_DEFAULT)); + _maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, + ClientProperties.MAX_PREFETCH_DEFAULT)); } - if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null) + if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null) { - _syncPersistence = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE)); + _syncPersistence = + Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE)); + _logger.warn("sync_persistence is a deprecated property, " + + "please use sync_publish={persistent|all} instead"); } else { // use the defaul value set for all connections _syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME); + if (_syncPersistence) + { + _logger.warn("sync_persistence is a deprecated property, " + + "please use sync_publish={persistent|all} instead"); + } } - _failoverPolicy = new FailoverPolicy(connectionURL); - BrokerDetails brokerDetails = _failoverPolicy.getNextBrokerDetails(); - if (brokerDetails.getTransport().equals(BrokerDetails.VM)) + if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK) != null) + { + _syncAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK)); + } + else + { + // use the defaul value set for all connections + _syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME); + } + + if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH) != null) + { + _syncPublish = connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH); + } + else + { + // use the default value set for all connections + _syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish); + } + + String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10"); + + _failoverPolicy = new FailoverPolicy(connectionURL, this); + BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails(); + if (brokerDetails.getTransport().equals(BrokerDetails.VM) || "0-8".equals(amqpVersion)) { _delegate = new AMQConnectionDelegate_8_0(this); + } + else if ("0-9".equals(amqpVersion)) + { + _delegate = new AMQConnectionDelegate_0_9(this); + } + else if ("0-91".equals(amqpVersion) || "0-9-1".equals(amqpVersion)) + { + _delegate = new AMQConnectionDelegate_9_1(this); } else { _delegate = new AMQConnectionDelegate_0_10(this); } + _sessions.setMaxChannelID(_delegate.getMaxChannelID()); if (_logger.isInfoEnabled()) { @@ -413,7 +521,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect boolean retryAllowed = true; Exception connectionException = null; - while (!_connected && retryAllowed) + while (!_connected && retryAllowed && brokerDetails != null) { ProtocolVersion pe = null; try @@ -439,7 +547,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else if (!_connected) { - retryAllowed = _failoverPolicy.failoverAllowed(); + retryAllowed = _failoverPolicy.failoverAllowed(); brokerDetails = _failoverPolicy.getNextBrokerDetails(); } } @@ -458,7 +566,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (connectionException.getCause() != null) { message = connectionException.getCause().getMessage(); - connectionException.getCause().printStackTrace(); } else { @@ -518,6 +625,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect Class partypes[] = new Class[1]; partypes[0] = AMQConnection.class; _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this); + _sessions.setMaxChannelID(_delegate.getMaxChannelID()); } catch (ClassNotFoundException e) { @@ -591,12 +699,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public boolean attemptReconnection() { - while (_failoverPolicy.failoverAllowed()) + BrokerDetails broker = null; + while (_failoverPolicy.failoverAllowed() && (broker = _failoverPolicy.getNextBrokerDetails()) != null) { try { - makeBrokerConnection(_failoverPolicy.getNextBrokerDetails()); - + makeBrokerConnection(broker); return true; } catch (Exception e) @@ -628,6 +736,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _delegate.makeBrokerConnection(brokerDetail); } + public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E + { + return _delegate.executeRetrySupport(operation); + } + /** * Get the details of the currently active broker * @@ -653,7 +766,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException { - return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK); + return createSession(transacted, acknowledgeMode, _maxPrefetch); } public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch) @@ -871,7 +984,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { if (!_closed.getAndSet(true)) { - doClose(sessions, timeout); + _closing.set(true); + try{ + doClose(sessions, timeout); + }finally{ + _closing.set(false); + } } } @@ -917,7 +1035,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // adjust timeout timeout = adjustTimeout(timeout, startCloseTime); - _delegate.closeConneciton(timeout); + _delegate.closeConnection(timeout); //If the taskpool hasn't shutdown by now then give it shutdownNow. // This will interupt any running tasks. @@ -932,6 +1050,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } catch (AMQException e) { + _logger.error("error:", e); JMSException jmse = new JMSException("Error closing connection: " + e); jmse.setLinkedException(e); throw jmse; @@ -1191,6 +1310,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _failoverMutex; } + public void failoverPrep() + { + _delegate.failoverPrep(); + } + public void resubscribeSessions() throws JMSException, AMQException, FailoverException { _delegate.resubscribeSessions(); @@ -1245,6 +1369,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect else { //Should never get here as all AMQEs are required to have an ErrorCode! + // Other than AMQDisconnectedEx! + + if (cause instanceof AMQDisconnectedException) + { + Exception last = _protocolHandler.getStateManager().getLastException(); + if (last != null) + { + _logger.info("StateManager had an exception for us to use a cause of our Disconnected Exception"); + cause = last; + } + } je = new JMSException("Exception thrown against " + toString() + ": " + cause); } @@ -1259,8 +1394,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // in the case of an IOException, MINA has closed the protocol session so we set _closed to true // so that any generic client code that tries to close the connection will not mess up this error // handling sequence - if (cause instanceof IOException) + if (cause instanceof IOException || cause instanceof AMQDisconnectedException) { + // If we have an IOE/AMQDisconnect there is no connection to close on. + _closing.set(false); closer = !_closed.getAndSet(true); _protocolHandler.getProtocolSession().notifyError(je); @@ -1317,7 +1454,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _sessions.put(channelId, session); } - void deregisterSession(int channelId) + public void deregisterSession(int channelId) { _sessions.remove(channelId); } @@ -1411,13 +1548,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public ProtocolVersion getProtocolVersion() { - return _protocolVersion; - } - - public void setProtocolVersion(ProtocolVersion protocolVersion) - { - _protocolVersion = protocolVersion; - _protocolHandler.getProtocolSession().setProtocolVersion(protocolVersion); + return _delegate.getProtocolVersion(); } public boolean isFailingOver() @@ -1444,4 +1575,27 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _syncPersistence; } + + /** + * Indicates whether we need to sync on every message ack + */ + public boolean getSyncAck() + { + return _syncAck; + } + + public String getSyncPublish() + { + return _syncPublish; + } + + public void setIdleTimeout(long l) + { + _delegate.setIdleTimeout(l); + } + + public int getNextChannelID() + { + return _sessions.getNextChannelId(); + } } |