diff options
Diffstat (limited to 'qpid/cpp/src/tests/queue_redirect.py')
-rw-r--r-- | qpid/cpp/src/tests/queue_redirect.py | 317 |
1 files changed, 317 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/queue_redirect.py b/qpid/cpp/src/tests/queue_redirect.py new file mode 100644 index 0000000000..8a7b4c244b --- /dev/null +++ b/qpid/cpp/src/tests/queue_redirect.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys +import qpid +from qpid.util import connect +from qpid.connection import Connection +from qpid.datatypes import uuid4 +from qpid.testlib import TestBase010 +from qmf.console import Session +from qpid.datatypes import Message +import qpid.messaging +from time import sleep +from os import environ, popen + +class ACLFile: + def __init__(self, policy='data_dir/policy.acl'): + self.f = open(policy,'w') + + def write(self,line): + self.f.write(line) + + def close(self): + self.f.close() + +class QueueredirectTests(TestBase010): + + def get_session(self, user, passwd): + socket = connect(self.broker.host, self.broker.port) + connection = Connection (sock=socket, username=user, password=passwd, + mechanism="PLAIN") + connection.start() + return connection.session(str(uuid4())) + + def reload_acl(self): + result = None + try: + self.broker_access.reloadAclFile() + except Exception, e: + result = str(e) + return result + + def get_acl_file(self): + return ACLFile(self.config.defines.get("policy-file", "data_dir/policy.acl")) + + def setUp(self): + aclf = self.get_acl_file() + aclf.write('acl allow all all\n') + aclf.close() + TestBase010.setUp(self) + self.startBrokerAccess() + self.reload_acl() + + def tearDown(self): + aclf = self.get_acl_file() + aclf.write('acl allow all all\n') + aclf.close() + self.reload_acl() + TestBase010.tearDown(self) + + + def redirect(self, srcQueue, tgtQueue, expectPass, failMessage): + try: + result = {} + result = self.broker_access.Redirect(srcQueue, tgtQueue) + if not expectPass: + self.fail("src:" + srcQueue + ", tgt:" + tgtQueue + " - " + failMessage) + except Exception, e: + if expectPass: + self.fail("src:" + srcQueue + ", tgt:" + tgtQueue + " - " + failMessage) + + def create_queue(self, session, name, autoDelete): + try: + session.queue_declare(queue=name, auto_delete=autoDelete) + except Exception, e: + self.fail("Should allow create queue " + name) + + def _start_qpid_send(self, queue, count, content="X", capacity=100): + """ Use the qpid-send client to generate traffic to a queue. + """ + command = "qpid-send" + \ + " -b" + " %s:%s" % (self.broker.host, self.broker.port) \ + + " -a " + str(queue) \ + + " --messages " + str(count) \ + + " --content-string " + str(content) \ + + " --capacity " + str(capacity) + return popen(command) + + def _start_qpid_receive(self, queue, count, timeout=5): + """ Use the qpid-receive client to consume from a queue. + Note well: prints one line of text to stdout for each consumed msg. + """ + command = "qpid-receive" + \ + " -b " + "%s:%s" % (self.broker.host, self.broker.port) \ + + " -a " + str(queue) \ + + " --messages " + str(count) \ + + " --timeout " + str(timeout) \ + + " --print-content yes" + return popen(command) + + + + #===================================== + # QT queue tests + #===================================== + + def test_010_deny_backing_up_a_nonexistant_queue(self): + session = self.get_session('bob','bob') + self.redirect("A010", "A010", False, "Should not allow redirect to non-existent queue A010") + session.close() + + def test_020_deny_destroy_redirect(self): + session = self.get_session('bob','bob') + self.create_queue(session, "A020", False) + self.redirect("A020", "", False, "Should not allow destroying redirect") + session.close() + + def test_030_deny_redirecting_to_nonexistent_queue(self): + session = self.get_session('bob','bob') + self.create_queue(session, "A030", False) + self.redirect("A030", "Axxx", False, "Should not allow redirect with non-existent queue Axxx") + session.close() + + def test_040_deny_queue_redirecting_to_itself(self): + session = self.get_session('bob','bob') + self.create_queue(session, "A040", False) + self.redirect("A040", "A040", False, "Should not allow redirect with itself") + session.close() + + def test_050_deny_redirecting_autodelete_queue(self): + session = self.get_session('bob','bob') + self.create_queue(session, "A050", True) + self.create_queue(session, "B050", False) + self.redirect("A050", "B050", False, "Should not allow redirect with autodelete source queue") + self.redirect("B050", "A050", False, "Should not allow redirect with autodelete target queue") + session.close() + + def test_100_create_redirect_queue_pair(self): + session = self.get_session('bob','bob') + self.create_queue(session, "A100", False) + self.create_queue(session, "B100", False) + self.redirect("A100", "B100", True, "Should allow redirect") + session.close() + + def test_110_deny_adding_second_redirect_to_queue(self): + session = self.get_session('bob','bob') + self.create_queue(session, "A110", False) + self.create_queue(session, "B110", False) + self.create_queue(session, "C110", False) + self.redirect("A110", "B110", True, "Should allow redirect") + self.redirect("A110", "C110", False, "Should deny second redirect") + self.redirect("C110", "B110", False, "Should deny second redirect") + session.close() + + def test_120_verify_redirect_to_target(self): + session = self.get_session('bob','bob') + self.create_queue(session, "A120", False) + self.create_queue(session, "B120", False) + + # Send messages to original queue + sndr1 = self._start_qpid_send("A120", count=5, content="A120-before-rebind"); + sndr1.close() + + # redirect + self.redirect("A120", "B120", True, "Should allow redirect") + + # Send messages to original queue + sndr2 = self._start_qpid_send("A120", count=3, content="A120-after-rebind"); + sndr2.close() + + # drain the queue + rcvr = self._start_qpid_receive("A120", + count=5) + count = 0; + x = rcvr.readline() # prints a line for each received msg + while x: +# print "Read from A120 " + x + count += 1; + x = rcvr.readline() + + self.assertEqual(count, 5) + + # drain the queue + rcvrB = self._start_qpid_receive("B120", + count=3) + count = 0; + x = rcvrB.readline() # prints a line for each received msg + while x: +# print "Read from B120 " + x + count += 1; + x = rcvrB.readline() + + self.assertEqual(count, 3) + + ###session.close() + + def test_140_verify_redirect_to_source(self): + session = self.get_session('bob','bob') + self.create_queue(session, "A140", False) + self.create_queue(session, "B140", False) + + # Send messages to target queue - these go onto B + sndr1 = self._start_qpid_send("B140", count=5, content="B140-before-rebind"); + sndr1.close() + + # redirect + self.redirect("A140", "B140", True, "Should allow redirect") + + # Send messages to target queue - these go onto A + sndr2 = self._start_qpid_send("B140", count=3, content="B140-after-rebind"); + sndr2.close() + + # drain the queue + rcvr = self._start_qpid_receive("B140", count=5) + count = 0; + x = rcvr.readline() # prints a line for each received msg + while x: + # print "Read from B140 " + x + count += 1; + x = rcvr.readline() + + self.assertEqual(count, 5) + + # drain the queue + rcvrB = self._start_qpid_receive("A140", count=3) + count = 0; + x = rcvrB.readline() # prints a line for each received msg + while x: + # print "Read from A140 " + x + count += 1; + x = rcvrB.readline() + + self.assertEqual(count, 3) + + ###session.close() + + def test_150_queue_deletion_destroys_redirect(self): + session = self.get_session('bob','bob') + self.create_queue(session, "A150", False) + self.create_queue(session, "B150", False) + self.create_queue(session, "C150", False) + + # redirect + self.redirect("A150", "B150", True, "Should allow redirect") + + self.redirect("A150", "C150", False, "A is already redirected") + + alice = BrokerAdmin(self.config.broker, "bob", "bob") + alice.delete_queue("B150") #should pass + + self.redirect("A150", "C150", True, "Should allow redirect") + session.close() + +############################################################################################## +class BrokerAdmin: + def __init__(self, broker, username=None, password=None): + self.connection = qpid.messaging.Connection(broker) + if username: + self.connection.username = username + self.connection.password = password + self.connection.sasl_mechanisms = "PLAIN" + self.connection.open() + self.session = self.connection.session() + self.sender = self.session.sender("qmf.default.direct/broker") + self.reply_to = "responses-#; {create:always}" + self.receiver = self.session.receiver(self.reply_to) + + def invoke(self, method, arguments): + content = { + "_object_id": {"_object_name": "org.apache.qpid.broker:broker:amqp-broker"}, + "_method_name": method, + "_arguments": arguments + } + request = qpid.messaging.Message(reply_to=self.reply_to, content=content) + request.properties["x-amqp-0-10.app-id"] = "qmf2" + request.properties["qmf.opcode"] = "_method_request" + self.sender.send(request) + response = self.receiver.fetch() + self.session.acknowledge() + if response.properties['x-amqp-0-10.app-id'] == 'qmf2': + if response.properties['qmf.opcode'] == '_method_response': + return response.content['_arguments'] + elif response.properties['qmf.opcode'] == '_exception': + raise Exception(response.content['_values']) + else: raise Exception("Invalid response received, unexpected opcode: %s" % response.properties['qmf.opcode']) + else: raise Exception("Invalid response received, not a qmfv2 method: %s" % response.properties['x-amqp-0-10.app-id']) + + def create_exchange(self, name, exchange_type=None, options={}): + properties = options + if exchange_type: properties["exchange_type"] = exchange_type + self.invoke("create", {"type": "exchange", "name":name, "properties":properties}) + + def create_queue(self, name, properties={}): + self.invoke("create", {"type": "queue", "name":name, "properties":properties}) + + def delete_exchange(self, name): + self.invoke("delete", {"type": "exchange", "name":name}) + + def delete_queue(self, name): + self.invoke("delete", {"type": "queue", "name":name}) |