summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java130
1 files changed, 55 insertions, 75 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 5b6a986997..f15af72407 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -147,9 +147,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
*/
private QpidConnectionMetaData _connectionMetaData;
- /** Configuration info for SSL */
- private SSLConfiguration _sslConfiguration;
-
private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
private AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
@@ -173,8 +170,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
//Indicates the sync publish options (persistent|all)
//By default it's async publish
private String _syncPublish = "";
-
- // Indicates whether to use the old map message format or the
+
+ // Indicates whether to use the old map message format or the
// new amqp-0-10 encoded format.
private boolean _useLegacyMapMessageFormat;
@@ -194,74 +191,33 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
this(new AMQConnectionURL(
ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
- + AMQBrokerDetails.checkTransport(broker) + "'"), null);
- }
-
- /**
- * @param broker brokerdetails
- * @param username username
- * @param password password
- * @param clientName clientid
- * @param virtualHost virtualhost
- *
- * @throws AMQException
- * @throws URLSyntaxException
- */
- public AMQConnection(String broker, String username, String password, String clientName, String virtualHost,
- SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
- {
- this(new AMQConnectionURL(
- ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
- + ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
- + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig);
+ + AMQBrokerDetails.checkTransport(broker) + "'"));
}
public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost)
throws AMQException, URLSyntaxException
{
- this(host, port, false, username, password, clientName, virtualHost, null);
- }
-
- public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost,
- SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
- {
- this(host, port, false, username, password, clientName, virtualHost, sslConfig);
- }
-
- public AMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName,
- String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
- {
this(new AMQConnectionURL(
- useSSL
- ? (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
- + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
- + "'" + "," + BrokerDetails.OPTIONS_SSL + "='true'")
- : (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
- + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
- + "'" + "," + BrokerDetails.OPTIONS_SSL + "='false'")), sslConfig);
+ ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"));
}
public AMQConnection(String connection) throws AMQException, URLSyntaxException
{
- this(new AMQConnectionURL(connection), null);
- }
-
- public AMQConnection(String connection, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
- {
- this(new AMQConnectionURL(connection), sslConfig);
+ this(new AMQConnectionURL(connection));
}
/**
* @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception
* was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon.
*/
- public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
+ public AMQConnection(ConnectionURL connectionURL) throws AMQException
{
if (connectionURL == null)
{
throw new IllegalArgumentException("Connection must be specified");
}
-
+
// set this connection maxPrefetch
if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null)
{
@@ -311,7 +267,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// use the default value set for all connections
_syncPublish = System.getProperty((ClientProperties.SYNC_PUBLISH_PROP_NAME),_syncPublish);
}
-
+
if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT) != null)
{
_useLegacyMapMessageFormat = Boolean.parseBoolean(
@@ -322,16 +278,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// use the default value set for all connections
_useLegacyMapMessageFormat = Boolean.getBoolean(ClientProperties.USE_LEGACY_MAP_MESSAGE_FORMAT);
}
-
+
String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
_logger.debug("AMQP version " + amqpVersion);
-
+
_failoverPolicy = new FailoverPolicy(connectionURL, this);
BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails();
- if ("0-8".equals(amqpVersion))
+ if ("0-8".equals(amqpVersion))
{
_delegate = new AMQConnectionDelegate_8_0(this);
- }
+ }
else if ("0-9".equals(amqpVersion))
{
_delegate = new AMQConnectionDelegate_0_9(this);
@@ -350,7 +306,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_logger.info("Connection:" + connectionURL);
}
- _sslConfiguration = sslConfig;
_connectionURL = connectionURL;
_clientName = connectionURL.getClientName();
@@ -418,6 +373,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
brokerDetails = _failoverPolicy.getNextBrokerDetails();
}
}
+ verifyClientID();
if (_logger.isDebugEnabled())
{
@@ -504,7 +460,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
Class partypes[] = new Class[1];
partypes[0] = AMQConnection.class;
_delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
- //Update our session to use this new protocol version
+ //Update our session to use this new protocol version
_protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion());
}
@@ -547,7 +503,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public boolean attemptReconnection(String host, int port)
{
- BrokerDetails bd = new AMQBrokerDetails(host, port, _sslConfiguration);
+ BrokerDetails bd = new AMQBrokerDetails(host, port);
_failoverPolicy.setBroker(bd);
@@ -1074,7 +1030,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_username = id;
}
-
+
public String getPassword()
{
return _password;
@@ -1227,7 +1183,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if (code != null)
{
- je = new JMSException(Integer.toString(code.getCode()), "Exception thrown against " + toString() + ": " + cause);
+ je = new JMSException("Exception thrown against " + toString() + ": " + cause, Integer.toString(code.getCode()));
}
else
{
@@ -1250,7 +1206,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
je.setLinkedException((Exception) cause);
}
-
+
je.initCause(cause);
}
@@ -1283,7 +1239,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_logger.info("Not a hard-error connection not closing: " + cause);
}
-
+
// deliver the exception if there is a listener
if (_exceptionListener != null)
{
@@ -1293,7 +1249,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_logger.error("Throwable Received but no listener set: " + cause);
}
-
+
// if we are closing the connection, close sessions first
if (closer)
{
@@ -1351,17 +1307,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * Returns connection url.
+ * Returns connection url.
* @return connection url
*/
public ConnectionURL getConnectionURL()
{
return _connectionURL;
}
-
+
/**
* Returns stringified connection url. This url is suitable only for display
- * as {@link AMQConnectionURL#toString()} converts any password to asterisks.
+ * as {@link AMQConnectionURL#toString()} converts any password to asterisks.
* @return connection url
*/
public String toURL()
@@ -1375,11 +1331,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
AMQConnectionFactory.class.getName(), null); // factory location
}
- public SSLConfiguration getSSLConfiguration()
- {
- return _sslConfiguration;
- }
-
public AMQShortString getDefaultTopicExchangeName()
{
return _defaultTopicExchangeName;
@@ -1434,7 +1385,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _delegate.getProtocolVersion();
}
-
+
+ public String getBrokerUUID()
+ {
+ if(getProtocolVersion().equals(ProtocolVersion.v0_10))
+ {
+ return ((AMQConnectionDelegate_0_10)_delegate).getUUID();
+ }
+ else
+ {
+ return null;
+ }
+ }
public boolean isFailingOver()
{
return (_protocolHandler.getFailoverLatch() != null);
@@ -1477,9 +1439,27 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _sessions.getNextChannelId();
}
-
+
public boolean isUseLegacyMapMessageFormat()
{
return _useLegacyMapMessageFormat;
}
+
+ private void verifyClientID() throws AMQException
+ {
+ if (Boolean.getBoolean(ClientProperties.QPID_VERIFY_CLIENT_ID))
+ {
+ try
+ {
+ if (!_delegate.verifyClientID())
+ {
+ throw new AMQException(AMQConstant.ALREADY_EXISTS,"ClientID must be unique");
+ }
+ }
+ catch(JMSException e)
+ {
+ throw new AMQException(e.getMessage(),e);
+ }
+ }
+ }
}