diff options
Diffstat (limited to 'qpid/cpp/src/tests/acl.py')
-rwxr-xr-x | qpid/cpp/src/tests/acl.py | 50 |
1 files changed, 50 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/acl.py b/qpid/cpp/src/tests/acl.py index 63b21059cf..9b170c16f5 100755 --- a/qpid/cpp/src/tests/acl.py +++ b/qpid/cpp/src/tests/acl.py @@ -47,6 +47,19 @@ class ACLTests(TestBase010): connection.start() return connection.session(str(uuid4())) + def port_i(self): + return int(self.defines["port-i"]) + + def port_u(self): + return int(self.defines["port-u"]) + + def get_session_by_port(self, user, passwd, byPort): + socket = connect(self.broker.host, byPort) + connection = Connection (sock=socket, username=user, password=passwd, + mechanism="PLAIN") + connection.start() + return connection.session(str(uuid4())) + def reload_acl(self): result = None try: @@ -1495,6 +1508,43 @@ class ACLTests(TestBase010): self.Lookup('mrQ', 'create', 'queue', '', {"maxqueuesize":"150", "maxqueuecount":"0" }, "deny") + #===================================== + # Connection limits + #===================================== + + def test_connection_limits(self): + """ + Test ACL control connection limits + """ + # By username should be able to connect twice per user + try: + sessiona1 = self.get_session_by_port('anonymous','anonymous', self.port_u()) + sessiona2 = self.get_session_by_port('anonymous','anonymous', self.port_u()) + except Exception, e: + self.fail("Could not create two connections per user: " + str(e)) + + # Third session should fail + try: + sessiona3 = self.get_session_by_port('anonymous','anonymous', self.port_u()) + self.fail("Should not be able to create third connection") + except Exception, e: + result = None + + # By IP address should be able to connect twice per client address + try: + sessionb1 = self.get_session_by_port('anonymous','anonymous', self.port_i()) + sessionb2 = self.get_session_by_port('anonymous','anonymous', self.port_i()) + except Exception, e: + self.fail("Could not create two connections per user: " + str(e)) + + # Third session should fail + try: + sessionb3 = self.get_session_by_port('anonymous','anonymous', self.port_i()) + self.fail("Should not be able to create third connection") + except Exception, e: + result = None + + class BrokerAdmin: def __init__(self, broker, username=None, password=None): self.connection = qpid.messaging.Connection(broker) |