#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import sys
import os
from qpid.testlib import TestBase010
from qpid.datatypes import Message
from qpid.queue import Empty
from time import sleep


class CliTests(TestBase010):
    """Tests that drive the qpid-config / qpid-route command line tools.

    Each test shells out to a CLI tool found in the "cli-dir" define and then
    inspects the resulting broker state through ``self.qmf``.  ``defines``,
    ``broker``, ``startQmf`` and ``qmf`` are not defined in this class, so
    they are expected to come from the base class.
    """

    def remote_host(self):
        """Return the "remote-host" define, defaulting to "localhost"."""
        return self.defines.get("remote-host", "localhost")

    def remote_port(self):
        """Return the "remote-port" define as an int (KeyError if unset)."""
        return int(self.defines["remote-port"])

    def cli_dir(self):
        """Return the "cli-dir" define (KeyError if unset)."""
        return self.defines["cli-dir"]

    def _find_named(self, class_name, name):
        """Return the first QMF object of ``class_name`` called ``name``.

        Returns None when no object with that name is reported.
        """
        for obj in self.qmf.getObjects(_class=class_name):
            if obj.name == name:
                return obj
        return None

    def makeQueue(self, qname, arguments):
        """Create queue ``qname`` via qpid-config and return its QMF object.

        ``arguments`` is appended verbatim to the "add queue" command line.
        Fails the test if the command exits non-zero or if the queue is not
        reported by QMF afterwards.
        """
        ret = os.system(self.command(" add queue " + qname + " " + arguments))
        self.assertEqual(ret, 0)
        queue = self._find_named("queue", qname)
        if queue is None:
            # self.fail (unlike a bare ``assert False``) still fires under
            # ``python -O`` and reports which queue was missing.
            self.fail("Queue %s not found after 'add queue'" % qname)
        return queue

    def test_queue_params(self):
        """Check that --limit-policy and --order map to queue arguments."""
        self.startQmf()

        LIMIT = "qpid.policy_type"
        # (queue name, --limit-policy value, expected argument or None)
        policy_cases = [
            ("test_queue_params1", "none", None),
            ("test_queue_params2", "reject", "reject"),
            ("test_queue_params3", "flow-to-disk", "flow_to_disk"),
            ("test_queue_params4", "ring", "ring"),
            ("test_queue_params5", "ring-strict", "ring_strict"),
        ]
        created = [
            (self.makeQueue(qname, "--limit-policy " + policy), expected)
            for qname, policy, expected in policy_cases
        ]
        for queue, expected in created:
            if expected is None:
                self.assertTrue(
                    LIMIT not in queue.arguments,
                    "%s unexpectedly set on %s" % (LIMIT, queue.name))
            else:
                self.assertEqual(queue.arguments[LIMIT], expected)

        LVQ = "qpid.last_value_queue"
        LVQNB = "qpid.last_value_queue_no_browse"
        # (queue name, --order value, LVQ expected, LVQNB expected)
        order_cases = [
            ("test_queue_params6", "fifo", False, False),
            ("test_queue_params7", "lvq", True, False),
            ("test_queue_params8", "lvq-no-browse", False, True),
        ]
        created = [
            (self.makeQueue(qname, "--order " + order), has_lvq, has_lvqnb)
            for qname, order, has_lvq, has_lvqnb in order_cases
        ]
        for queue, has_lvq, has_lvqnb in created:
            self.assertEqual(
                LVQ in queue.arguments, has_lvq,
                "unexpected %s state on %s" % (LVQ, queue.name))
            self.assertEqual(
                LVQNB in queue.arguments, has_lvqnb,
                "unexpected %s state on %s" % (LVQNB, queue.name))

    def _add_and_delete_queue(self, qname, add_options, durable):
        """Add ``qname``, verify its durable flag, delete it, verify removal.

        ``add_options`` is inserted between "add queue " and the queue name;
        pass "" for none, or options followed by a trailing space.
        """
        self.startQmf()

        ret = os.system(self.command(" add queue " + add_options + qname))
        self.assertEqual(ret, 0)
        queue = self._find_named("queue", qname)
        if queue is None:
            self.fail("Queue %s not found after 'add queue'" % qname)
        self.assertEqual(queue.durable, durable)

        ret = os.system(self.command(" del queue " + qname))
        self.assertEqual(ret, 0)
        if self._find_named("queue", qname) is not None:
            self.fail("Queue %s still present after 'del queue'" % qname)

    def test_qpid_config(self):
        """Add and delete a non-durable queue through qpid-config."""
        self._add_and_delete_queue("test_qpid_config", "", False)

    def test_qpid_config_durable(self):
        """Add and delete a durable queue through qpid-config."""
        self._add_and_delete_queue("test_qpid_config", "--durable ", True)

    def test_qpid_config_altex(self):
        """Check --alternate-exchange on both an exchange and a queue."""
        self.startQmf()
        exName = "testalt"
        qName = "testqalt"
        altName = "amq.direct"

        ret = os.system(self.command(
            " add exchange topic %s --alternate-exchange=%s" % (exName, altName)))
        self.assertEqual(ret, 0)

        # The exchange used as the alternate must not itself have one.
        alternate = self._find_named("exchange", altName)
        if alternate is not None:
            self.assertEqual(alternate.altExchange, None)

        exchange = self._find_named("exchange", exName)
        if exchange is None:
            self.fail("Exchange %s not found after 'add exchange'" % exName)
        if not exchange.altExchange:
            self.fail("Alternate exchange not set")
        self.assertEqual(exchange._altExchange_.name, altName)

        ret = os.system(self.command(
            " add queue %s --alternate-exchange=%s" % (qName, altName)))
        self.assertEqual(ret, 0)

        queue = self._find_named("queue", qName)
        if queue is None:
            self.fail("Queue %s not found after 'add queue'" % qName)
        if not queue.altExchange:
            self.fail("Alternate exchange not set")
        self.assertEqual(queue._altExchange_.name, altName)

    def test_qpid_route(self):
        """Add a dynamic route with qpid-route and look for the new link."""
        self.startQmf()
        qmf = self.qmf

        route_args = "/qpid-route dynamic add guest/guest@localhost:%d %s:%d amq.topic" % (
            self.broker.port, self.remote_host(), self.remote_port())
        ret = os.system(self.cli_dir() + route_args)
        self.assertEqual(ret, 0)

        remote_port = self.remote_port()
        links = qmf.getObjects(_class="link")
        if not any(link.port == remote_port for link in links):
            self.fail("No link to remote port %d found" % remote_port)

    def getProperty(self, msg, name):
        """Return attribute ``name`` from the first header of ``msg`` that has it.

        Returns None when no header carries the attribute.
        """
        for h in msg.headers:
            if hasattr(h, name):
                return getattr(h, name)
        return None

    def getAppHeader(self, msg, name):
        """Return entry ``name`` from the "application_headers" of ``msg``.

        Returns None when the message has no (or empty) application headers.
        A missing ``name`` in a non-empty header collection propagates the
        lookup error from ``headers[name]``.
        """
        headers = self.getProperty(msg, "application_headers")
        if headers:
            return headers[name]
        return None

    def command(self, arg=""):
        """Build a qpid-config command line aimed at the test broker.

        ``arg`` is appended after a single space; the broker address is
        always localhost on ``self.broker.port``.
        """
        return self.cli_dir() + "/qpid-config -a localhost:%d" % self.broker.port + " " + arg