diff options
Diffstat (limited to 'qpid/cpp/src/tests/cli_tests.py')
-rwxr-xr-x | qpid/cpp/src/tests/cli_tests.py | 475 |
1 files changed, 475 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/cli_tests.py b/qpid/cpp/src/tests/cli_tests.py new file mode 100755 index 0000000000..6c75927461 --- /dev/null +++ b/qpid/cpp/src/tests/cli_tests.py @@ -0,0 +1,475 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys +import os +import imp +from qpid.testlib import TestBase010 +# from brokertest import import_script, checkenv +from qpid.datatypes import Message +from qpid.queue import Empty +from time import sleep + +def import_script(path): + """ + Import executable script at path as a module. + Requires some trickery as scripts are not in standard module format + """ + f = open(path) + try: + name=os.path.split(path)[1].replace("-","_") + return imp.load_module(name, f, path, ("", "r", imp.PY_SOURCE)) + finally: f.close() + +def checkenv(name): + value = os.getenv(name) + if not value: raise Exception("Environment variable %s is not set" % name) + return value + +class CliTests(TestBase010): + + def remote_host(self): + return self.defines.get("remote-host", "localhost") + + def remote_port(self): + return int(self.defines["remote-port"]) + + def cli_dir(self): + return self.defines["cli-dir"] + + def makeQueue(self, qname, arguments, api=False): + if api: + ret = self.qpid_config_api(" add queue " + qname + " " + arguments) + else: + ret = os.system(self.qpid_config_command(" add queue " + qname + " " + arguments)) + + self.assertEqual(ret, 0) + queues = self.qmf.getObjects(_class="queue") + for queue in queues: + if queue.name == qname: + return queue + assert False + + def test_queue_params(self): + self.startQmf() + queue1 = self.makeQueue("test_queue_params1", "--limit-policy none") + queue2 = self.makeQueue("test_queue_params2", "--limit-policy reject") + queue3 = self.makeQueue("test_queue_params3", "--limit-policy flow-to-disk") + queue4 = self.makeQueue("test_queue_params4", "--limit-policy ring") + queue5 = self.makeQueue("test_queue_params5", "--limit-policy ring-strict") + + LIMIT = "qpid.policy_type" + assert LIMIT not in queue1.arguments + self.assertEqual(queue2.arguments[LIMIT], "reject") + self.assertEqual(queue3.arguments[LIMIT], "flow_to_disk") + self.assertEqual(queue4.arguments[LIMIT], "ring") + self.assertEqual(queue5.arguments[LIMIT], "ring_strict") + + queue6 = self.makeQueue("test_queue_params6", "--order fifo") + queue7 = self.makeQueue("test_queue_params7", "--order lvq") + queue8 = self.makeQueue("test_queue_params8", "--order lvq-no-browse") + + LVQ = "qpid.last_value_queue" + LVQNB = "qpid.last_value_queue_no_browse" + + assert LVQ not in queue6.arguments + assert LVQ in queue7.arguments + assert LVQ not in queue8.arguments + + assert LVQNB not in queue6.arguments + assert LVQNB not in queue7.arguments + assert LVQNB in queue8.arguments + + + def test_queue_params_api(self): + self.startQmf() + queue1 = self.makeQueue("test_queue_params1", "--limit-policy none", True) + queue2 = self.makeQueue("test_queue_params2", "--limit-policy reject", True) + queue3 = self.makeQueue("test_queue_params3", "--limit-policy flow-to-disk", True) + queue4 = self.makeQueue("test_queue_params4", "--limit-policy ring", True) + queue5 = self.makeQueue("test_queue_params5", "--limit-policy ring-strict", True) + + LIMIT = "qpid.policy_type" + assert LIMIT not in queue1.arguments + self.assertEqual(queue2.arguments[LIMIT], "reject") + self.assertEqual(queue3.arguments[LIMIT], "flow_to_disk") + self.assertEqual(queue4.arguments[LIMIT], "ring") + self.assertEqual(queue5.arguments[LIMIT], "ring_strict") + + queue6 = self.makeQueue("test_queue_params6", "--order fifo", True) + queue7 = self.makeQueue("test_queue_params7", "--order lvq", True) + queue8 = self.makeQueue("test_queue_params8", "--order lvq-no-browse", True) + + LVQ = "qpid.last_value_queue" + LVQNB = "qpid.last_value_queue_no_browse" + + assert LVQ not in queue6.arguments + assert LVQ in queue7.arguments + assert LVQ not in queue8.arguments + + assert LVQNB not in queue6.arguments + assert LVQNB not in queue7.arguments + assert LVQNB in queue8.arguments + + + def test_qpid_config(self): + self.startQmf(); + qmf = self.qmf + qname = "test_qpid_config" + + ret = os.system(self.qpid_config_command(" add queue " + qname)) + self.assertEqual(ret, 0) + queues = qmf.getObjects(_class="queue") + found = False + for queue in queues: + if queue.name == qname: + self.assertEqual(queue.durable, False) + found = True + self.assertEqual(found, True) + + ret = os.system(self.qpid_config_command(" del queue " + qname)) + self.assertEqual(ret, 0) + queues = qmf.getObjects(_class="queue") + found = False + for queue in queues: + if queue.name == qname: + found = True + self.assertEqual(found, False) + + def test_qpid_config_api(self): + self.startQmf(); + qmf = self.qmf + qname = "test_qpid_config_api" + + ret = self.qpid_config_api(" add queue " + qname) + self.assertEqual(ret, 0) + queues = qmf.getObjects(_class="queue") + found = False + for queue in queues: + if queue.name == qname: + self.assertEqual(queue.durable, False) + found = True + self.assertEqual(found, True) + + ret = self.qpid_config_api(" del queue " + qname) + self.assertEqual(ret, 0) + queues = qmf.getObjects(_class="queue") + found = False + for queue in queues: + if queue.name == qname: + found = True + self.assertEqual(found, False) + + + def test_qpid_config_sasl_plain_expect_succeed(self): + self.startQmf(); + qmf = self.qmf + qname = "test_qpid_config_sasl_plain_expect_succeed" + cmd = " --sasl-mechanism PLAIN -a guest/guest@localhost:"+str(self.broker.port) + " add queue " + qname + ret = self.qpid_config_api(cmd) + self.assertEqual(ret, 0) + + def test_qpid_config_sasl_plain_expect_fail(self): + """Fails because no user name and password is supplied""" + self.startQmf(); + qmf = self.qmf + qname = "test_qpid_config_sasl_plain_expect_succeed" + cmd = " --sasl-mechanism PLAIN -a localhost:"+str(self.broker.port) + " add queue " + qname + ret = self.qpid_config_api(cmd) + assert ret != 0 + + # helpers for some of the test methods + def helper_find_exchange(self, xchgname, typ, expected=True): + xchgs = self.qmf.getObjects(_class = "exchange") + found = False + for xchg in xchgs: + if xchg.name == xchgname: + if typ: + self.assertEqual(xchg.type, typ) + found = True + self.assertEqual(found, expected) + + def helper_create_exchange(self, xchgname, typ="direct", opts=""): + foo = self.qpid_config_command(opts + " add exchange " + typ + " " + xchgname) + # print foo + ret = os.system(foo) + self.assertEqual(ret, 0) + self.helper_find_exchange(xchgname, typ, True) + + def helper_destroy_exchange(self, xchgname): + foo = self.qpid_config_command(" del exchange " + xchgname) + # print foo + ret = os.system(foo) + self.assertEqual(ret, 0) + self.helper_find_exchange(xchgname, False, expected=False) + + def helper_find_queue(self, qname, expected=True): + queues = self.qmf.getObjects(_class="queue") + found = False + for queue in queues: + if queue.name == qname: + self.assertEqual(queue.durable, False) + found = True + self.assertEqual(found, expected) + + def helper_create_queue(self, qname): + foo = self.qpid_config_command(" add queue " + qname) + # print foo + ret = os.system(foo) + self.assertEqual(ret, 0) + self.helper_find_queue(qname, True) + + def helper_destroy_queue(self, qname): + foo = self.qpid_config_command(" del queue " + qname) + # print foo + ret = os.system(foo) + self.assertEqual(ret, 0) + self.helper_find_queue(qname, False) + + + # test the bind-queue-to-header-exchange functionality + def test_qpid_config_headers(self): + self.startQmf(); + qmf = self.qmf + qname = "test_qpid_config" + xchgname = "test_xchg" + + # first create a header xchg + self.helper_create_exchange(xchgname, typ="headers") + + # create the queue + self.helper_create_queue(qname) + + # now bind the queue to the xchg + foo = self.qpid_config_command(" bind " + xchgname + " " + qname + + " key all foo=bar baz=quux") + # print foo + ret = os.system(foo) + self.assertEqual(ret, 0) + + # he likes it, mikey. Ok, now tear it all down. first the binding + ret = os.system(self.qpid_config_command(" unbind " + xchgname + " " + qname + + " key")) + self.assertEqual(ret, 0) + + # then the queue + self.helper_destroy_queue(qname) + + # then the exchange + self.helper_destroy_exchange(xchgname) + + + def test_qpid_config_xml(self): + self.startQmf(); + qmf = self.qmf + qname = "test_qpid_config" + xchgname = "test_xchg" + + # first create a header xchg + self.helper_create_exchange(xchgname, typ="xml") + + # create the queue + self.helper_create_queue(qname) + + # now bind the queue to the xchg + foo = self.qpid_config_command("-f test.xquery bind " + xchgname + " " + qname) + # print foo + ret = os.system(foo) + self.assertEqual(ret, 0) + + # he likes it, mikey. Ok, now tear it all down. first the binding + ret = os.system(self.qpid_config_command(" unbind " + xchgname + " " + qname + + " key")) + self.assertEqual(ret, 0) + + # then the queue + self.helper_destroy_queue(qname) + + # then the exchange + self.helper_destroy_exchange(xchgname) + + def test_qpid_config_durable(self): + self.startQmf(); + qmf = self.qmf + qname = "test_qpid_config" + + ret = os.system(self.qpid_config_command(" add queue --durable " + qname)) + self.assertEqual(ret, 0) + queues = qmf.getObjects(_class="queue") + found = False + for queue in queues: + if queue.name == qname: + self.assertEqual(queue.durable, True) + found = True + self.assertEqual(found, True) + + ret = os.system(self.qpid_config_command(" del queue " + qname)) + self.assertEqual(ret, 0) + queues = qmf.getObjects(_class="queue") + found = False + for queue in queues: + if queue.name == qname: + found = True + self.assertEqual(found, False) + + def test_qpid_config_altex(self): + self.startQmf(); + qmf = self.qmf + exName = "testalt" + qName = "testqalt" + altName = "amq.direct" + + ret = os.system(self.qpid_config_command(" add exchange topic %s --alternate-exchange=%s" % (exName, altName))) + self.assertEqual(ret, 0) + + exchanges = qmf.getObjects(_class="exchange") + found = False + for exchange in exchanges: + if exchange.name == altName: + self.assertEqual(exchange.altExchange, None) + + if exchange.name == exName: + found = True + if not exchange.altExchange: + self.fail("Alternate exchange not set") + self.assertEqual(exchange._altExchange_.name, altName) + self.assertEqual(found, True) + + ret = os.system(self.qpid_config_command(" add queue %s --alternate-exchange=%s" % (qName, altName))) + self.assertEqual(ret, 0) + + queues = qmf.getObjects(_class="queue") + found = False + for queue in queues: + if queue.name == qName: + found = True + if not queue.altExchange: + self.fail("Alternate exchange not set") + self.assertEqual(queue._altExchange_.name, altName) + self.assertEqual(found, True) + + def test_qpid_config_list_queues_arguments(self): + """ + Test to verify that when the type of a policy limit is + actually a string (though still a valid value), it does not + upset qpid-config + """ + self.startQmf(); + qmf = self.qmf + + names = ["queue_capacity%s" % (i) for i in range(1, 6)] + for name in names: + self.session.queue_declare(queue=name, exclusive=True, + arguments={'qpid.max_count' : str(i), 'qpid.max_size': '100'}) + + output = os.popen(self.qpid_config_command(" queues")).readlines() + queues = [line.split()[0] for line in output[2:len(output)]] #ignore first two lines (header) + + for name in names: + assert name in queues, "%s not in %s" % (name, queues) + + def test_qpid_route(self): + self.startQmf(); + qmf = self.qmf + + command = self.cli_dir() + "/qpid-route dynamic add guest/guest@localhost:%d %s:%d amq.topic" %\ + (self.broker.port, self.remote_host(), self.remote_port()) + ret = os.system(command) + self.assertEqual(ret, 0) + + links = qmf.getObjects(_class="link") + found = False + for link in links: + if link.port == self.remote_port(): + found = True + self.assertEqual(found, True) + + def test_qpid_route_api(self): + self.startQmf(); + qmf = self.qmf + + ret = self.qpid_route_api("dynamic add " + + "guest/guest@localhost:"+str(self.broker.port) + " " + + str(self.remote_host())+":"+str(self.remote_port()) + " " + +"amq.direct") + + self.assertEqual(ret, 0) + + links = qmf.getObjects(_class="link") + found = False + for link in links: + if link.port == self.remote_port(): + found = True + self.assertEqual(found, True) + + + def test_qpid_route_api(self): + self.startQmf(); + qmf = self.qmf + + ret = self.qpid_route_api("dynamic add " + + " --client-sasl-mechanism PLAIN " + + "guest/guest@localhost:"+str(self.broker.port) + " " + + str(self.remote_host())+":"+str(self.remote_port()) + " " + +"amq.direct") + + self.assertEqual(ret, 0) + + links = qmf.getObjects(_class="link") + found = False + for link in links: + if link.port == self.remote_port(): + found = True + self.assertEqual(found, True) + + def test_qpid_route_api_expect_fail(self): + self.startQmf(); + qmf = self.qmf + + ret = self.qpid_route_api("dynamic add " + + " --client-sasl-mechanism PLAIN " + + "localhost:"+str(self.broker.port) + " " + + str(self.remote_host())+":"+str(self.remote_port()) + " " + +"amq.direct") + assert ret != 0 + + + def getProperty(self, msg, name): + for h in msg.headers: + if hasattr(h, name): return getattr(h, name) + return None + + def getAppHeader(self, msg, name): + headers = self.getProperty(msg, "application_headers") + if headers: + return headers[name] + return None + + def qpid_config_command(self, arg = ""): + return self.cli_dir() + "/qpid-config -a localhost:%d" % self.broker.port + " " + arg + + def qpid_config_api(self, arg = ""): + script = import_script(checkenv("QPID_CONFIG_EXEC")) + broker = ["-a", "localhost:"+str(self.broker.port)] + return script.main(broker + arg.split()) + + def qpid_route_api(self, arg = ""): + script = import_script(checkenv("QPID_ROUTE_EXEC")) + return script.main(arg.split()) |