summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-07-08 22:45:09 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-07-08 22:45:09 +0000
commit157e216e233eebfcd334cf68cc0f63d5d78b122f (patch)
tree30d3a8ca68e2196fd5020cdc2aa661847ad70e1c /java/client/src
parentc9e2788f5b359a119b982ff32571a6b94b61c01c (diff)
downloadqpid-python-157e216e233eebfcd334cf68cc0f63d5d78b122f.tar.gz
QPID-3269
In order to verify the uniqueness of the client ID, a dummy session is created using client ID as it's name. This prevents any other connection from using same client ID as the session creation will fail. However this verification is switched off by default in order to preserve backwards compatibility. You need to use -Dqpid.verify_client_id=true switch verification on. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1144531 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java33
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java2
6 files changed, 84 insertions, 26 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 5b6a986997..fd21b376ac 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -173,8 +173,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
//Indicates the sync publish options (persistent|all)
//By default it's async publish
private String _syncPublish = "";
-
- // Indicates whether to use the old map message format or the
+
+ // Indicates whether to use the old map message format or the
// new amqp-0-10 encoded format.
private boolean _useLegacyMapMessageFormat;
@@ -261,7 +261,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
throw new IllegalArgumentException("Connection must be specified");
}
-
+
// set this connection maxPrefetch
if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null)
{
@@ -311,7 +311,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// use the default value set for all connections
_syncPublish = System.getProperty((ClientProperties.SYNC_PUBLISH_PROP_NAME),_syncPublish);
}
-
+
if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT) != null)
{
_useLegacyMapMessageFormat = Boolean.parseBoolean(
@@ -322,16 +322,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// use the default value set for all connections
_useLegacyMapMessageFormat = Boolean.getBoolean(ClientProperties.USE_LEGACY_MAP_MESSAGE_FORMAT);
}
-
+
String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
_logger.debug("AMQP version " + amqpVersion);
-
+
_failoverPolicy = new FailoverPolicy(connectionURL, this);
BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails();
- if ("0-8".equals(amqpVersion))
+ if ("0-8".equals(amqpVersion))
{
_delegate = new AMQConnectionDelegate_8_0(this);
- }
+ }
else if ("0-9".equals(amqpVersion))
{
_delegate = new AMQConnectionDelegate_0_9(this);
@@ -418,6 +418,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
brokerDetails = _failoverPolicy.getNextBrokerDetails();
}
}
+ verifyClientID();
if (_logger.isDebugEnabled())
{
@@ -504,7 +505,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
Class partypes[] = new Class[1];
partypes[0] = AMQConnection.class;
_delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
- //Update our session to use this new protocol version
+ //Update our session to use this new protocol version
_protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion());
}
@@ -1074,7 +1075,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_username = id;
}
-
+
public String getPassword()
{
return _password;
@@ -1250,7 +1251,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
je.setLinkedException((Exception) cause);
}
-
+
je.initCause(cause);
}
@@ -1283,7 +1284,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_logger.info("Not a hard-error connection not closing: " + cause);
}
-
+
// deliver the exception if there is a listener
if (_exceptionListener != null)
{
@@ -1293,7 +1294,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_logger.error("Throwable Received but no listener set: " + cause);
}
-
+
// if we are closing the connection, close sessions first
if (closer)
{
@@ -1351,17 +1352,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * Returns connection url.
+ * Returns connection url.
* @return connection url
*/
public ConnectionURL getConnectionURL()
{
return _connectionURL;
}
-
+
/**
* Returns stringified connection url. This url is suitable only for display
- * as {@link AMQConnectionURL#toString()} converts any password to asterisks.
+ * as {@link AMQConnectionURL#toString()} converts any password to asterisks.
* @return connection url
*/
public String toURL()
@@ -1477,9 +1478,24 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _sessions.getNextChannelId();
}
-
+
public boolean isUseLegacyMapMessageFormat()
{
return _useLegacyMapMessageFormat;
}
+
+ private void verifyClientID() throws AMQException
+ {
+ if (Boolean.getBoolean(ClientProperties.QPID_VERIFY_CLIENT_ID))
+ {
+ try
+ {
+ _delegate.verifyClientID();
+ }
+ catch(JMSException e)
+ {
+ throw new AMQException(AMQConstant.ALREADY_EXISTS,"ClientID must be unique",e);
+ }
+ }
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index 9560bd5c7c..5acdaaa185 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -57,10 +57,12 @@ public interface AMQConnectionDelegate
void closeConnection(long timeout) throws JMSException, AMQException;
<T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E;
-
+
int getMaxChannelID();
int getMinChannelID();
ProtocolVersion getProtocolVersion();
+
+ void verifyClientID() throws JMSException;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index d50c9e16fe..e1643f095d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -47,6 +47,7 @@ import org.apache.qpid.transport.ConnectionException;
import org.apache.qpid.transport.ConnectionListener;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.ProtocolVersionException;
+import org.apache.qpid.transport.SessionDetachCode;
import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,7 +87,14 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
/**
* create a Session and start it if required.
*/
+
public Session createSession(boolean transacted, int acknowledgeMode, int prefetchHigh, int prefetchLow)
+ throws JMSException
+ {
+ return createSession(transacted,acknowledgeMode,prefetchHigh,prefetchLow,null);
+ }
+
+ public Session createSession(boolean transacted, int acknowledgeMode, int prefetchHigh, int prefetchLow, String name)
throws JMSException
{
_conn.checkNotClosed();
@@ -101,7 +109,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
try
{
session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted, acknowledgeMode, prefetchHigh,
- prefetchLow);
+ prefetchLow,name);
_conn.registerSession(channelId, session);
if (_conn._started)
{
@@ -449,12 +457,31 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
else
{
heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT);
- }
+ }
return heartbeat;
}
-
+
protected org.apache.qpid.transport.Connection getQpidConnection()
{
return _qpidConnection;
}
+
+ public void verifyClientID() throws JMSException
+ {
+ int prefetch = (int)_conn.getMaxPrefetch();
+ AMQSession_0_10 ssn = (AMQSession_0_10)createSession(false, 1,prefetch,prefetch,_conn.getClientID());
+ org.apache.qpid.transport.Session ssn_0_10 = ssn.getQpidSession();
+ try
+ {
+ ssn_0_10.awaitOpen();
+ }
+ catch(Exception e)
+ {
+ if (ssn_0_10.getDetachCode() != null &&
+ ssn_0_10.getDetachCode() == SessionDetachCode.SESSION_BUSY)
+ {
+ throw new JMSException("ClientID must be unique");
+ }
+ }
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index a837975304..22d1936463 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -332,4 +332,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
return ProtocolVersion.v8_0;
}
+
+ public void verifyClientID() throws JMSException
+ {
+ // NOOP
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 1ea92c67f7..75d96d67af 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -159,13 +159,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
*/
AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry,
- int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+ int defaultPrefetchHighMark, int defaultPrefetchLowMark,String name)
{
super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark,
defaultPrefetchLowMark);
_qpidConnection = qpidConnection;
- _qpidSession = _qpidConnection.createSession(1);
+ if (name == null)
+ {
+ _qpidSession = _qpidConnection.createSession(1);
+ }
+ else
+ {
+ _qpidSession = _qpidConnection.createSession(name,1);
+ }
_qpidSession.setSessionListener(this);
if (_transacted)
{
@@ -192,11 +199,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* @param qpidConnection The connection
*/
AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
- boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
+ boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow,
+ String name)
{
this(qpidConnection, con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(),
- defaultPrefetchHigh, defaultPrefetchLow);
+ defaultPrefetchHigh, defaultPrefetchLow,name);
}
private void addUnacked(int id)
diff --git a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
index 354b67cd35..6b9121811d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
@@ -52,7 +52,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
{
super(qpidConnection, con, channelId, false, // this is not a transacted session
Session.AUTO_ACKNOWLEDGE, // the ack mode is transacted
- MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow);
+ MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow,null);
createSession();
_xaResource = new XAResourceImpl(this);
}