summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQConnection.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java272
1 files changed, 213 insertions, 59 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 27294562e5..0b9be5951f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -20,24 +20,21 @@
*/
package org.apache.qpid.client;
-import org.apache.qpid.AMQConnectionFailureException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQProtocolException;
-import org.apache.qpid.AMQUnresolvedAddressException;
-import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.configuration.ClientProperties;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.Connection;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.jms.FailoverPolicy;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.url.URLSyntaxException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.ConnectException;
+import java.net.UnknownHostException;
+import java.nio.channels.UnresolvedAddressException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
@@ -56,17 +53,34 @@ import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.net.ConnectException;
-import java.net.UnknownHostException;
-import java.nio.channels.UnresolvedAddressException;
-import java.text.MessageFormat;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.AMQConnectionFailureException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQProtocolException;
+import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.AMQDisconnectedException;
+import org.apache.qpid.client.configuration.ClientProperties;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicQosBody;
+import org.apache.qpid.framing.BasicQosOkBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.TxSelectBody;
+import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.Connection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.FailoverPolicy;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.url.URLSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
@@ -76,6 +90,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
private int _size = 0;
private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+ private AtomicInteger _idFactory = new AtomicInteger(0);
+ private int _maxChannelID;
+ private boolean _cycledIds;
public AMQSession get(int channelId)
{
@@ -165,11 +182,57 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_fastAccessSessions[i] = null;
}
}
+
+ /*
+ * Synchronized on whole method so that we don't need to consider the
+ * increment-then-reset path in too much detail
+ */
+ public synchronized int getNextChannelId()
+ {
+ int id = 0;
+ if (!_cycledIds)
+ {
+ id = _idFactory.incrementAndGet();
+ if (id == _maxChannelID)
+ {
+ _cycledIds = true;
+ _idFactory.set(0); // Go back to the start
+ }
+ }
+ else
+ {
+ boolean done = false;
+ while (!done)
+ {
+ // Needs to work second time through
+ id = _idFactory.incrementAndGet();
+ if (id > _maxChannelID)
+ {
+ _idFactory.set(0);
+ id = _idFactory.incrementAndGet();
+ }
+ if ((id & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ done = (_fastAccessSessions[id] == null);
+ }
+ else
+ {
+ done = (!_slowAccessSessions.keySet().contains(id));
+ }
+ }
+ }
+
+ return id;
+ }
+
+ public void setMaxChannelID(int maxChannelID)
+ {
+ _maxChannelID = maxChannelID;
+ }
}
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
- protected AtomicInteger _idFactory = new AtomicInteger(0);
/**
* This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
@@ -245,16 +308,22 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/** Thread Pool for executing connection level processes. Such as returning bounced messages. */
private final ExecutorService _taskPool = Executors.newCachedThreadPool();
private static final long DEFAULT_TIMEOUT = 1000 * 30;
- private ProtocolVersion _protocolVersion = ProtocolVersion.v0_9; // FIXME TGM, shouldn't need this
protected AMQConnectionDelegate _delegate;
// this connection maximum number of prefetched messages
- private long _maxPrefetch;
+ private int _maxPrefetch;
//Indicates whether persistent messages are synchronized
private boolean _syncPersistence;
+ //Indicates whether we need to sync on every message ack
+ private boolean _syncAck;
+
+ //Indicates the sync publish options (persistent|all)
+ //By default it's async publish
+ private String _syncPublish = "";
+
/**
* @param broker brokerdetails
* @param username username
@@ -335,37 +404,76 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
// set this connection maxPrefetch
- if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null)
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null)
{
- _maxPrefetch = Long.parseLong(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH));
+ _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH));
}
else
{
// use the defaul value set for all connections
- _maxPrefetch = Long.valueOf(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
- ClientProperties.MAX_PREFETCH_DEFAULT));
+ _maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
+ ClientProperties.MAX_PREFETCH_DEFAULT));
}
- if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null)
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null)
{
- _syncPersistence = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE));
+ _syncPersistence =
+ Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE));
+ _logger.warn("sync_persistence is a deprecated property, " +
+ "please use sync_publish={persistent|all} instead");
}
else
{
// use the defaul value set for all connections
_syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME);
+ if (_syncPersistence)
+ {
+ _logger.warn("sync_persistence is a deprecated property, " +
+ "please use sync_publish={persistent|all} instead");
+ }
}
- _failoverPolicy = new FailoverPolicy(connectionURL);
- BrokerDetails brokerDetails = _failoverPolicy.getNextBrokerDetails();
- if (brokerDetails.getTransport().equals(BrokerDetails.VM))
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK) != null)
+ {
+ _syncAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK));
+ }
+ else
+ {
+ // use the defaul value set for all connections
+ _syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME);
+ }
+
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH) != null)
+ {
+ _syncPublish = connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH);
+ }
+ else
+ {
+ // use the default value set for all connections
+ _syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish);
+ }
+
+ String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
+
+ _failoverPolicy = new FailoverPolicy(connectionURL, this);
+ BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails();
+ if (brokerDetails.getTransport().equals(BrokerDetails.VM) || "0-8".equals(amqpVersion))
{
_delegate = new AMQConnectionDelegate_8_0(this);
+ }
+ else if ("0-9".equals(amqpVersion))
+ {
+ _delegate = new AMQConnectionDelegate_0_9(this);
+ }
+ else if ("0-91".equals(amqpVersion) || "0-9-1".equals(amqpVersion))
+ {
+ _delegate = new AMQConnectionDelegate_9_1(this);
}
else
{
_delegate = new AMQConnectionDelegate_0_10(this);
}
+ _sessions.setMaxChannelID(_delegate.getMaxChannelID());
if (_logger.isInfoEnabled())
{
@@ -413,7 +521,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
boolean retryAllowed = true;
Exception connectionException = null;
- while (!_connected && retryAllowed)
+ while (!_connected && retryAllowed && brokerDetails != null)
{
ProtocolVersion pe = null;
try
@@ -439,7 +547,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
else if (!_connected)
{
- retryAllowed = _failoverPolicy.failoverAllowed();
+ retryAllowed = _failoverPolicy.failoverAllowed();
brokerDetails = _failoverPolicy.getNextBrokerDetails();
}
}
@@ -458,7 +566,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if (connectionException.getCause() != null)
{
message = connectionException.getCause().getMessage();
- connectionException.getCause().printStackTrace();
}
else
{
@@ -518,6 +625,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
Class partypes[] = new Class[1];
partypes[0] = AMQConnection.class;
_delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
+ _sessions.setMaxChannelID(_delegate.getMaxChannelID());
}
catch (ClassNotFoundException e)
{
@@ -591,12 +699,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public boolean attemptReconnection()
{
- while (_failoverPolicy.failoverAllowed())
+ BrokerDetails broker = null;
+ while (_failoverPolicy.failoverAllowed() && (broker = _failoverPolicy.getNextBrokerDetails()) != null)
{
try
{
- makeBrokerConnection(_failoverPolicy.getNextBrokerDetails());
-
+ makeBrokerConnection(broker);
return true;
}
catch (Exception e)
@@ -628,6 +736,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _delegate.makeBrokerConnection(brokerDetail);
}
+ public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
+ {
+ return _delegate.executeRetrySupport(operation);
+ }
+
/**
* Get the details of the currently active broker
*
@@ -653,7 +766,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
{
- return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK);
+ return createSession(transacted, acknowledgeMode, _maxPrefetch);
}
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
@@ -871,7 +984,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
if (!_closed.getAndSet(true))
{
- doClose(sessions, timeout);
+ _closing.set(true);
+ try{
+ doClose(sessions, timeout);
+ }finally{
+ _closing.set(false);
+ }
}
}
@@ -917,7 +1035,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// adjust timeout
timeout = adjustTimeout(timeout, startCloseTime);
- _delegate.closeConneciton(timeout);
+ _delegate.closeConnection(timeout);
//If the taskpool hasn't shutdown by now then give it shutdownNow.
// This will interupt any running tasks.
@@ -932,6 +1050,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
catch (AMQException e)
{
+ _logger.error("error:", e);
JMSException jmse = new JMSException("Error closing connection: " + e);
jmse.setLinkedException(e);
throw jmse;
@@ -1191,6 +1310,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _failoverMutex;
}
+ public void failoverPrep()
+ {
+ _delegate.failoverPrep();
+ }
+
public void resubscribeSessions() throws JMSException, AMQException, FailoverException
{
_delegate.resubscribeSessions();
@@ -1245,6 +1369,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
else
{
//Should never get here as all AMQEs are required to have an ErrorCode!
+ // Other than AMQDisconnectedEx!
+
+ if (cause instanceof AMQDisconnectedException)
+ {
+ Exception last = _protocolHandler.getStateManager().getLastException();
+ if (last != null)
+ {
+ _logger.info("StateManager had an exception for us to use a cause of our Disconnected Exception");
+ cause = last;
+ }
+ }
je = new JMSException("Exception thrown against " + toString() + ": " + cause);
}
@@ -1259,8 +1394,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// in the case of an IOException, MINA has closed the protocol session so we set _closed to true
// so that any generic client code that tries to close the connection will not mess up this error
// handling sequence
- if (cause instanceof IOException)
+ if (cause instanceof IOException || cause instanceof AMQDisconnectedException)
{
+ // If we have an IOE/AMQDisconnect there is no connection to close on.
+ _closing.set(false);
closer = !_closed.getAndSet(true);
_protocolHandler.getProtocolSession().notifyError(je);
@@ -1317,7 +1454,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_sessions.put(channelId, session);
}
- void deregisterSession(int channelId)
+ public void deregisterSession(int channelId)
{
_sessions.remove(channelId);
}
@@ -1411,13 +1548,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public ProtocolVersion getProtocolVersion()
{
- return _protocolVersion;
- }
-
- public void setProtocolVersion(ProtocolVersion protocolVersion)
- {
- _protocolVersion = protocolVersion;
- _protocolHandler.getProtocolSession().setProtocolVersion(protocolVersion);
+ return _delegate.getProtocolVersion();
}
public boolean isFailingOver()
@@ -1444,4 +1575,27 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _syncPersistence;
}
+
+ /**
+ * Indicates whether we need to sync on every message ack
+ */
+ public boolean getSyncAck()
+ {
+ return _syncAck;
+ }
+
+ public String getSyncPublish()
+ {
+ return _syncPublish;
+ }
+
+ public void setIdleTimeout(long l)
+ {
+ _delegate.setIdleTimeout(l);
+ }
+
+ public int getNextChannelID()
+ {
+ return _sessions.getNextChannelId();
+ }
}