diff options
author | Ted Ross <tross@apache.org> | 2012-02-22 16:41:55 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2012-02-22 16:41:55 +0000 |
commit | 3314a5cb4d14e94ed8fa29a1ba6348d10d27fdcf (patch) | |
tree | 52ddc4ee1c0424de91ded89b1cc27884cfa58134 | |
parent | 99104dde3f0718b4f9a3aff3427bab6a818fb41b (diff) | |
download | qpid-python-3314a5cb4d14e94ed8fa29a1ba6348d10d27fdcf.tar.gz |
QPID-3851 - Unified common CLI options for qpid-config and qpid-stat.
Also in this commit: qpid-config was converted to use the messaging-based qmf2
library. It no longer has a dependency on the qmf library. The CLI tests were also
ported to the faster library.
CLI test time prior to this commit: 2 minutes 12 seconds
CLI test time after this commit: 12.5 seconds
Other items in qpid-config and qpid-stat:
- The deprecated LVQ options (lqv, lqv-no-browse) were removed from qpid-config.
- A new option, --lvq-key, was added to qpid-config to support the new LVQ configuration.
The docs and tests were updated to match.
- qpid-stat was updated so that 'qpid-stat -q <queue-name>' prints full details from the
specified queue.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1292388 13f79535-47bb-0310-9956-ffa450edef68
21 files changed, 362 insertions, 289 deletions
diff --git a/qpid/cpp/bindings/qmf/tests/run_interop_tests b/qpid/cpp/bindings/qmf/tests/run_interop_tests index 83e7f2593b..c370f211af 100755 --- a/qpid/cpp/bindings/qmf/tests/run_interop_tests +++ b/qpid/cpp/bindings/qmf/tests/run_interop_tests @@ -24,6 +24,7 @@ MY_DIR=`dirname \`which $0\`` QPID_DIR=${MY_DIR}/../../../.. BUILD_DIR=../../.. PYTHON_DIR=${QPID_DIR}/python +TOOLS_PY_DIR=${QPID_DIR}/tools/src/py QMF_DIR=${QPID_DIR}/extras/qmf QMF_DIR_PY=${QMF_DIR}/src/py BROKER_DIR=${BUILD_DIR}/src @@ -68,7 +69,7 @@ TESTS_FAILED=0 if test -d ${PYTHON_DIR} ; then start_broker echo "Running qmf interop tests using broker on port $BROKER_PORT" - PYTHONPATH=${PYTHON_DIR}:${QMF_DIR_PY}:${MY_DIR} + PYTHONPATH=${PYTHON_DIR}:${QMF_DIR_PY}:${MY_DIR}:${TOOLS_PY_DIR} export PYTHONPATH if test -d ${PYTHON_LIB_DIR} ; then diff --git a/qpid/cpp/src/tests/cli_tests.py b/qpid/cpp/src/tests/cli_tests.py index 6c75927461..b9a7dda15c 100755 --- a/qpid/cpp/src/tests/cli_tests.py +++ b/qpid/cpp/src/tests/cli_tests.py @@ -22,7 +22,6 @@ import sys import os import imp from qpid.testlib import TestBase010 -# from brokertest import import_script, checkenv from qpid.datatypes import Message from qpid.queue import Empty from time import sleep @@ -61,14 +60,13 @@ class CliTests(TestBase010): ret = os.system(self.qpid_config_command(" add queue " + qname + " " + arguments)) self.assertEqual(ret, 0) - queues = self.qmf.getObjects(_class="queue") - for queue in queues: - if queue.name == qname: - return queue + queue = self.broker_access.getQueue(qname) + if queue: + return queue assert False def test_queue_params(self): - self.startQmf() + self.startBrokerAccess() queue1 = self.makeQueue("test_queue_params1", "--limit-policy none") queue2 = self.makeQueue("test_queue_params2", "--limit-policy reject") queue3 = self.makeQueue("test_queue_params3", "--limit-policy flow-to-disk") @@ -82,29 +80,21 @@ class CliTests(TestBase010): self.assertEqual(queue4.arguments[LIMIT], "ring") self.assertEqual(queue5.arguments[LIMIT], "ring_strict") - queue6 = self.makeQueue("test_queue_params6", "--order fifo") - queue7 = self.makeQueue("test_queue_params7", "--order lvq") - queue8 = self.makeQueue("test_queue_params8", "--order lvq-no-browse") - - LVQ = "qpid.last_value_queue" - LVQNB = "qpid.last_value_queue_no_browse" + queue6 = self.makeQueue("test_queue_params6", "--lvq-key lkey") - assert LVQ not in queue6.arguments - assert LVQ in queue7.arguments - assert LVQ not in queue8.arguments - - assert LVQNB not in queue6.arguments - assert LVQNB not in queue7.arguments - assert LVQNB in queue8.arguments + LVQKEY = "qpid.last_value_queue_key" + assert LVQKEY not in queue5.arguments + assert LVQKEY in queue6.arguments + assert queue6.arguments[LVQKEY] == "lkey" def test_queue_params_api(self): - self.startQmf() - queue1 = self.makeQueue("test_queue_params1", "--limit-policy none", True) - queue2 = self.makeQueue("test_queue_params2", "--limit-policy reject", True) - queue3 = self.makeQueue("test_queue_params3", "--limit-policy flow-to-disk", True) - queue4 = self.makeQueue("test_queue_params4", "--limit-policy ring", True) - queue5 = self.makeQueue("test_queue_params5", "--limit-policy ring-strict", True) + self.startBrokerAccess() + queue1 = self.makeQueue("test_queue_params_api1", "--limit-policy none", True) + queue2 = self.makeQueue("test_queue_params_api2", "--limit-policy reject", True) + queue3 = self.makeQueue("test_queue_params_api3", "--limit-policy flow-to-disk", True) + queue4 = self.makeQueue("test_queue_params_api4", "--limit-policy ring", True) + queue5 = self.makeQueue("test_queue_params_api5", "--limit-policy ring-strict", True) LIMIT = "qpid.policy_type" assert LIMIT not in queue1.arguments @@ -113,30 +103,22 @@ class CliTests(TestBase010): self.assertEqual(queue4.arguments[LIMIT], "ring") self.assertEqual(queue5.arguments[LIMIT], "ring_strict") - queue6 = self.makeQueue("test_queue_params6", "--order fifo", True) - queue7 = self.makeQueue("test_queue_params7", "--order lvq", True) - queue8 = self.makeQueue("test_queue_params8", "--order lvq-no-browse", True) - - LVQ = "qpid.last_value_queue" - LVQNB = "qpid.last_value_queue_no_browse" + queue6 = self.makeQueue("test_queue_params_api6", "--lvq-key lkey") - assert LVQ not in queue6.arguments - assert LVQ in queue7.arguments - assert LVQ not in queue8.arguments + LVQKEY = "qpid.last_value_queue_key" - assert LVQNB not in queue6.arguments - assert LVQNB not in queue7.arguments - assert LVQNB in queue8.arguments + assert LVQKEY not in queue5.arguments + assert LVQKEY in queue6.arguments + assert queue6.arguments[LVQKEY] == "lkey" def test_qpid_config(self): - self.startQmf(); - qmf = self.qmf + self.startBrokerAccess(); qname = "test_qpid_config" ret = os.system(self.qpid_config_command(" add queue " + qname)) self.assertEqual(ret, 0) - queues = qmf.getObjects(_class="queue") + queues = self.broker_access.getAllQueues() found = False for queue in queues: if queue.name == qname: @@ -146,7 +128,7 @@ class CliTests(TestBase010): ret = os.system(self.qpid_config_command(" del queue " + qname)) self.assertEqual(ret, 0) - queues = qmf.getObjects(_class="queue") + queues = self.broker_access.getAllQueues() found = False for queue in queues: if queue.name == qname: @@ -154,13 +136,12 @@ class CliTests(TestBase010): self.assertEqual(found, False) def test_qpid_config_api(self): - self.startQmf(); - qmf = self.qmf + self.startBrokerAccess(); qname = "test_qpid_config_api" ret = self.qpid_config_api(" add queue " + qname) self.assertEqual(ret, 0) - queues = qmf.getObjects(_class="queue") + queues = self.broker_access.getAllQueues() found = False for queue in queues: if queue.name == qname: @@ -170,7 +151,7 @@ class CliTests(TestBase010): ret = self.qpid_config_api(" del queue " + qname) self.assertEqual(ret, 0) - queues = qmf.getObjects(_class="queue") + queues = self.broker_access.getAllQueues() found = False for queue in queues: if queue.name == qname: @@ -179,25 +160,23 @@ class CliTests(TestBase010): def test_qpid_config_sasl_plain_expect_succeed(self): - self.startQmf(); - qmf = self.qmf + self.startBrokerAccess(); qname = "test_qpid_config_sasl_plain_expect_succeed" - cmd = " --sasl-mechanism PLAIN -a guest/guest@localhost:"+str(self.broker.port) + " add queue " + qname + cmd = " --sasl-mechanism PLAIN -b guest/guest@localhost:"+str(self.broker.port) + " add queue " + qname ret = self.qpid_config_api(cmd) self.assertEqual(ret, 0) def test_qpid_config_sasl_plain_expect_fail(self): """Fails because no user name and password is supplied""" - self.startQmf(); - qmf = self.qmf - qname = "test_qpid_config_sasl_plain_expect_succeed" - cmd = " --sasl-mechanism PLAIN -a localhost:"+str(self.broker.port) + " add queue " + qname + self.startBrokerAccess(); + qname = "test_qpid_config_sasl_plain_expect_fail" + cmd = " --sasl-mechanism PLAIN -b localhost:"+str(self.broker.port) + " add queue " + qname ret = self.qpid_config_api(cmd) assert ret != 0 # helpers for some of the test methods def helper_find_exchange(self, xchgname, typ, expected=True): - xchgs = self.qmf.getObjects(_class = "exchange") + xchgs = self.broker_access.getAllExchanges() found = False for xchg in xchgs: if xchg.name == xchgname: @@ -221,7 +200,7 @@ class CliTests(TestBase010): self.helper_find_exchange(xchgname, False, expected=False) def helper_find_queue(self, qname, expected=True): - queues = self.qmf.getObjects(_class="queue") + queues = self.broker_access.getAllQueues() found = False for queue in queues: if queue.name == qname: @@ -246,8 +225,7 @@ class CliTests(TestBase010): # test the bind-queue-to-header-exchange functionality def test_qpid_config_headers(self): - self.startQmf(); - qmf = self.qmf + self.startBrokerAccess(); qname = "test_qpid_config" xchgname = "test_xchg" @@ -277,8 +255,7 @@ class CliTests(TestBase010): def test_qpid_config_xml(self): - self.startQmf(); - qmf = self.qmf + self.startBrokerAccess(); qname = "test_qpid_config" xchgname = "test_xchg" @@ -306,13 +283,12 @@ class CliTests(TestBase010): self.helper_destroy_exchange(xchgname) def test_qpid_config_durable(self): - self.startQmf(); - qmf = self.qmf + self.startBrokerAccess(); qname = "test_qpid_config" ret = os.system(self.qpid_config_command(" add queue --durable " + qname)) self.assertEqual(ret, 0) - queues = qmf.getObjects(_class="queue") + queues = self.broker_access.getAllQueues() found = False for queue in queues: if queue.name == qname: @@ -322,7 +298,7 @@ class CliTests(TestBase010): ret = os.system(self.qpid_config_command(" del queue " + qname)) self.assertEqual(ret, 0) - queues = qmf.getObjects(_class="queue") + queues = self.broker_access.getAllQueues() found = False for queue in queues: if queue.name == qname: @@ -330,8 +306,7 @@ class CliTests(TestBase010): self.assertEqual(found, False) def test_qpid_config_altex(self): - self.startQmf(); - qmf = self.qmf + self.startBrokerAccess(); exName = "testalt" qName = "testqalt" altName = "amq.direct" @@ -339,7 +314,7 @@ class CliTests(TestBase010): ret = os.system(self.qpid_config_command(" add exchange topic %s --alternate-exchange=%s" % (exName, altName))) self.assertEqual(ret, 0) - exchanges = qmf.getObjects(_class="exchange") + exchanges = self.broker_access.getAllExchanges() found = False for exchange in exchanges: if exchange.name == altName: @@ -349,20 +324,20 @@ class CliTests(TestBase010): found = True if not exchange.altExchange: self.fail("Alternate exchange not set") - self.assertEqual(exchange._altExchange_.name, altName) + self.assertEqual(exchange.altExchange, altName) self.assertEqual(found, True) ret = os.system(self.qpid_config_command(" add queue %s --alternate-exchange=%s" % (qName, altName))) self.assertEqual(ret, 0) - queues = qmf.getObjects(_class="queue") + queues = self.broker_access.getAllQueues() found = False for queue in queues: if queue.name == qName: found = True if not queue.altExchange: self.fail("Alternate exchange not set") - self.assertEqual(queue._altExchange_.name, altName) + self.assertEqual(queue.altExchange, altName) self.assertEqual(found, True) def test_qpid_config_list_queues_arguments(self): @@ -371,8 +346,7 @@ class CliTests(TestBase010): actually a string (though still a valid value), it does not upset qpid-config """ - self.startQmf(); - qmf = self.qmf + self.startBrokerAccess(); names = ["queue_capacity%s" % (i) for i in range(1, 6)] for name in names: @@ -386,15 +360,14 @@ class CliTests(TestBase010): assert name in queues, "%s not in %s" % (name, queues) def test_qpid_route(self): - self.startQmf(); - qmf = self.qmf + self.startBrokerAccess(); command = self.cli_dir() + "/qpid-route dynamic add guest/guest@localhost:%d %s:%d amq.topic" %\ (self.broker.port, self.remote_host(), self.remote_port()) ret = os.system(command) self.assertEqual(ret, 0) - links = qmf.getObjects(_class="link") + links = self.broker_access.getAllLinks() found = False for link in links: if link.port == self.remote_port(): @@ -402,8 +375,7 @@ class CliTests(TestBase010): self.assertEqual(found, True) def test_qpid_route_api(self): - self.startQmf(); - qmf = self.qmf + self.startBrokerAccess(); ret = self.qpid_route_api("dynamic add " + "guest/guest@localhost:"+str(self.broker.port) + " " @@ -412,7 +384,7 @@ class CliTests(TestBase010): self.assertEqual(ret, 0) - links = qmf.getObjects(_class="link") + links = self.broker_access.getAllLinks() found = False for link in links: if link.port == self.remote_port(): @@ -421,8 +393,7 @@ class CliTests(TestBase010): def test_qpid_route_api(self): - self.startQmf(); - qmf = self.qmf + self.startBrokerAccess(); ret = self.qpid_route_api("dynamic add " + " --client-sasl-mechanism PLAIN " @@ -432,7 +403,7 @@ class CliTests(TestBase010): self.assertEqual(ret, 0) - links = qmf.getObjects(_class="link") + links = self.broker_access.getAllLinks() found = False for link in links: if link.port == self.remote_port(): @@ -440,8 +411,7 @@ class CliTests(TestBase010): self.assertEqual(found, True) def test_qpid_route_api_expect_fail(self): - self.startQmf(); - qmf = self.qmf + self.startBrokerAccess(); ret = self.qpid_route_api("dynamic add " + " --client-sasl-mechanism PLAIN " @@ -463,11 +433,11 @@ class CliTests(TestBase010): return None def qpid_config_command(self, arg = ""): - return self.cli_dir() + "/qpid-config -a localhost:%d" % self.broker.port + " " + arg + return self.cli_dir() + "/qpid-config -b localhost:%d" % self.broker.port + " " + arg def qpid_config_api(self, arg = ""): script = import_script(checkenv("QPID_CONFIG_EXEC")) - broker = ["-a", "localhost:"+str(self.broker.port)] + broker = ["-b", "localhost:"+str(self.broker.port)] return script.main(broker + arg.split()) def qpid_route_api(self, arg = ""): diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index cbc3df4a6b..79df21aada 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -277,7 +277,7 @@ acl deny all all QMF-based tools - regression test for BZ615300.""" broker1 = self.cluster(1)[0] broker2 = self.cluster(1)[0] - qs = subprocess.Popen(["qpid-stat", "-e", broker1.host_port()], stdout=subprocess.PIPE) + qs = subprocess.Popen(["qpid-stat", "-e", "-b", broker1.host_port()], stdout=subprocess.PIPE) out = qs.communicate()[0] assert out.find("amq.failover") > 0 @@ -1160,7 +1160,7 @@ class LongTests(BrokerTest): def start_mclients(broker): """Start management clients that make multiple connections.""" - cmd = ["qpid-stat", "-b", "localhost:%s" %(broker.port())] + cmd = ["qpid-cluster", "-C", "localhost:%s" %(broker.port())] mclients.append(ClientLoop(broker, cmd)) endtime = time.time() + self.duration() diff --git a/qpid/cpp/src/tests/clustered_replication_test b/qpid/cpp/src/tests/clustered_replication_test index 4a57502f39..8c8522c2eb 100755 --- a/qpid/cpp/src/tests/clustered_replication_test +++ b/qpid/cpp/src/tests/clustered_replication_test @@ -65,8 +65,8 @@ if test -d $PYTHON_DIR; then #start first node of primary cluster and set up test queue echo Starting primary cluster PRIMARY1=$(with_ais_group $QPIDD_EXEC $GENERAL_OPTS $PRIMARY_OPTS --log-to-file repl.primary.1.tmp) || fail "Could not start PRIMARY1" - $PYTHON_COMMANDS/qpid-config -a "localhost:$PRIMARY1" add queue test-queue --generate-queue-events 2 - $PYTHON_COMMANDS/qpid-config -a "localhost:$PRIMARY1" add queue control-queue --generate-queue-events 1 + $PYTHON_COMMANDS/qpid-config -b "localhost:$PRIMARY1" add queue test-queue --generate-queue-events 2 + $PYTHON_COMMANDS/qpid-config -b "localhost:$PRIMARY1" add queue control-queue --generate-queue-events 1 #send 10 messages, consume 5 of them for i in `seq 1 10`; do echo Message$i; done | ./sender --port $PRIMARY1 @@ -81,9 +81,9 @@ if test -d $PYTHON_DIR; then DR1=$(with_ais_group $QPIDD_EXEC $GENERAL_OPTS $DR_OPTS --log-to-file repl.dr.1.tmp) || fail "Could not start DR1" DR2=$(with_ais_group $QPIDD_EXEC $GENERAL_OPTS $DR_OPTS --log-to-file repl.dr.2.tmp) || fail "Could not start DR2" - $PYTHON_COMMANDS/qpid-config -a "localhost:$DR1" add queue test-queue - $PYTHON_COMMANDS/qpid-config -a "localhost:$DR1" add queue control-queue - $PYTHON_COMMANDS/qpid-config -a "localhost:$DR1" add exchange replication REPLICATION_EXCHANGE + $PYTHON_COMMANDS/qpid-config -b "localhost:$DR1" add queue test-queue + $PYTHON_COMMANDS/qpid-config -b "localhost:$DR1" add queue control-queue + $PYTHON_COMMANDS/qpid-config -b "localhost:$DR1" add exchange replication REPLICATION_EXCHANGE $PYTHON_COMMANDS/qpid-route queue add localhost:$DR2 localhost:$PRIMARY2 REPLICATION_EXCHANGE REPLICATION_QUEUE || fail "Could not add route." #send more messages to primary diff --git a/qpid/cpp/src/tests/federated_cluster_test b/qpid/cpp/src/tests/federated_cluster_test index b32455259e..50b877e666 100755 --- a/qpid/cpp/src/tests/federated_cluster_test +++ b/qpid/cpp/src/tests/federated_cluster_test @@ -63,20 +63,20 @@ start_brokers() { setup() { #create exchange on both cluster and single broker - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_A" add exchange direct test-exchange - $PYTHON_COMMANDS/qpid-config -a "localhost:$NODE_1" add exchange direct test-exchange + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_A" add exchange direct test-exchange + $PYTHON_COMMANDS/qpid-config -b "localhost:$NODE_1" add exchange direct test-exchange #create dynamic routes for test exchange $PYTHON_COMMANDS/qpid-route dynamic add "localhost:$NODE_2" "localhost:$BROKER_A" test-exchange $PYTHON_COMMANDS/qpid-route dynamic add "localhost:$BROKER_A" "localhost:$NODE_2" test-exchange #create test queue on cluster and bind it to the test exchange - $PYTHON_COMMANDS/qpid-config -a "localhost:$NODE_1" add queue test-queue - $PYTHON_COMMANDS/qpid-config -a "localhost:$NODE_1" bind test-exchange test-queue to-cluster + $PYTHON_COMMANDS/qpid-config -b "localhost:$NODE_1" add queue test-queue + $PYTHON_COMMANDS/qpid-config -b "localhost:$NODE_1" bind test-exchange test-queue to-cluster #create test queue on single broker and bind it to the test exchange - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_A" add queue test-queue - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_A" bind test-exchange test-queue from-cluster + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_A" add queue test-queue + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_A" bind test-exchange test-queue from-cluster } run_test_pull_to_cluster_two_consumers() { diff --git a/qpid/cpp/src/tests/ipv6_test b/qpid/cpp/src/tests/ipv6_test index d75d50fd0a..8fa272d514 100755 --- a/qpid/cpp/src/tests/ipv6_test +++ b/qpid/cpp/src/tests/ipv6_test @@ -93,10 +93,10 @@ else BROKER1="[::1]:${PORTS[1]}" TEST_QUEUE=ipv6-fed-test - $QPID_CONFIG_EXEC -a $BROKER0 add queue $TEST_QUEUE - $QPID_CONFIG_EXEC -a $BROKER1 add queue $TEST_QUEUE + $QPID_CONFIG_EXEC -b $BROKER0 add queue $TEST_QUEUE + $QPID_CONFIG_EXEC -b $BROKER1 add queue $TEST_QUEUE $QPID_ROUTE_EXEC dynamic add $BROKER1 $BROKER0 amq.direct - $QPID_CONFIG_EXEC -a $BROKER1 bind amq.direct $TEST_QUEUE $TEST_QUEUE + $QPID_CONFIG_EXEC -b $BROKER1 bind amq.direct $TEST_QUEUE $TEST_QUEUE $QPID_ROUTE_EXEC route map $BROKER1 ./datagen --count 100 | tee rdata-in | diff --git a/qpid/cpp/src/tests/queue_flow_limit_tests.py b/qpid/cpp/src/tests/queue_flow_limit_tests.py index dec7cfb3af..d51b26a821 100644 --- a/qpid/cpp/src/tests/queue_flow_limit_tests.py +++ b/qpid/cpp/src/tests/queue_flow_limit_tests.py @@ -117,7 +117,7 @@ class QueueFlowLimitTests(TestBase010): tool = environ.get("QPID_CONFIG_EXEC") if tool: command = tool + \ - " --broker-addr=%s:%s " % (self.broker.host, self.broker.port) \ + " --broker=%s:%s " % (self.broker.host, self.broker.port) \ + "add queue test01 --flow-stop-count=999" \ + " --flow-resume-count=55 --flow-stop-size=5000000" \ + " --flow-resume-size=100000" diff --git a/qpid/cpp/src/tests/reliable_replication_test b/qpid/cpp/src/tests/reliable_replication_test index 1f1dac5f2d..c660f751e5 100755 --- a/qpid/cpp/src/tests/reliable_replication_test +++ b/qpid/cpp/src/tests/reliable_replication_test @@ -47,12 +47,12 @@ setup() { echo "Testing replication from port $BROKER_A to port $BROKER_B" - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_B" add exchange replication replication + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_B" add exchange replication replication $PYTHON_COMMANDS/qpid-route --ack 500 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication #create test queue (only replicate enqueues for this test): - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_A" add queue queue-a --generate-queue-events 1 - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_B" add queue queue-a + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_A" add queue queue-a --generate-queue-events 1 + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_B" add queue queue-a } send() { diff --git a/qpid/cpp/src/tests/replication_test b/qpid/cpp/src/tests/replication_test index 8c37568875..f8b2136396 100755 --- a/qpid/cpp/src/tests/replication_test +++ b/qpid/cpp/src/tests/replication_test @@ -46,21 +46,21 @@ if test -d ${PYTHON_DIR} && test -f "$REPLICATING_LISTENER_LIB" && test -f "$REP BROKER_B=`cat qpidd.port` echo "Running replication test between localhost:$BROKER_A and localhost:$BROKER_B" - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_B" add exchange replication replication + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_B" add exchange replication replication $PYTHON_COMMANDS/qpid-route --ack 5 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication #create test queues - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_A" add queue queue-a --generate-queue-events 2 - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_A" add queue queue-b --generate-queue-events 2 - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_A" add queue queue-c --generate-queue-events 1 - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_A" add queue queue-d --generate-queue-events 2 - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_A" add queue queue-e --generate-queue-events 1 + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_A" add queue queue-a --generate-queue-events 2 + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_A" add queue queue-b --generate-queue-events 2 + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_A" add queue queue-c --generate-queue-events 1 + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_A" add queue queue-d --generate-queue-events 2 + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_A" add queue queue-e --generate-queue-events 1 - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_B" add queue queue-a - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_B" add queue queue-b - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_B" add queue queue-c - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_B" add queue queue-e + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_B" add queue queue-a + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_B" add queue queue-b + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_B" add queue queue-c + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_B" add queue queue-e #queue-d deliberately not declared on DR; this error case should be handled #publish and consume from test queues on broker A: @@ -124,13 +124,13 @@ if test -d ${PYTHON_DIR} && test -f "$REPLICATING_LISTENER_LIB" && test -f "$REP $QPIDD_EXEC --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module $REPLICATION_EXCHANGE_LIB --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 > qpidd.port BROKER_B=`cat qpidd.port` - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_B" add exchange replication replication + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_B" add exchange replication replication $PYTHON_COMMANDS/qpid-route --ack 5 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_A" add queue queue-e --generate-queue-events 2 - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_B" add queue queue-e - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_A" add queue queue-d --generate-queue-events 1 - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_B" add queue queue-d + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_A" add queue queue-e --generate-queue-events 2 + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_B" add queue queue-e + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_A" add queue queue-d --generate-queue-events 1 + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_B" add queue queue-d i=1 while [ $i -le 10 ]; do @@ -152,8 +152,8 @@ if test -d ${PYTHON_DIR} && test -f "$REPLICATING_LISTENER_LIB" && test -f "$REP $QPIDD_EXEC --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module $REPLICATION_EXCHANGE_LIB --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 > qpidd.port BROKER_B=`cat qpidd.port` - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_B" add queue queue-e - $PYTHON_COMMANDS/qpid-config -a "localhost:$BROKER_B" add exchange replication replication + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_B" add queue queue-e + $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_B" add exchange replication replication $PYTHON_COMMANDS/qpid-route --ack 5 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication # now send another 15 i=11 diff --git a/qpid/cpp/src/tests/ring_queue_test b/qpid/cpp/src/tests/ring_queue_test index 553746eb49..271b46183e 100755 --- a/qpid/cpp/src/tests/ring_queue_test +++ b/qpid/cpp/src/tests/ring_queue_test @@ -28,7 +28,7 @@ MESSAGES=10000 SENDERS=1 RECEIVERS=1 CONCURRENT=0 -BROKER_URL="-a ${QPID_BROKER:-localhost}:${QPID_PORT:-5672}" +BROKER_URL="-b ${QPID_BROKER:-localhost}:${QPID_PORT:-5672}" setup() { if [[ $DURABLE -gt 0 ]]; then diff --git a/qpid/cpp/src/tests/run_msg_group_tests b/qpid/cpp/src/tests/run_msg_group_tests index 5a6da546f3..4e82759866 100755 --- a/qpid/cpp/src/tests/run_msg_group_tests +++ b/qpid/cpp/src/tests/run_msg_group_tests @@ -44,19 +44,19 @@ run_test() { declare -i i=0 declare -a tests -tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --group-header=${GROUP_KEY} --shared-groups" +tests=("qpid-config -b $BROKER_URL add queue $QUEUE_NAME --group-header=${GROUP_KEY} --shared-groups" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size --interleave 3" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 7 --randomize-group-size" - "qpid-config -a $BROKER_URL add queue ${QUEUE_NAME}-two --group-header=${GROUP_KEY} --shared-groups" + "qpid-config -b $BROKER_URL add queue ${QUEUE_NAME}-two --group-header=${GROUP_KEY} --shared-groups" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 3 --randomize-group-size" "msg_group_test -b $BROKER_URL -a ${QUEUE_NAME}-two --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size --interleave 5" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59 --group-size 5 --receivers 2 --senders 3 --capacity 1 --ack-frequency 3 --randomize-group-size" - "qpid-config -a $BROKER_URL del queue ${QUEUE_NAME}-two --force" + "qpid-config -b $BROKER_URL del queue ${QUEUE_NAME}-two --force" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59 --group-size 3 --receivers 2 --senders 3 --capacity 1 --ack-frequency 1 --randomize-group-size" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 211 --group-size 13 --receivers 2 --senders 3 --capacity 47 --ack-frequency 79 --interleave 53" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10000 --group-size 1 --receivers 0 --senders 1" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10000 --receivers 5 --senders 0" - "qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force") + "qpid-config -b $BROKER_URL del queue $QUEUE_NAME --force") while [ -n "${tests[i]}" ]; do run_test ${tests[i]} diff --git a/qpid/cpp/src/tests/run_msg_group_tests_soak b/qpid/cpp/src/tests/run_msg_group_tests_soak index 44995423cc..2ebbaf4efd 100755 --- a/qpid/cpp/src/tests/run_msg_group_tests_soak +++ b/qpid/cpp/src/tests/run_msg_group_tests_soak @@ -44,13 +44,13 @@ run_test() { declare -i i=0 declare -a tests -tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --group-header=${GROUP_KEY} --shared-groups" +tests=("qpid-config -b $BROKER_URL add queue $QUEUE_NAME --group-header=${GROUP_KEY} --shared-groups" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 47 --ack-frequency 97" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 79 --ack-frequency 79" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 97 --ack-frequency 47" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 40000 --receivers 0 --senders 5 --group-size 13 --randomize-group-size" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 200000 --receivers 3 --senders 0 --capacity 23 --ack-frequency 7" - "qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force") + "qpid-config -b $BROKER_URL del queue $QUEUE_NAME --force") while [ -n "${tests[i]}" ]; do run_test ${tests[i]} diff --git a/qpid/cpp/src/tests/sasl_fed b/qpid/cpp/src/tests/sasl_fed index 884c44177c..9dc2dd46e2 100755 --- a/qpid/cpp/src/tests/sasl_fed +++ b/qpid/cpp/src/tests/sasl_fed @@ -90,23 +90,23 @@ EXCHANGE_NAME=sasl_fedex #-------------------------------------------------- #echo " add exchanges" #-------------------------------------------------- -$QPID_CONFIG_EXEC -a localhost:$broker_1_port add exchange direct $EXCHANGE_NAME -$QPID_CONFIG_EXEC -a localhost:$broker_2_port add exchange direct $EXCHANGE_NAME +$QPID_CONFIG_EXEC -b localhost:$broker_1_port add exchange direct $EXCHANGE_NAME +$QPID_CONFIG_EXEC -b localhost:$broker_2_port add exchange direct $EXCHANGE_NAME #-------------------------------------------------- #echo " add queues" #-------------------------------------------------- -$QPID_CONFIG_EXEC -a localhost:$broker_1_port add queue $QUEUE_NAME -$QPID_CONFIG_EXEC -a localhost:$broker_2_port add queue $QUEUE_NAME +$QPID_CONFIG_EXEC -b localhost:$broker_1_port add queue $QUEUE_NAME +$QPID_CONFIG_EXEC -b localhost:$broker_2_port add queue $QUEUE_NAME sleep 5 #-------------------------------------------------- #echo " create bindings" #-------------------------------------------------- -$QPID_CONFIG_EXEC -a localhost:$broker_1_port bind $EXCHANGE_NAME $QUEUE_NAME $ROUTING_KEY -$QPID_CONFIG_EXEC -a localhost:$broker_2_port bind $EXCHANGE_NAME $QUEUE_NAME $ROUTING_KEY +$QPID_CONFIG_EXEC -b localhost:$broker_1_port bind $EXCHANGE_NAME $QUEUE_NAME $ROUTING_KEY +$QPID_CONFIG_EXEC -b localhost:$broker_2_port bind $EXCHANGE_NAME $QUEUE_NAME $ROUTING_KEY sleep 5 @@ -130,13 +130,13 @@ sleep 5 #-------------------------------------------------- #echo " Examine Broker $broker_1_port" #-------------------------------------------------- -broker_1_message_count=`$PYTHON_COMMANDS/qpid-stat -q localhost:$broker_1_port | grep sasl_fed_queue | awk '{print $2}'` +broker_1_message_count=`$PYTHON_COMMANDS/qpid-stat -q -b localhost:$broker_1_port | grep sasl_fed_queue | awk '{print $2}'` #echo " " #-------------------------------------------------- #echo " Examine Broker $broker_2_port" #-------------------------------------------------- -broker_2_message_count=`$PYTHON_COMMANDS/qpid-stat -q localhost:$broker_2_port | grep sasl_fed_queue | awk '{print $2}'` +broker_2_message_count=`$PYTHON_COMMANDS/qpid-stat -q -b localhost:$broker_2_port | grep sasl_fed_queue | awk '{print $2}'` #echo " " #-------------------------------------------------- diff --git a/qpid/cpp/src/tests/sasl_fed_ex b/qpid/cpp/src/tests/sasl_fed_ex index 716a806874..cc5b310067 100755 --- a/qpid/cpp/src/tests/sasl_fed_ex +++ b/qpid/cpp/src/tests/sasl_fed_ex @@ -280,18 +280,18 @@ EXCHANGE_NAME=sasl_fedex print "add exchanges" -$QPID_CONFIG_EXEC -a localhost:${SRC_TCP_PORT} add exchange direct $EXCHANGE_NAME -$QPID_CONFIG_EXEC -a localhost:${DST_TCP_PORT} add exchange direct $EXCHANGE_NAME +$QPID_CONFIG_EXEC -b localhost:${SRC_TCP_PORT} add exchange direct $EXCHANGE_NAME +$QPID_CONFIG_EXEC -b localhost:${DST_TCP_PORT} add exchange direct $EXCHANGE_NAME print "add queues" -$QPID_CONFIG_EXEC -a localhost:${SRC_TCP_PORT} add queue $QUEUE_NAME -$QPID_CONFIG_EXEC -a localhost:${DST_TCP_PORT} add queue $QUEUE_NAME +$QPID_CONFIG_EXEC -b localhost:${SRC_TCP_PORT} add queue $QUEUE_NAME +$QPID_CONFIG_EXEC -b localhost:${DST_TCP_PORT} add queue $QUEUE_NAME print "create bindings" -$QPID_CONFIG_EXEC -a localhost:${SRC_TCP_PORT} bind $EXCHANGE_NAME $QUEUE_NAME $ROUTING_KEY -$QPID_CONFIG_EXEC -a localhost:${DST_TCP_PORT} bind $EXCHANGE_NAME $QUEUE_NAME $ROUTING_KEY +$QPID_CONFIG_EXEC -b localhost:${SRC_TCP_PORT} bind $EXCHANGE_NAME $QUEUE_NAME $ROUTING_KEY +$QPID_CONFIG_EXEC -b localhost:${DST_TCP_PORT} bind $EXCHANGE_NAME $QUEUE_NAME $ROUTING_KEY # diff --git a/qpid/cpp/src/tests/testlib.py b/qpid/cpp/src/tests/testlib.py index fe57a84a81..71ad59e5c1 100644 --- a/qpid/cpp/src/tests/testlib.py +++ b/qpid/cpp/src/tests/testlib.py @@ -348,8 +348,8 @@ class TestBaseCluster(TestBase): def _qpidConfig(self, nodeNumber, clusterName, action): """Configure some aspect of a qpid broker using the qpid_config executable""" port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT] - #print "%s -a localhost:%d %s" % (self._qpidConfigExec, port, action) - ret = os.spawnl(os.P_WAIT, self._qpidConfigExec, self._qpidConfigExec, "-a", "localhost:%d" % port, *action.split()) + #print "%s -b localhost:%d %s" % (self._qpidConfigExec, port, action) + ret = os.spawnl(os.P_WAIT, self._qpidConfigExec, self._qpidConfigExec, "-b", "localhost:%d" % port, *action.split()) if ret != 0: raise Exception("_qpidConfig(): cluster=\"%s\" nodeNumber=%d port=%d action=\"%s\" returned %d" % \ (clusterName, nodeNumber, port, action, ret)) diff --git a/qpid/doc/book/src/LVQ.xml b/qpid/doc/book/src/LVQ.xml index 4e818881ad..b57c6268be 100644 --- a/qpid/doc/book/src/LVQ.xml +++ b/qpid/doc/book/src/LVQ.xml @@ -117,7 +117,7 @@ qpid-config utility: </para> <programlisting> - $ qpid-config add queue prices --argument qpid.last_value_queue_key=ticker + $ qpid-config add queue prices --lvq-key ticker </programlisting> </section> </section> @@ -131,13 +131,13 @@ from qpid.messaging import Connection, Message def send(sender, key, message): - message.properties["key"] = key + message.properties["ticker"] = key sender.send(message) conn = Connection("localhost") conn.open() sess = conn.session() - tx = sess.sender("topic;{create:always, node:{type:queue,x-declare:{arguments:{'qpid.last_value_queue_key':key}}}}") + tx = sess.sender("prices;{create:always, node:{type:queue,x-declare:{arguments:{'qpid.last_value_queue_key':ticker}}}}") msg = Message("Content") send(tx, "key1", msg); @@ -155,12 +155,11 @@ <title>LVQ Browsing Receiver</title> <programlisting> from qpid.messaging import Connection, Message - from time import sleep conn = Connection("localhost") conn.open() sess = conn.session() - rx = sess.receiver("topic;{mode:browse}") + rx = sess.receiver("prices;{mode:browse}") while True: msg = rx.fetch() @@ -176,7 +175,7 @@ There are two legacy modes (still implemented as of Qpid 0.14) controlled by the qpid.last_value_queue and qpid.last_value_queue_no_browse argument values. These modes are - intended to be deprecated and should not be used. + deprecated and should not be used. </para> </section> </section> diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py index 1da53b3378..d9feb6b484 100644 --- a/qpid/python/qpid/testlib.py +++ b/qpid/python/qpid/testlib.py @@ -29,6 +29,9 @@ from qpid.message import Message from qpid.harness import Skipped from qpid.exceptions import VersionError +import qpid.messaging +import qpidtoollibs.broker + class TestBase(unittest.TestCase): """Base class for Qpid test cases. @@ -193,6 +196,15 @@ class TestBase010(unittest.TestCase): self.qmf = qmf.console.Session(handler) self.qmf_broker = self.qmf.addBroker(str(self.broker)) + def startBrokerAccess(self): + """ + New-style management access to the broker. Can be used in lieu of startQmf. + """ + if 'broker_conn' not in self.__dict__: + self.broker_conn = qpid.messaging.Connection(str(self.broker)) + self.broker_conn.open() + self.broker_access = qpidtoollibs.broker.BrokerAgent(self.broker_conn) + def connect(self, host=None, port=None): url = self.broker if url.scheme == URL.AMQPS: diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config index 0110c60aa2..9433029590 100755 --- a/qpid/tools/src/py/qpid-config +++ b/qpid/tools/src/py/qpid-config @@ -18,12 +18,18 @@ # specific language governing permissions and limitations # under the License. # +import pdb import os from optparse import OptionParser, OptionGroup, IndentedHelpFormatter import sys import locale -from qmf.console import Session + +home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools")) +sys.path.append(os.path.join(home, "python")) + +from qpid.messaging import Connection +from qpidtoollibs.broker import BrokerAgent usage = """ Usage: qpid-config [OPTIONS] @@ -43,8 +49,8 @@ Examples: $ qpid-config add queue q $ qpid-config add exchange direct d -a localhost:5672 -$ qpid-config exchanges -a 10.1.1.7:10000 -$ qpid-config queues -a guest/guest@broker-host:10000 +$ qpid-config exchanges -b 10.1.1.7:10000 +$ qpid-config queues -b guest/guest@broker-host:10000 Add Exchange <type> values: @@ -61,13 +67,7 @@ Queue Limit Actions reject - Reject enqueued messages flow-to-disk - Page messages to disk ring - Replace oldest unacquired message with new - ring-strict - Replace oldest message, reject if oldest is acquired - -Queue Ordering Policies - - fifo (default) - First in, first out - lvq - Last Value Queue ordering, allows queue browsing - lvq-no-browse - Last Value Queue ordering, browsing clients may lose data""" + ring-strict - Replace oldest message, reject if oldest is acquired""" class Config: @@ -77,7 +77,6 @@ class Config: self._connTimeout = 10 self._ignoreDefault = False self._altern_ex = None - self._passive = False self._durable = False self._clusterDurable = False self._if_empty = True @@ -87,8 +86,8 @@ class Config: self._maxQueueSize = None self._maxQueueCount = None self._limitPolicy = None - self._order = None self._msgSequence = False + self._lvq_key = None self._ive = False self._eventGeneration = None self._file = None @@ -110,8 +109,7 @@ MAX_QUEUE_SIZE = "qpid.max_size" MAX_QUEUE_COUNT = "qpid.max_count" POLICY_TYPE = "qpid.policy_type" CLUSTER_DURABLE = "qpid.persist_last_node" -LVQ = "qpid.last_value_queue" -LVQNB = "qpid.last_value_queue_no_browse" +LVQ_KEY = "qpid.last_value_queue_key" MSG_SEQUENCE = "qpid.msg_sequence" IVE = "qpid.ive" QUEUE_EVENT_GENERATION = "qpid.queue_event_generation" @@ -127,7 +125,7 @@ SHARED_MSG_GROUP = "qpid.shared_msg_group" #arguments for which there are specific program options defined #i.e. the arguments for which there is special processing on add and #list -SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE, +SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ_KEY,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE, MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP] class JHelpFormatter(IndentedHelpFormatter): @@ -160,8 +158,8 @@ def OptionsAndArguments(argv): group1 = OptionGroup(parser, "General Options") group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)") - group1.add_option("-b", "--bindings", action="store_true", help="Show bindings in queue or exchange list") - group1.add_option("-a", "--broker-addr", action="store", type="string", default="localhost:5672", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]") + group1.add_option("-r", "--recursive", action="store_true", help="Show bindings in queue or exchange list") + group1.add_option("-b", "--broker", action="store", type="string", default="localhost:5672", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]") group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") parser.add_option_group(group1) @@ -171,7 +169,6 @@ def OptionsAndArguments(argv): group2 = OptionGroup(parser, "Options for Adding Exchanges and Queues") group2.add_option("--alternate-exchange", action="store", type="string", metavar="<aexname>", help="Name of the alternate-exchange for the new queue or exchange. Exchanges route messages to the alternate exchange if they are unable to route them elsewhere. Queues route messages to the alternate exchange if they are rejected by a subscriber or orphaned by queue deletion.") - group2.add_option("--passive", "--dry-run", action="store_true", help="Do not actually add the exchange or queue, ensure that all parameters and permissions are correct and would allow it to be created.") group2.add_option("--durable", action="store_true", help="The new queue or exchange is durable.") parser.add_option_group(group2) @@ -182,7 +179,7 @@ def OptionsAndArguments(argv): group3.add_option("--max-queue-size", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as bytes") group3.add_option("--max-queue-count", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as a number of messages") group3.add_option("--limit-policy", action="store", choices=["none", "reject", "flow-to-disk", "ring", "ring-strict"], metavar="<policy>", help="Action to take when queue limit is reached") - group3.add_option("--order", action="store", choices=["fifo", "lvq", "lvq-no-browse"], metavar="<ordering>", help="Queue ordering policy") + group3.add_option("--lvq-key", action="store", metavar="<key>", help="Last Value Queue key") group3.add_option("--generate-queue-events", action="store", type="int", metavar="<n>", help="If set to 1, every enqueue will generate an event that can be processed by registered listeners (e.g. for replication). If set to 2, events will be generated for enqueues and dequeues.") group3.add_option("--flow-stop-size", action="store", type="int", metavar="<n>", help="Turn on sender flow control when the number of queued bytes exceeds this value.") @@ -224,10 +221,10 @@ def OptionsAndArguments(argv): except: args = encArgs - if opts.bindings: + if opts.recursive: config._recursive = True - if opts.broker_addr: - config._host = opts.broker_addr + if opts.broker: + config._host = opts.broker if opts.timeout is not None: config._connTimeout = opts.timeout if config._connTimeout == 0: @@ -236,8 +233,6 @@ def OptionsAndArguments(argv): config._ignoreDefault = True if opts.alternate_exchange: config._altern_ex = opts.alternate_exchange - if opts.passive: - config._passive = True if opts.durable: config._durable = True if opts.cluster_durable: @@ -254,10 +249,10 @@ def OptionsAndArguments(argv): config._maxQueueCount = opts.max_queue_count if opts.limit_policy: config._limitPolicy = opts.limit_policy - if opts.order: - config._order = opts.order if opts.sequence: config._msgSequence = True + if opts.lvq_key: + config._lvq_key = opts.lvq_key if opts.ive: config._ive = True if opts.generate_queue_events: @@ -331,27 +326,23 @@ def snarf_header_args(args): class BrokerManager: def __init__(self): self.brokerName = None - self.qmf = None + self.conn = None self.broker = None - self.mechanism = None def SetBroker(self, brokerUrl, mechanism): self.url = brokerUrl - self.qmf = Session() - self.broker = self.qmf.addBroker(brokerUrl, config._connTimeout, mechanism) - agents = self.qmf.getAgents() - for a in agents: - if a.getAgentBank() == '0': - self.brokerAgent = a + self.conn = Connection(self.url, sasl_mechanisms=mechanism) + self.conn.open() + self.broker = BrokerAgent(self.conn) def Disconnect(self): - if self.broker: - self.qmf.delBroker(self.broker) + if self.conn: + self.conn.close() def Overview(self): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) - print "Total Exchanges: %d" % len (exchanges) + exchanges = self.broker.getAllExchanges() + queues = self.broker.getAllQueues() + print "Total Exchanges: %d" % len(exchanges) etype = {} for ex in exchanges: if ex.type not in etype: @@ -362,16 +353,16 @@ class BrokerManager: print "%15s: %d" % (typ, etype[typ]) print - print " Total Queues: %d" % len (queues) + print " Total Queues: %d" % len(queues) durable = 0 for queue in queues: if queue.durable: durable = durable + 1 print " durable: %d" % durable - print " non-durable: %d" % (len (queues) - durable) + print " non-durable: %d" % (len(queues) - durable) def ExchangeList(self, filter): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) + exchanges = self.broker.getAllExchanges() caption1 = "Type " caption2 = "Exchange Name" maxNameLen = len(caption2) @@ -401,19 +392,19 @@ class BrokerManager: if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence", if IVE in args and args[IVE] == 1: print "--ive", if ex.altExchange: - print "--alternate-exchange=%s" % ex._altExchange_.name, + print "--alternate-exchange=%s" % ex.altExchange, print def ExchangeListRecurse(self, filter): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) - bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) + exchanges = self.broker.getAllExchanges() + bindings = self.broker.getAllBindings() + queues = self.broker.getAllQueues() for ex in exchanges: if config._ignoreDefault and not ex.name: continue if self.match(ex.name, filter): print "Exchange '%s' (%s)" % (ex.name, ex.type) for bind in bindings: - if bind.exchangeRef == ex.getObjectId(): + if bind.exchangeRef == ex.name: qname = "<unknown>" queue = self.findById(queues, bind.queueRef) if queue != None: @@ -425,7 +416,7 @@ class BrokerManager: def QueueList(self, filter): - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) + queues = self.broker.getAllQueues() caption = "Queue Name" maxNameLen = len(caption) found = False @@ -458,8 +449,7 @@ class BrokerManager: if MAX_QUEUE_SIZE in args: print "--max-queue-size=%s" % args[MAX_QUEUE_SIZE], if MAX_QUEUE_COUNT in args: print "--max-queue-count=%s" % args[MAX_QUEUE_COUNT], if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"), - if LVQ in args and args[LVQ] == 1: print "--order lvq", - if LVQNB in args and args[LVQNB] == 1: print "--order lvq-no-browse", + if LVQ_KEY in args: print "--lvq-key=%s" % args[LVQ_KEY], if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%s" % args[QUEUE_EVENT_GENERATION], if q.altExchange: print "--alternate-exchange=%s" % q._altExchange_.name, @@ -472,14 +462,14 @@ class BrokerManager: print " ".join(["--argument %s=%s" % (k, v) for k,v in args.iteritems() if not k in SPECIAL_ARGS]) def QueueListRecurse(self, filter): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) - bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) + exchanges = self.broker.getAllExchanges() + bindings = self.broker.getAllBindings() + queues = self.broker.getAllQueues() for queue in queues: if self.match(queue.name, filter): print "Queue '%s'" % queue.name for bind in bindings: - if bind.queueRef == queue.getObjectId(): + if bind.queueRef == queue.name: ename = "<unknown>" ex = self.findById(exchanges, bind.exchangeRef) if ex != None: @@ -508,16 +498,19 @@ class BrokerManager: declArgs[MSG_SEQUENCE] = 1 if config._ive: declArgs[IVE] = 1 - if config._altern_ex != None: - self.broker.getAmqpSession().exchange_declare(exchange=ename, type=etype, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs) - else: - self.broker.getAmqpSession().exchange_declare(exchange=ename, type=etype, passive=config._passive, durable=config._durable, arguments=declArgs) + if config._altern_ex: + declArgs['alternate-exchange'] = config._altern_ex + if config._durable: + declArgs['durable'] = 1 + self.broker.addExchange(etype, ename, declArgs) + def DelExchange(self, args): if len(args) < 1: Usage() ename = args[0] - self.broker.getAmqpSession().exchange_delete(exchange=ename) + self.broker.delExchange(ename) + def AddQueue(self, args): if len(args) < 1: @@ -550,15 +543,10 @@ class BrokerManager: elif config._limitPolicy == "ring-strict": declArgs[POLICY_TYPE] = "ring_strict" - if config._clusterDurable: + if config._clusterDurable: declArgs[CLUSTER_DURABLE] = 1 - if config._order: - if config._order == "fifo": - pass - elif config._order == "lvq": - declArgs[LVQ] = 1 - elif config._order == "lvq-no-browse": - declArgs[LVQNB] = 1 + if config._lvq_key: + declArgs[LVQ_KEY] = config._lvq_key if config._eventGeneration: declArgs[QUEUE_EVENT_GENERATION] = config._eventGeneration @@ -576,17 +564,19 @@ class BrokerManager: if config._sharedMsgGroup: declArgs[SHARED_MSG_GROUP] = 1 - if config._altern_ex != None: - self.broker.getAmqpSession().queue_declare(queue=qname, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs) - else: - self.broker.getAmqpSession().queue_declare(queue=qname, passive=config._passive, durable=config._durable, arguments=declArgs) + if config._altern_ex: + declArgs['alternate-exchange'] = config._altern_ex + if config._durable: + declArgs['durable'] = 1 + + self.broker.addQueue(qname, declArgs) def DelQueue(self, args): if len(args) < 1: Usage() qname = args[0] - self.broker.getAmqpSession().queue_delete(queue=qname, if_empty=config._if_empty, if_unused=config._if_unused) + self.broker.delQueue(qname) def Bind(self, args): @@ -599,7 +589,7 @@ class BrokerManager: key = args[2] # query the exchange to determine its type. - res = self.broker.getAmqpSession().exchange_query(ename) + res = self.broker.getExchange(ename) # type of the xchg determines the processing of the rest of # argv. if it's an xml xchg, we want to find a file @@ -608,7 +598,7 @@ class BrokerManager: # map containing key/value pairs. if neither of those, extra # args are ignored. ok = True - _args = None + _args = {} if res.type == "xml": # this checks/imports the -f arg [ok, xquery] = snarf_xquery_args() @@ -622,10 +612,7 @@ class BrokerManager: if not ok: sys.exit(1) - self.broker.getAmqpSession().exchange_bind(queue=qname, - exchange=ename, - binding_key=key, - arguments=_args) + self.broker.bind(ename, qname, key, _args) def Unbind(self, args): if len(args) < 2: @@ -635,11 +622,11 @@ class BrokerManager: key = "" if len(args) > 2: key = args[2] - self.broker.getAmqpSession().exchange_unbind(queue=qname, exchange=ename, binding_key=key) + self.broker.unbind(ename, qname, key) def findById(self, items, id): for item in items: - if item.getObjectId() == id: + if item.name == id: return item return None diff --git a/qpid/tools/src/py/qpid-stat b/qpid/tools/src/py/qpid-stat index bb094554e6..c9c4da2aeb 100755 --- a/qpid/tools/src/py/qpid-stat +++ b/qpid/tools/src/py/qpid-stat @@ -42,7 +42,6 @@ class Config: self._limit = 50 self._increasing = False self._sortcol = None - self._details = None self._sasl_mechanism = None config = Config() @@ -52,16 +51,19 @@ def OptionsAndArguments(argv): global config - parser = OptionParser(usage="usage: %prog [options] BROKER", - description="Example: $ qpid-stat -q broker-host:10000") + parser = OptionParser(usage="usage: %prog [options] -[gcequm] [object-name]") group1 = OptionGroup(parser, "General Options") - group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)") - group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") + group1.add_option("-b", "--broker", action="store", type="string", default="localhost", metavar="<url>", + help="URL of the broker to query") + group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", + help="Maximum time to wait for broker connection (in seconds)") + group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", + help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") parser.add_option_group(group1) group2 = OptionGroup(parser, "Display Options") - group2.add_option("-b", "--broker", help="Show Brokers", action="store_const", const="b", dest="show") + group2.add_option("-g", "--general", help="Show General Broker Stats", action="store_const", const="g", dest="show") group2.add_option("-c", "--connections", help="Show Connections", action="store_const", const="c", dest="show") group2.add_option("-e", "--exchanges", help="Show Exchanges", action="store_const", const="e", dest="show") group2.add_option("-q", "--queues", help="Show Queues", action="store_const", const="q", dest="show") @@ -70,24 +72,21 @@ def OptionsAndArguments(argv): group2.add_option("-S", "--sort-by", metavar="<colname>", help="Sort by column name") group2.add_option("-I", "--increasing", action="store_true", default=False, help="Sort by increasing value (default = decreasing)") group2.add_option("-L", "--limit", type="int", default=50, metavar="<n>", help="Limit output to n rows") - group2.add_option("-D", "--details", action="store", metavar="<name>", dest="detail", default=None, help="Display details on a single object.") + parser.add_option_group(group2) opts, args = parser.parse_args(args=argv) if not opts.show: - parser.error("You must specify one of these options: -b, -c, -e, -q. or -u. For details, try $ qpid-stat --help") + parser.error("You must specify one of these options: -g, -c, -e, -q, -m, or -u. For details, try $ qpid-stat --help") config._types = opts.show config._sortcol = opts.sort_by + config._host = opts.broker config._connTimeout = opts.timeout config._increasing = opts.increasing config._limit = opts.limit config._sasl_mechanism = opts.sasl_mechanism - config._detail = opts.detail - - if args: - config._host = args[0] return args @@ -118,24 +117,23 @@ class IpAddr: class BrokerManager: def __init__(self): - self.brokerName = None - self.connections = [] - self.brokers = [] - self.cluster = None + self.brokerName = None + self.connection = None + self.broker = None + self.cluster = None def SetBroker(self, brokerUrl, mechanism): self.url = brokerUrl - self.connections.append(Connection(self.url, sasl_mechanism=mechanism)) - self.connections[0].open() - self.brokers.append(BrokerAgent(self.connections[0])) + self.connection = Connection(self.url, sasl_mechanisms=mechanism) + self.connection.open() + self.broker = BrokerAgent(self.connection) def Disconnect(self): """ Release any allocated brokers. Ignore any failures as the tool is shutting down. """ try: - for conn in self.connections: - conn.close() + connection.close() except: pass @@ -184,7 +182,7 @@ class BrokerManager: heads.append(Header('exchanges', Header.COMMAS)) heads.append(Header('queues', Header.COMMAS)) rows = [] - broker = self.brokers[0].getBroker() + broker = self.broker.getBroker() connections = self.getConnectionMap() sessions = self.getSessionMap() exchanges = self.getExchangeMap() @@ -241,8 +239,8 @@ class BrokerManager: heads.append(Header('msgIn', Header.KMG)) heads.append(Header('msgOut', Header.KMG)) rows = [] - connections = self.brokers[0].getAllConnections() - broker = self.brokers[0].getBroker() + connections = self.broker.getAllConnections() + broker = self.broker.getBroker() for conn in connections: row = [] row.append(conn.address) @@ -279,7 +277,7 @@ class BrokerManager: heads.append(Header("byteOut", Header.KMG)) heads.append(Header("byteDrop", Header.KMG)) rows = [] - exchanges = self.brokers[0].getAllExchanges() + exchanges = self.broker.getAllExchanges() for ex in exchanges: row = [] row.append(ex.name) @@ -317,7 +315,7 @@ class BrokerManager: heads.append(Header("cons", Header.KMG)) heads.append(Header("bind", Header.KMG)) rows = [] - queues = self.brokers[0].getAllQueues() + queues = self.broker.getAllQueues() for q in queues: row = [] row.append(q.name) @@ -341,9 +339,65 @@ class BrokerManager: dispRows = rows disp.formattedTable(title, heads, dispRows) - def displayQueue(self, subs): + + def displayQueue(self, subs, name): + queue = self.broker.getQueue(name) + if not queue: + print "Queue '%s' not found" % name + return + disp = Display(prefix=" ") heads = [] + heads.append(Header('Name')) + heads.append(Header('Durable', Header.YN)) + heads.append(Header('AutoDelete', Header.YN)) + heads.append(Header('Exclusive', Header.YN)) + heads.append(Header('FlowStopped', Header.YN)) + heads.append(Header('FlowStoppedCount', Header.COMMAS)) + heads.append(Header('Consumers', Header.COMMAS)) + heads.append(Header('Bindings', Header.COMMAS)) + rows = [] + rows.append([queue.name, queue.durable, queue.autoDelete, queue.exclusive, + queue.flowStopped, queue.flowStoppedCount, + queue.consumerCount, queue.bindingCount]) + disp.formattedTable("Properties:", heads, rows) + print + + heads = [] + heads.append(Header('Property')) + heads.append(Header('Value')) + rows = [] + rows.append(['arguments', queue.arguments]) + rows.append(['alt-exchange', queue.altExchange]) + disp.formattedTable("Optional Properties:", heads, rows) + print + + heads = [] + heads.append(Header('Statistic')) + heads.append(Header('Messages', Header.COMMAS)) + heads.append(Header('Bytes', Header.COMMAS)) + rows = [] + rows.append(['queue-depth', queue.msgDepth, queue.byteDepth]) + rows.append(['total-enqueues', queue.msgTotalEnqueues, queue.byteTotalEnqueues]) + rows.append(['total-dequeues', queue.msgTotalDequeues, queue.byteTotalDequeues]) + rows.append(['persistent-enqueues', queue.msgPersistEnqueues, queue.bytePersistEnqueues]) + rows.append(['persistent-dequeues', queue.msgPersistDequeues, queue.bytePersistDequeues]) + rows.append(['transactional-enqueues', queue.msgTxnEnqueues, queue.byteTxnEnqueues]) + rows.append(['transactional-dequeues', queue.msgTxnDequeues, queue.byteTxnDequeues]) + rows.append(['flow-to-disk-depth', queue.msgFtdDepth, queue.byteFtdDepth]) + rows.append(['flow-to-disk-enqueues', queue.msgFtdEnqueues, queue.byteFtdEnqueues]) + rows.append(['flow-to-disk-dequeues', queue.msgFtdDequeues, queue.byteFtdDequeues]) + rows.append(['acquires', queue.acquires, None]) + rows.append(['releases', queue.releases, None]) + rows.append(['discards-ttl-expired', queue.discardsTtl, None]) + rows.append(['discards-limit-overflow', queue.discardsOverflow, None]) + rows.append(['discards-ring-overflow', queue.discardsRing, None]) + rows.append(['discards-lvq-replace', queue.discardsLvq, None]) + rows.append(['discards-subscriber-reject', queue.discardsSubscriber, None]) + rows.append(['discards-purged', queue.discardsPurge, None]) + rows.append(['reroutes', queue.reroutes, None]) + disp.formattedTable("Statistics:", heads, rows) + def displaySubscriptions(self, subs): disp = Display(prefix=" ") @@ -359,7 +413,7 @@ class BrokerManager: heads.append(Header("creditMode")) heads.append(Header("delivered", Header.KMG)) rows = [] - subscriptions = self.brokers[0].getAllSubscriptions() + subscriptions = self.broker.getAllSubscriptions() sessions = self.getSessionMap() connections = self.getConnectionMap() for s in subscriptions: @@ -392,55 +446,55 @@ class BrokerManager: disp = Display(prefix=" ") heads = [Header('Statistic'), Header('Value', Header.COMMAS)] rows = [] - memory = self.brokers[0].getMemory() + memory = self.broker.getMemory() for k,v in memory.values.items(): if k != 'name': rows.append([k, v]) disp.formattedTable('Broker Memory Statistics:', heads, rows) def getExchangeMap(self): - exchanges = self.brokers[0].getAllExchanges() + exchanges = self.broker.getAllExchanges() emap = {} for e in exchanges: emap[e.name] = e return emap def getQueueMap(self): - queues = self.brokers[0].getAllQueues() + queues = self.broker.getAllQueues() qmap = {} for q in queues: qmap[q.name] = q return qmap def getSessionMap(self): - sessions = self.brokers[0].getAllSessions() + sessions = self.broker.getAllSessions() smap = {} for s in sessions: smap[s.name] = s return smap def getConnectionMap(self): - connections = self.brokers[0].getAllConnections() + connections = self.broker.getAllConnections() cmap = {} for c in connections: cmap[c.address] = c return cmap - def displayMain(self, main, subs): - if main == 'b': self.displayBroker(subs) + def displayMain(self, names, main, subs): + if main == 'g': self.displayBroker(subs) elif main == 'c': self.displayConn(subs) elif main == 's': self.displaySession(subs) elif main == 'e': self.displayExchange(subs) elif main == 'q': - if config._detail: - self.displayQueue(subs, config._detail) + if len(names) >= 1: + self.displayQueue(subs, names[0]) else: self.displayQueues(subs) elif main == 'u': self.displaySubscriptions(subs) elif main == 'm': self.displayMemory(subs) - def display(self): - self.displayMain(config._types[0], config._types[1:]) + def display(self, names): + self.displayMain(names, config._types[0], config._types[1:]) def main(argv=None): @@ -450,7 +504,7 @@ def main(argv=None): try: bm.SetBroker(config._host, config._sasl_mechanism) - bm.display() + bm.display(args) bm.Disconnect() return 0 except KeyboardInterrupt: diff --git a/qpid/tools/src/py/qpidtoollibs/broker.py b/qpid/tools/src/py/qpidtoollibs/broker.py index 6a380caf8d..b3616f0b3a 100644 --- a/qpid/tools/src/py/qpidtoollibs/broker.py +++ b/qpid/tools/src/py/qpidtoollibs/broker.py @@ -24,6 +24,9 @@ except ImportError: from qpid.datatypes import uuid4 class BrokerAgent(object): + """ + Proxy for a manageable Qpid Broker - Invoke with an opened qpid.messaging.Connection. + """ def __init__(self, conn): self.conn = conn self.sess = self.conn.session() @@ -35,6 +38,9 @@ class BrokerAgent(object): self.next_correlator = 1 def close(self): + """ + Close the proxy session. This will not affect the connection used in creating the object. + """ self.sess.close() def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker", timeout=10): @@ -124,9 +130,15 @@ class BrokerAgent(object): return None def getCluster(self): + """ + Get the broker's Cluster object. + """ return self._getAllBrokerObjects(Cluster) def getBroker(self): + """ + Get the Broker object that contains broker-scope statistics and operations. + """ # # getAllBrokerObjects is used instead of getBrokerObject(Broker, 'amqp-broker') because # of a bug that used to be in the broker whereby by-name queries did not return the @@ -173,8 +185,8 @@ class BrokerAgent(object): def getAllBindings(self): return self._getAllBrokerObjects(Binding) - def getBinding(self, exchange=None, queue=None): - pass + def getAllLinks(self): + return self._getAllBrokerObjects(Link) def echo(self, sequence, body): """Request a response to test the path to the management broker""" @@ -204,23 +216,52 @@ class BrokerAgent(object): """Get the message timestamping configuration""" pass -# def addExchange(self, exchange_type, name, **kwargs): -# pass - -# def delExchange(self, name): -# pass - -# def addQueue(self, name, **kwargs): -# pass - -# def delQueue(self, name): -# pass - -# def bind(self, exchange, queue, key, **kwargs): -# pass - -# def unbind(self, exchange, queue, key, **kwargs): -# pass + def addExchange(self, exchange_type, name, options={}, **kwargs): + properties = {} + properties['exchange-type'] = exchange_type + for k,v in options.items(): + properties[k] = v + for k,v in kwargs.items(): + properties[k] = v + args = {'type': 'exchange', + 'name': name, + 'properties': properties, + 'strict': True} + self._method('create', args) + + def delExchange(self, name): + args = {'type': 'exchange', 'name': name} + self._method('delete', args) + + def addQueue(self, name, options={}, **kwargs): + properties = options + for k,v in kwargs.items(): + properties[k] = v + args = {'type': 'queue', + 'name': name, + 'properties': properties, + 'strict': True} + self._method('create', args) + + def delQueue(self, name): + args = {'type': 'queue', 'name': name} + self._method('delete', args) + + def bind(self, exchange, queue, key, options={}, **kwargs): + properties = options + for k,v in kwargs.items(): + properties[k] = v + args = {'type': 'binding', + 'name': "%s/%s/%s" % (exchange, queue, key), + 'properties': properties, + 'strict': True} + self._method('create', args) + + def unbind(self, exchange, queue, key, **kwargs): + args = {'type': 'binding', + 'name': "%s/%s/%s" % (exchange, queue, key), + 'strict': True} + self._method('delete', args) def create(self, _type, name, properties, strict): """Create an object of the specified type""" @@ -328,3 +369,7 @@ class Queue(BrokerObject): self.broker._method("reroute", {'request':request,'useAltExchange':useAltExchange,'exchange':exchange,'filter':filter}, "org.apache.qpid.broker:queue:%s" % self.name) +class Link(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + diff --git a/qpid/tools/src/py/qpidtoollibs/disp.py b/qpid/tools/src/py/qpidtoollibs/disp.py index cb7d3da306..7962a13329 100644 --- a/qpid/tools/src/py/qpidtoollibs/disp.py +++ b/qpid/tools/src/py/qpidtoollibs/disp.py @@ -206,6 +206,11 @@ class Display: result += "%ds" % (sec % 60) return result + def YN(self, val): + if val: + return 'Y' + return 'N' + class Sortable: """ """ def __init__(self, row, sortIndex): |