summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/queue_redirect.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/queue_redirect.py')
-rw-r--r--qpid/cpp/src/tests/queue_redirect.py317
1 files changed, 317 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/queue_redirect.py b/qpid/cpp/src/tests/queue_redirect.py
new file mode 100644
index 0000000000..8a7b4c244b
--- /dev/null
+++ b/qpid/cpp/src/tests/queue_redirect.py
@@ -0,0 +1,317 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+import qpid
+from qpid.util import connect
+from qpid.connection import Connection
+from qpid.datatypes import uuid4
+from qpid.testlib import TestBase010
+from qmf.console import Session
+from qpid.datatypes import Message
+import qpid.messaging
+from time import sleep
+from os import environ, popen
+
+class ACLFile:
+ def __init__(self, policy='data_dir/policy.acl'):
+ self.f = open(policy,'w')
+
+ def write(self,line):
+ self.f.write(line)
+
+ def close(self):
+ self.f.close()
+
+class QueueredirectTests(TestBase010):
+
+ def get_session(self, user, passwd):
+ socket = connect(self.broker.host, self.broker.port)
+ connection = Connection (sock=socket, username=user, password=passwd,
+ mechanism="PLAIN")
+ connection.start()
+ return connection.session(str(uuid4()))
+
+ def reload_acl(self):
+ result = None
+ try:
+ self.broker_access.reloadAclFile()
+ except Exception, e:
+ result = str(e)
+ return result
+
+ def get_acl_file(self):
+ return ACLFile(self.config.defines.get("policy-file", "data_dir/policy.acl"))
+
+ def setUp(self):
+ aclf = self.get_acl_file()
+ aclf.write('acl allow all all\n')
+ aclf.close()
+ TestBase010.setUp(self)
+ self.startBrokerAccess()
+ self.reload_acl()
+
+ def tearDown(self):
+ aclf = self.get_acl_file()
+ aclf.write('acl allow all all\n')
+ aclf.close()
+ self.reload_acl()
+ TestBase010.tearDown(self)
+
+
+ def redirect(self, srcQueue, tgtQueue, expectPass, failMessage):
+ try:
+ result = {}
+ result = self.broker_access.Redirect(srcQueue, tgtQueue)
+ if not expectPass:
+ self.fail("src:" + srcQueue + ", tgt:" + tgtQueue + " - " + failMessage)
+ except Exception, e:
+ if expectPass:
+ self.fail("src:" + srcQueue + ", tgt:" + tgtQueue + " - " + failMessage)
+
+ def create_queue(self, session, name, autoDelete):
+ try:
+ session.queue_declare(queue=name, auto_delete=autoDelete)
+ except Exception, e:
+ self.fail("Should allow create queue " + name)
+
+ def _start_qpid_send(self, queue, count, content="X", capacity=100):
+ """ Use the qpid-send client to generate traffic to a queue.
+ """
+ command = "qpid-send" + \
+ " -b" + " %s:%s" % (self.broker.host, self.broker.port) \
+ + " -a " + str(queue) \
+ + " --messages " + str(count) \
+ + " --content-string " + str(content) \
+ + " --capacity " + str(capacity)
+ return popen(command)
+
+ def _start_qpid_receive(self, queue, count, timeout=5):
+ """ Use the qpid-receive client to consume from a queue.
+ Note well: prints one line of text to stdout for each consumed msg.
+ """
+ command = "qpid-receive" + \
+ " -b " + "%s:%s" % (self.broker.host, self.broker.port) \
+ + " -a " + str(queue) \
+ + " --messages " + str(count) \
+ + " --timeout " + str(timeout) \
+ + " --print-content yes"
+ return popen(command)
+
+
+
+ #=====================================
+ # QT queue tests
+ #=====================================
+
+ def test_010_deny_backing_up_a_nonexistant_queue(self):
+ session = self.get_session('bob','bob')
+ self.redirect("A010", "A010", False, "Should not allow redirect to non-existent queue A010")
+ session.close()
+
+ def test_020_deny_destroy_redirect(self):
+ session = self.get_session('bob','bob')
+ self.create_queue(session, "A020", False)
+ self.redirect("A020", "", False, "Should not allow destroying redirect")
+ session.close()
+
+ def test_030_deny_redirecting_to_nonexistent_queue(self):
+ session = self.get_session('bob','bob')
+ self.create_queue(session, "A030", False)
+ self.redirect("A030", "Axxx", False, "Should not allow redirect with non-existent queue Axxx")
+ session.close()
+
+ def test_040_deny_queue_redirecting_to_itself(self):
+ session = self.get_session('bob','bob')
+ self.create_queue(session, "A040", False)
+ self.redirect("A040", "A040", False, "Should not allow redirect with itself")
+ session.close()
+
+ def test_050_deny_redirecting_autodelete_queue(self):
+ session = self.get_session('bob','bob')
+ self.create_queue(session, "A050", True)
+ self.create_queue(session, "B050", False)
+ self.redirect("A050", "B050", False, "Should not allow redirect with autodelete source queue")
+ self.redirect("B050", "A050", False, "Should not allow redirect with autodelete target queue")
+ session.close()
+
+ def test_100_create_redirect_queue_pair(self):
+ session = self.get_session('bob','bob')
+ self.create_queue(session, "A100", False)
+ self.create_queue(session, "B100", False)
+ self.redirect("A100", "B100", True, "Should allow redirect")
+ session.close()
+
+ def test_110_deny_adding_second_redirect_to_queue(self):
+ session = self.get_session('bob','bob')
+ self.create_queue(session, "A110", False)
+ self.create_queue(session, "B110", False)
+ self.create_queue(session, "C110", False)
+ self.redirect("A110", "B110", True, "Should allow redirect")
+ self.redirect("A110", "C110", False, "Should deny second redirect")
+ self.redirect("C110", "B110", False, "Should deny second redirect")
+ session.close()
+
+ def test_120_verify_redirect_to_target(self):
+ session = self.get_session('bob','bob')
+ self.create_queue(session, "A120", False)
+ self.create_queue(session, "B120", False)
+
+ # Send messages to original queue
+ sndr1 = self._start_qpid_send("A120", count=5, content="A120-before-rebind");
+ sndr1.close()
+
+ # redirect
+ self.redirect("A120", "B120", True, "Should allow redirect")
+
+ # Send messages to original queue
+ sndr2 = self._start_qpid_send("A120", count=3, content="A120-after-rebind");
+ sndr2.close()
+
+ # drain the queue
+ rcvr = self._start_qpid_receive("A120",
+ count=5)
+ count = 0;
+ x = rcvr.readline() # prints a line for each received msg
+ while x:
+# print "Read from A120 " + x
+ count += 1;
+ x = rcvr.readline()
+
+ self.assertEqual(count, 5)
+
+ # drain the queue
+ rcvrB = self._start_qpid_receive("B120",
+ count=3)
+ count = 0;
+ x = rcvrB.readline() # prints a line for each received msg
+ while x:
+# print "Read from B120 " + x
+ count += 1;
+ x = rcvrB.readline()
+
+ self.assertEqual(count, 3)
+
+ ###session.close()
+
+ def test_140_verify_redirect_to_source(self):
+ session = self.get_session('bob','bob')
+ self.create_queue(session, "A140", False)
+ self.create_queue(session, "B140", False)
+
+ # Send messages to target queue - these go onto B
+ sndr1 = self._start_qpid_send("B140", count=5, content="B140-before-rebind");
+ sndr1.close()
+
+ # redirect
+ self.redirect("A140", "B140", True, "Should allow redirect")
+
+ # Send messages to target queue - these go onto A
+ sndr2 = self._start_qpid_send("B140", count=3, content="B140-after-rebind");
+ sndr2.close()
+
+ # drain the queue
+ rcvr = self._start_qpid_receive("B140", count=5)
+ count = 0;
+ x = rcvr.readline() # prints a line for each received msg
+ while x:
+ # print "Read from B140 " + x
+ count += 1;
+ x = rcvr.readline()
+
+ self.assertEqual(count, 5)
+
+ # drain the queue
+ rcvrB = self._start_qpid_receive("A140", count=3)
+ count = 0;
+ x = rcvrB.readline() # prints a line for each received msg
+ while x:
+ # print "Read from A140 " + x
+ count += 1;
+ x = rcvrB.readline()
+
+ self.assertEqual(count, 3)
+
+ ###session.close()
+
+ def test_150_queue_deletion_destroys_redirect(self):
+ session = self.get_session('bob','bob')
+ self.create_queue(session, "A150", False)
+ self.create_queue(session, "B150", False)
+ self.create_queue(session, "C150", False)
+
+ # redirect
+ self.redirect("A150", "B150", True, "Should allow redirect")
+
+ self.redirect("A150", "C150", False, "A is already redirected")
+
+ alice = BrokerAdmin(self.config.broker, "bob", "bob")
+ alice.delete_queue("B150") #should pass
+
+ self.redirect("A150", "C150", True, "Should allow redirect")
+ session.close()
+
+##############################################################################################
+class BrokerAdmin:
+ def __init__(self, broker, username=None, password=None):
+ self.connection = qpid.messaging.Connection(broker)
+ if username:
+ self.connection.username = username
+ self.connection.password = password
+ self.connection.sasl_mechanisms = "PLAIN"
+ self.connection.open()
+ self.session = self.connection.session()
+ self.sender = self.session.sender("qmf.default.direct/broker")
+ self.reply_to = "responses-#; {create:always}"
+ self.receiver = self.session.receiver(self.reply_to)
+
+ def invoke(self, method, arguments):
+ content = {
+ "_object_id": {"_object_name": "org.apache.qpid.broker:broker:amqp-broker"},
+ "_method_name": method,
+ "_arguments": arguments
+ }
+ request = qpid.messaging.Message(reply_to=self.reply_to, content=content)
+ request.properties["x-amqp-0-10.app-id"] = "qmf2"
+ request.properties["qmf.opcode"] = "_method_request"
+ self.sender.send(request)
+ response = self.receiver.fetch()
+ self.session.acknowledge()
+ if response.properties['x-amqp-0-10.app-id'] == 'qmf2':
+ if response.properties['qmf.opcode'] == '_method_response':
+ return response.content['_arguments']
+ elif response.properties['qmf.opcode'] == '_exception':
+ raise Exception(response.content['_values'])
+ else: raise Exception("Invalid response received, unexpected opcode: %s" % response.properties['qmf.opcode'])
+ else: raise Exception("Invalid response received, not a qmfv2 method: %s" % response.properties['x-amqp-0-10.app-id'])
+
+ def create_exchange(self, name, exchange_type=None, options={}):
+ properties = options
+ if exchange_type: properties["exchange_type"] = exchange_type
+ self.invoke("create", {"type": "exchange", "name":name, "properties":properties})
+
+ def create_queue(self, name, properties={}):
+ self.invoke("create", {"type": "queue", "name":name, "properties":properties})
+
+ def delete_exchange(self, name):
+ self.invoke("delete", {"type": "exchange", "name":name})
+
+ def delete_queue(self, name):
+ self.invoke("delete", {"type": "queue", "name":name})