summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/acl.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/acl.py')
-rwxr-xr-xqpid/cpp/src/tests/acl.py50
1 files changed, 50 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/acl.py b/qpid/cpp/src/tests/acl.py
index 63b21059cf..9b170c16f5 100755
--- a/qpid/cpp/src/tests/acl.py
+++ b/qpid/cpp/src/tests/acl.py
@@ -47,6 +47,19 @@ class ACLTests(TestBase010):
connection.start()
return connection.session(str(uuid4()))
+ def port_i(self):
+ return int(self.defines["port-i"])
+
+ def port_u(self):
+ return int(self.defines["port-u"])
+
+ def get_session_by_port(self, user, passwd, byPort):
+ socket = connect(self.broker.host, byPort)
+ connection = Connection (sock=socket, username=user, password=passwd,
+ mechanism="PLAIN")
+ connection.start()
+ return connection.session(str(uuid4()))
+
def reload_acl(self):
result = None
try:
@@ -1495,6 +1508,43 @@ class ACLTests(TestBase010):
self.Lookup('mrQ', 'create', 'queue', '', {"maxqueuesize":"150", "maxqueuecount":"0" }, "deny")
+ #=====================================
+ # Connection limits
+ #=====================================
+
+ def test_connection_limits(self):
+ """
+ Test ACL control connection limits
+ """
+ # By username should be able to connect twice per user
+ try:
+ sessiona1 = self.get_session_by_port('anonymous','anonymous', self.port_u())
+ sessiona2 = self.get_session_by_port('anonymous','anonymous', self.port_u())
+ except Exception, e:
+ self.fail("Could not create two connections per user: " + str(e))
+
+ # Third session should fail
+ try:
+ sessiona3 = self.get_session_by_port('anonymous','anonymous', self.port_u())
+ self.fail("Should not be able to create third connection")
+ except Exception, e:
+ result = None
+
+ # By IP address should be able to connect twice per client address
+ try:
+ sessionb1 = self.get_session_by_port('anonymous','anonymous', self.port_i())
+ sessionb2 = self.get_session_by_port('anonymous','anonymous', self.port_i())
+ except Exception, e:
+ self.fail("Could not create two connections per user: " + str(e))
+
+ # Third session should fail
+ try:
+ sessionb3 = self.get_session_by_port('anonymous','anonymous', self.port_i())
+ self.fail("Should not be able to create third connection")
+ except Exception, e:
+ result = None
+
+
class BrokerAdmin:
def __init__(self, broker, username=None, password=None):
self.connection = qpid.messaging.Connection(broker)