diff options
author | Robert Gemmell <robbie@apache.org> | 2011-09-13 00:26:40 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2011-09-13 00:26:40 +0000 |
commit | 79f25ae18103afc16bd92abf8ed31df1992f13cf (patch) | |
tree | b9421cd313d59d33c0092926ee98fb48dc0c5e40 | |
parent | 6c83b9c3c46f31bebcdefc0359b8125391ef6c6f (diff) | |
download | qpid-python-79f25ae18103afc16bd92abf8ed31df1992f13cf.tar.gz |
QPID-3428: make the Java broker validate 0-10 Session names, enabling it to satisfy the clients new ClientID verification feature. Misc updates to the clients verification process.
Applied patch from Andrew MacBean
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1169982 13f79535-47bb-0310-9956-ffa450edef68
14 files changed, 144 insertions, 23 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index 061ebf50cd..b51e6aff1a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -69,4 +69,8 @@ public interface AMQConnectionModel extends StatisticsGatherer * Return a {@link LogSubject} for the connection. */ public LogSubject getLogSubject(); + + public String getUserName(); + + public boolean isSessionNameUnique(String name); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 5332031362..bff0a79de1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -1394,4 +1394,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr { _statisticsEnabled = enabled; } + + @Override + public boolean isSessionNameUnique(String name) + { + return true; + } + + @Override + public String getUserName() + { + return getAuthorizedPrincipal().getName(); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index eaa11d7acb..d83013afba 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -20,14 +20,16 @@ */ package org.apache.qpid.server.transport; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT; import java.security.Principal; import java.text.MessageFormat; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import javax.security.auth.Subject; @@ -385,4 +387,16 @@ public class ServerConnection extends Connection implements AMQConnectionModel, { return _connectionId; } + + @Override + public boolean isSessionNameUnique(String name) + { + return !super.hasSessionWithName(name); + } + + @Override + public String getUserName() + { + return _authorizedPrincipal.getName(); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index b3acf48676..2de8a0425e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.StringTokenizer; @@ -32,6 +33,7 @@ import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; @@ -39,7 +41,20 @@ import org.apache.qpid.server.security.auth.AuthenticationResult; import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.transport.*; +import org.apache.qpid.transport.Binary; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionClose; +import org.apache.qpid.transport.ConnectionCloseCode; +import org.apache.qpid.transport.ConnectionOpen; +import org.apache.qpid.transport.ConnectionOpenOk; +import org.apache.qpid.transport.ConnectionTuneOk; +import org.apache.qpid.transport.ServerDelegate; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionAttach; +import org.apache.qpid.transport.SessionDelegate; +import org.apache.qpid.transport.SessionDetach; +import org.apache.qpid.transport.SessionDetachCode; +import org.apache.qpid.transport.SessionDetached; public class ServerConnectionDelegate extends ServerDelegate { @@ -215,4 +230,40 @@ public class ServerConnectionDelegate extends ServerDelegate ssn.unregister(subscription_0_10); } } + + @Override + public void sessionAttach(final Connection conn, final SessionAttach atc) + { + final String clientId = new String(atc.getName()); + final Session ssn = getSession(conn, atc); + + if(isSessionNameUnique(clientId,conn)) + { + conn.registerSession(ssn); + super.sessionAttach(conn, atc); + } + else + { + ssn.invoke(new SessionDetached(atc.getName(), SessionDetachCode.SESSION_BUSY)); + ssn.closed(); + } + } + + private boolean isSessionNameUnique(final String name, final Connection conn) + { + final ServerConnection sconn = (ServerConnection) conn; + final String userId = sconn.getUserName(); + + final Iterator<AMQConnectionModel> connections = + ((ServerConnection)conn).getVirtualHost().getConnectionRegistry().getConnections().iterator(); + while(connections.hasNext()) + { + final AMQConnectionModel amqConnectionModel = (AMQConnectionModel) connections.next(); + if (userId.equals(amqConnectionModel.getUserName()) && !amqConnectionModel.isSessionNameUnique(name)) + { + return false; + } + } + return true; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index d9194a3408..f15af72407 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1451,11 +1451,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { try { - _delegate.verifyClientID(); + if (!_delegate.verifyClientID()) + { + throw new AMQException(AMQConstant.ALREADY_EXISTS,"ClientID must be unique"); + } } catch(JMSException e) { - throw new AMQException(AMQConstant.ALREADY_EXISTS,"ClientID must be unique",e); + throw new AMQException(e.getMessage(),e); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 5acdaaa185..8768f93c8c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -64,5 +64,5 @@ public interface AMQConnectionDelegate ProtocolVersion getProtocolVersion(); - void verifyClientID() throws JMSException; + boolean verifyClientID() throws JMSException, AMQException; } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 6dfbb969e7..63342bdb26 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -49,6 +49,7 @@ import org.apache.qpid.transport.ConnectionListener; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.ProtocolVersionException; import org.apache.qpid.transport.SessionDetachCode; +import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -408,7 +409,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec return _qpidConnection; } - public void verifyClientID() throws JMSException + public boolean verifyClientID() throws JMSException, AMQException { int prefetch = (int)_conn.getMaxPrefetch(); AMQSession_0_10 ssn = (AMQSession_0_10)createSession(false, 1,prefetch,prefetch,_conn.getClientID()); @@ -417,13 +418,19 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { ssn_0_10.awaitOpen(); } - catch(Exception e) + catch(SessionException se) { + //if due to non unique client id for user return false, otherwise wrap and re-throw. if (ssn_0_10.getDetachCode() != null && ssn_0_10.getDetachCode() == SessionDetachCode.SESSION_BUSY) { - throw new JMSException("ClientID must be unique"); + return false; + } + else + { + throw new AMQException(AMQConstant.INTERNAL_ERROR, "Unexpected SessionException thrown while awaiting session opening", se); } } + return true; } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 948f5178a6..92f9ebe07c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -351,8 +351,8 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate return ProtocolVersion.v8_0; } - public void verifyClientID() throws JMSException + public boolean verifyClientID() throws JMSException { - // NOOP + return true; } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index d6ddbaa061..469b007ab3 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -347,14 +347,22 @@ public class Connection extends ConnectionInvoker } Session ssn = _sessionFactory.newSession(this, name, expiry); - sessions.put(name, ssn); + registerSession(ssn); map(ssn); ssn.attach(); return ssn; } } - void removeSession(Session ssn) + public void registerSession(Session ssn) + { + synchronized (lock) + { + sessions.put(ssn.getName(),ssn); + } + } + + public void removeSession(Session ssn) { synchronized (lock) { @@ -707,4 +715,9 @@ public class Connection extends ConnectionInvoker { return channels.values(); } + + public boolean hasSessionWithName(final String name) + { + return sessions.containsKey(new Binary(name.getBytes())); + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java index f183c1e241..393301659d 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java @@ -85,7 +85,7 @@ public abstract class ConnectionDelegate @Override public void sessionDetach(Connection conn, SessionDetach dtc) { Session ssn = conn.getSession(dtc.getChannel()); - ssn.sessionDetached(dtc.getName(), SessionDetachCode.NORMAL); + ssn.sessionDetached(dtc.getName(), ssn.getDetachCode() == null? SessionDetachCode.NORMAL: ssn.getDetachCode()); conn.unmap(ssn); ssn.closed(); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index a838257fb6..556134f984 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -1099,6 +1099,7 @@ public class Session extends SessionInvoker { throw new SessionException("Timed out waiting for Session to open"); } + break; case DETACHED: case CLOSING: case CLOSED: diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index fe2ea6ef10..328719813a 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -292,9 +292,10 @@ public class ConnectionTest extends QpidBrokerTestCase } } - public void testClientIDVerification() throws Exception + public void testClientIDVerificationForSameUser() throws Exception { - System.setProperty("qpid.verify_client_id", "true"); + setTestSystemProperty("qpid.verify_client_id", "true"); + BrokerDetails broker = getBroker(); try { @@ -302,19 +303,34 @@ public class ConnectionTest extends QpidBrokerTestCase "client_id", "test"); Connection con2 = new AMQConnection(broker.toString(), "guest", "guest", - "client_id", "test"); + "client_id", "test"); fail("The client should throw a ConnectionException stating the" + " client ID is not unique"); } catch (Exception e) { - assertTrue("Incorrect exception thrown", + assertTrue("Incorrect exception thrown: " + e.getMessage(), e.getMessage().contains("ClientID must be unique")); } - finally + } + + public void testClientIDVerificationForDifferentUsers() throws Exception + { + setTestSystemProperty("qpid.verify_client_id", "true"); + + BrokerDetails broker = getBroker(); + try + { + Connection con = new AMQConnection(broker.toString(), "guest", "guest", + "client_id", "test"); + + Connection con2 = new AMQConnection(broker.toString(), "admin", "admin", + "client_id", "test"); + } + catch (Exception e) { - System.setProperty("qpid.verify_client_id", "false"); + fail("Unexpected exception thrown, client id was not unique but usernames were different! " + e.getMessage()); } } diff --git a/java/test-profiles/JavaExcludes b/java/test-profiles/JavaExcludes index dcccce7e2f..8de36cbd9a 100644 --- a/java/test-profiles/JavaExcludes +++ b/java/test-profiles/JavaExcludes @@ -38,9 +38,6 @@ org.apache.qpid.test.unit.client.connection.CloseAfterConnectionFailureTest#* //QPID-1818, QPID-1821 : Client code path does not correctly restore a transacted session after failover. org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#* -//QPID-3428: verification of unique client id does not work -org.apache.qpid.test.unit.client.connection.ConnectionTest#testClientIDVerification - //XA functionality is not fully implemented yet org.apache.qpid.jms.xa.XAResourceTest#* diff --git a/java/test-profiles/JavaPre010Excludes b/java/test-profiles/JavaPre010Excludes index 07c3f15a8f..3efed3220f 100644 --- a/java/test-profiles/JavaPre010Excludes +++ b/java/test-profiles/JavaPre010Excludes @@ -56,3 +56,6 @@ org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#testOderingWith // LVQ tests use new address syntax and can not be run on 0.9.1 profiles org.apache.qpid.test.client.queue.LVQTest#* + +// Verification of unique client id is 0-10 specific +org.apache.qpid.test.unit.client.connection.ConnectionTest#testClientIDVerification |