summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-09-13 00:26:40 +0000
committerRobert Gemmell <robbie@apache.org>2011-09-13 00:26:40 +0000
commit79f25ae18103afc16bd92abf8ed31df1992f13cf (patch)
treeb9421cd313d59d33c0092926ee98fb48dc0c5e40
parent6c83b9c3c46f31bebcdefc0359b8125391ef6c6f (diff)
downloadqpid-python-79f25ae18103afc16bd92abf8ed31df1992f13cf.tar.gz
QPID-3428: make the Java broker validate 0-10 Session names, enabling it to satisfy the clients new ClientID verification feature. Misc updates to the clients verification process.
Applied patch from Andrew MacBean git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1169982 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java53
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java17
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java1
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java28
-rw-r--r--java/test-profiles/JavaExcludes3
-rw-r--r--java/test-profiles/JavaPre010Excludes3
14 files changed, 144 insertions, 23 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index 061ebf50cd..b51e6aff1a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -69,4 +69,8 @@ public interface AMQConnectionModel extends StatisticsGatherer
* Return a {@link LogSubject} for the connection.
*/
public LogSubject getLogSubject();
+
+ public String getUserName();
+
+ public boolean isSessionNameUnique(String name);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 5332031362..bff0a79de1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -1394,4 +1394,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
{
_statisticsEnabled = enabled;
}
+
+ @Override
+ public boolean isSessionNameUnique(String name)
+ {
+ return true;
+ }
+
+ @Override
+ public String getUserName()
+ {
+ return getAuthorizedPrincipal().getName();
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index eaa11d7acb..d83013afba 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -20,14 +20,16 @@
*/
package org.apache.qpid.server.transport;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
import java.security.Principal;
import java.text.MessageFormat;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.security.auth.Subject;
@@ -385,4 +387,16 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
{
return _connectionId;
}
+
+ @Override
+ public boolean isSessionNameUnique(String name)
+ {
+ return !super.hasSessionWithName(name);
+ }
+
+ @Override
+ public String getUserName()
+ {
+ return _authorizedPrincipal.getName();
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
index b3acf48676..2de8a0425e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
@@ -32,6 +33,7 @@ import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
@@ -39,7 +41,20 @@ import org.apache.qpid.server.security.auth.AuthenticationResult;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionClose;
+import org.apache.qpid.transport.ConnectionCloseCode;
+import org.apache.qpid.transport.ConnectionOpen;
+import org.apache.qpid.transport.ConnectionOpenOk;
+import org.apache.qpid.transport.ConnectionTuneOk;
+import org.apache.qpid.transport.ServerDelegate;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionAttach;
+import org.apache.qpid.transport.SessionDelegate;
+import org.apache.qpid.transport.SessionDetach;
+import org.apache.qpid.transport.SessionDetachCode;
+import org.apache.qpid.transport.SessionDetached;
public class ServerConnectionDelegate extends ServerDelegate
{
@@ -215,4 +230,40 @@ public class ServerConnectionDelegate extends ServerDelegate
ssn.unregister(subscription_0_10);
}
}
+
+ @Override
+ public void sessionAttach(final Connection conn, final SessionAttach atc)
+ {
+ final String clientId = new String(atc.getName());
+ final Session ssn = getSession(conn, atc);
+
+ if(isSessionNameUnique(clientId,conn))
+ {
+ conn.registerSession(ssn);
+ super.sessionAttach(conn, atc);
+ }
+ else
+ {
+ ssn.invoke(new SessionDetached(atc.getName(), SessionDetachCode.SESSION_BUSY));
+ ssn.closed();
+ }
+ }
+
+ private boolean isSessionNameUnique(final String name, final Connection conn)
+ {
+ final ServerConnection sconn = (ServerConnection) conn;
+ final String userId = sconn.getUserName();
+
+ final Iterator<AMQConnectionModel> connections =
+ ((ServerConnection)conn).getVirtualHost().getConnectionRegistry().getConnections().iterator();
+ while(connections.hasNext())
+ {
+ final AMQConnectionModel amqConnectionModel = (AMQConnectionModel) connections.next();
+ if (userId.equals(amqConnectionModel.getUserName()) && !amqConnectionModel.isSessionNameUnique(name))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index d9194a3408..f15af72407 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -1451,11 +1451,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
try
{
- _delegate.verifyClientID();
+ if (!_delegate.verifyClientID())
+ {
+ throw new AMQException(AMQConstant.ALREADY_EXISTS,"ClientID must be unique");
+ }
}
catch(JMSException e)
{
- throw new AMQException(AMQConstant.ALREADY_EXISTS,"ClientID must be unique",e);
+ throw new AMQException(e.getMessage(),e);
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index 5acdaaa185..8768f93c8c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -64,5 +64,5 @@ public interface AMQConnectionDelegate
ProtocolVersion getProtocolVersion();
- void verifyClientID() throws JMSException;
+ boolean verifyClientID() throws JMSException, AMQException;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 6dfbb969e7..63342bdb26 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -49,6 +49,7 @@ import org.apache.qpid.transport.ConnectionListener;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.ProtocolVersionException;
import org.apache.qpid.transport.SessionDetachCode;
+import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -408,7 +409,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
return _qpidConnection;
}
- public void verifyClientID() throws JMSException
+ public boolean verifyClientID() throws JMSException, AMQException
{
int prefetch = (int)_conn.getMaxPrefetch();
AMQSession_0_10 ssn = (AMQSession_0_10)createSession(false, 1,prefetch,prefetch,_conn.getClientID());
@@ -417,13 +418,19 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
ssn_0_10.awaitOpen();
}
- catch(Exception e)
+ catch(SessionException se)
{
+ //if due to non unique client id for user return false, otherwise wrap and re-throw.
if (ssn_0_10.getDetachCode() != null &&
ssn_0_10.getDetachCode() == SessionDetachCode.SESSION_BUSY)
{
- throw new JMSException("ClientID must be unique");
+ return false;
+ }
+ else
+ {
+ throw new AMQException(AMQConstant.INTERNAL_ERROR, "Unexpected SessionException thrown while awaiting session opening", se);
}
}
+ return true;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 948f5178a6..92f9ebe07c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -351,8 +351,8 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
return ProtocolVersion.v8_0;
}
- public void verifyClientID() throws JMSException
+ public boolean verifyClientID() throws JMSException
{
- // NOOP
+ return true;
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index d6ddbaa061..469b007ab3 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -347,14 +347,22 @@ public class Connection extends ConnectionInvoker
}
Session ssn = _sessionFactory.newSession(this, name, expiry);
- sessions.put(name, ssn);
+ registerSession(ssn);
map(ssn);
ssn.attach();
return ssn;
}
}
- void removeSession(Session ssn)
+ public void registerSession(Session ssn)
+ {
+ synchronized (lock)
+ {
+ sessions.put(ssn.getName(),ssn);
+ }
+ }
+
+ public void removeSession(Session ssn)
{
synchronized (lock)
{
@@ -707,4 +715,9 @@ public class Connection extends ConnectionInvoker
{
return channels.values();
}
+
+ public boolean hasSessionWithName(final String name)
+ {
+ return sessions.containsKey(new Binary(name.getBytes()));
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
index f183c1e241..393301659d 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
@@ -85,7 +85,7 @@ public abstract class ConnectionDelegate
@Override public void sessionDetach(Connection conn, SessionDetach dtc)
{
Session ssn = conn.getSession(dtc.getChannel());
- ssn.sessionDetached(dtc.getName(), SessionDetachCode.NORMAL);
+ ssn.sessionDetached(dtc.getName(), ssn.getDetachCode() == null? SessionDetachCode.NORMAL: ssn.getDetachCode());
conn.unmap(ssn);
ssn.closed();
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index a838257fb6..556134f984 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -1099,6 +1099,7 @@ public class Session extends SessionInvoker
{
throw new SessionException("Timed out waiting for Session to open");
}
+ break;
case DETACHED:
case CLOSING:
case CLOSED:
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
index fe2ea6ef10..328719813a 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
@@ -292,9 +292,10 @@ public class ConnectionTest extends QpidBrokerTestCase
}
}
- public void testClientIDVerification() throws Exception
+ public void testClientIDVerificationForSameUser() throws Exception
{
- System.setProperty("qpid.verify_client_id", "true");
+ setTestSystemProperty("qpid.verify_client_id", "true");
+
BrokerDetails broker = getBroker();
try
{
@@ -302,19 +303,34 @@ public class ConnectionTest extends QpidBrokerTestCase
"client_id", "test");
Connection con2 = new AMQConnection(broker.toString(), "guest", "guest",
- "client_id", "test");
+ "client_id", "test");
fail("The client should throw a ConnectionException stating the" +
" client ID is not unique");
}
catch (Exception e)
{
- assertTrue("Incorrect exception thrown",
+ assertTrue("Incorrect exception thrown: " + e.getMessage(),
e.getMessage().contains("ClientID must be unique"));
}
- finally
+ }
+
+ public void testClientIDVerificationForDifferentUsers() throws Exception
+ {
+ setTestSystemProperty("qpid.verify_client_id", "true");
+
+ BrokerDetails broker = getBroker();
+ try
+ {
+ Connection con = new AMQConnection(broker.toString(), "guest", "guest",
+ "client_id", "test");
+
+ Connection con2 = new AMQConnection(broker.toString(), "admin", "admin",
+ "client_id", "test");
+ }
+ catch (Exception e)
{
- System.setProperty("qpid.verify_client_id", "false");
+ fail("Unexpected exception thrown, client id was not unique but usernames were different! " + e.getMessage());
}
}
diff --git a/java/test-profiles/JavaExcludes b/java/test-profiles/JavaExcludes
index dcccce7e2f..8de36cbd9a 100644
--- a/java/test-profiles/JavaExcludes
+++ b/java/test-profiles/JavaExcludes
@@ -38,9 +38,6 @@ org.apache.qpid.test.unit.client.connection.CloseAfterConnectionFailureTest#*
//QPID-1818, QPID-1821 : Client code path does not correctly restore a transacted session after failover.
org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#*
-//QPID-3428: verification of unique client id does not work
-org.apache.qpid.test.unit.client.connection.ConnectionTest#testClientIDVerification
-
//XA functionality is not fully implemented yet
org.apache.qpid.jms.xa.XAResourceTest#*
diff --git a/java/test-profiles/JavaPre010Excludes b/java/test-profiles/JavaPre010Excludes
index 07c3f15a8f..3efed3220f 100644
--- a/java/test-profiles/JavaPre010Excludes
+++ b/java/test-profiles/JavaPre010Excludes
@@ -56,3 +56,6 @@ org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#testOderingWith
// LVQ tests use new address syntax and can not be run on 0.9.1 profiles
org.apache.qpid.test.client.queue.LVQTest#*
+
+// Verification of unique client id is 0-10 specific
+org.apache.qpid.test.unit.client.connection.ConnectionTest#testClientIDVerification