summaryrefslogtreecommitdiff
path: root/qpid/tests/src/py/qpid_tests/broker_0_10/management.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/tests/src/py/qpid_tests/broker_0_10/management.py')
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/management.py587
1 files changed, 587 insertions, 0 deletions
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/management.py b/qpid/tests/src/py/qpid_tests/broker_0_10/management.py
new file mode 100644
index 0000000000..952878e0b7
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/management.py
@@ -0,0 +1,587 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.datatypes import Message, RangedSet
+from qpid.testlib import TestBase010
+from qpid.management import managementChannel, managementClient
+from threading import Condition
+from time import sleep
+import qmf.console
+import qpid.messaging
+
+class ManagementTest (TestBase010):
+ """
+ Tests for the management hooks
+ """
+
+ def test_broker_connectivity_oldAPI (self):
+ """
+ Call the "echo" method on the broker to verify it is alive and talking.
+ """
+ session = self.session
+
+ mc = managementClient ()
+ mch = mc.addChannel (session)
+
+ mc.syncWaitForStable (mch)
+ brokers = mc.syncGetObjects (mch, "broker")
+ self.assertEqual (len (brokers), 1)
+ broker = brokers[0]
+ args = {}
+ body = "Echo Message Body"
+ args["body"] = body
+
+ for seq in range (1, 5):
+ args["sequence"] = seq
+ res = mc.syncCallMethod (mch, broker.id, broker.classKey, "echo", args)
+ self.assertEqual (res.status, 0)
+ self.assertEqual (res.statusText, "OK")
+ self.assertEqual (res.sequence, seq)
+ self.assertEqual (res.body, body)
+ mc.removeChannel (mch)
+
+ def test_methods_sync (self):
+ """
+ Call the "echo" method on the broker to verify it is alive and talking.
+ """
+ session = self.session
+ self.startQmf()
+
+ brokers = self.qmf.getObjects(_class="broker")
+ self.assertEqual(len(brokers), 1)
+ broker = brokers[0]
+
+ body = "Echo Message Body"
+ for seq in range(1, 20):
+ res = broker.echo(seq, body)
+ self.assertEqual(res.status, 0)
+ self.assertEqual(res.text, "OK")
+ self.assertEqual(res.sequence, seq)
+ self.assertEqual(res.body, body)
+
+ def test_get_objects(self):
+ self.startQmf()
+
+ # get the package list, verify that the qpid broker package is there
+ packages = self.qmf.getPackages()
+ assert 'org.apache.qpid.broker' in packages
+
+ # get the schema class keys for the broker, verify the broker table and link-down event
+ keys = self.qmf.getClasses('org.apache.qpid.broker')
+ broker = None
+ linkDown = None
+ for key in keys:
+ if key.getClassName() == "broker": broker = key
+ if key.getClassName() == "brokerLinkDown" : linkDown = key
+ assert broker
+ assert linkDown
+
+ brokerObjs = self.qmf.getObjects(_class="broker")
+ assert len(brokerObjs) == 1
+ brokerObjs = self.qmf.getObjects(_key=broker)
+ assert len(brokerObjs) == 1
+
+ def test_self_session_id (self):
+ self.startQmf()
+ sessionId = self.qmf_broker.getSessionId()
+ brokerSessions = self.qmf.getObjects(_class="session")
+
+ found = False
+ for bs in brokerSessions:
+ if bs.name == sessionId:
+ found = True
+ self.assertEqual (found, True)
+
+ def test_standard_exchanges (self):
+ self.startQmf()
+
+ exchanges = self.qmf.getObjects(_class="exchange")
+ exchange = self.findExchange (exchanges, "")
+ self.assertEqual (exchange.type, "direct")
+ exchange = self.findExchange (exchanges, "amq.direct")
+ self.assertEqual (exchange.type, "direct")
+ exchange = self.findExchange (exchanges, "amq.topic")
+ self.assertEqual (exchange.type, "topic")
+ exchange = self.findExchange (exchanges, "amq.fanout")
+ self.assertEqual (exchange.type, "fanout")
+ exchange = self.findExchange (exchanges, "amq.match")
+ self.assertEqual (exchange.type, "headers")
+ exchange = self.findExchange (exchanges, "qpid.management")
+ self.assertEqual (exchange.type, "topic")
+
+ def findExchange (self, exchanges, name):
+ for exchange in exchanges:
+ if exchange.name == name:
+ return exchange
+ return None
+
+ def test_move_queued_messages(self):
+ """
+ Test ability to move messages from the head of one queue to another.
+ Need to test moveing all and N messages.
+ """
+ self.startQmf()
+ session = self.session
+ "Set up source queue"
+ session.queue_declare(queue="src-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="src-queue", exchange="amq.direct", binding_key="routing_key")
+
+ twenty = range(1,21)
+ props = session.delivery_properties(routing_key="routing_key")
+ for count in twenty:
+ body = "Move Message %d" % count
+ src_msg = Message(props, body)
+ session.message_transfer(destination="amq.direct", message=src_msg)
+
+ "Set up destination queue"
+ session.queue_declare(queue="dest-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="dest-queue", exchange="amq.direct")
+
+ queues = self.qmf.getObjects(_class="queue")
+
+ "Move 10 messages from src-queue to dest-queue"
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10)
+ self.assertEqual (result.status, 0)
+
+ sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+ dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
+
+ self.assertEqual (sq.msgDepth,10)
+ self.assertEqual (dq.msgDepth,10)
+
+ "Move all remaining messages to destination"
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0)
+ self.assertEqual (result.status,0)
+
+ sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+ dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
+
+ self.assertEqual (sq.msgDepth,0)
+ self.assertEqual (dq.msgDepth,20)
+
+ "Use a bad source queue name"
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0)
+ self.assertEqual (result.status,4)
+
+ "Use a bad destination queue name"
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0)
+ self.assertEqual (result.status,4)
+
+ " Use a large qty (40) to move from dest-queue back to "
+ " src-queue- should move all "
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40)
+ self.assertEqual (result.status,0)
+
+ sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+ dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
+
+ self.assertEqual (sq.msgDepth,20)
+ self.assertEqual (dq.msgDepth,0)
+
+ "Consume the messages of the queue and check they are all there in order"
+ session.message_subscribe(queue="src-queue", destination="tag")
+ session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ queue = session.incoming("tag")
+ for count in twenty:
+ consumed_msg = queue.get(timeout=1)
+ body = "Move Message %d" % count
+ self.assertEqual(body, consumed_msg.body)
+
+ def test_purge_queue(self):
+ """
+ Test ability to purge messages from the head of a queue.
+ Need to test moveing all, 1 (top message) and N messages.
+ """
+ self.startQmf()
+ session = self.session
+ "Set up purge queue"
+ session.queue_declare(queue="purge-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="purge-queue", exchange="amq.direct", binding_key="routing_key")
+
+ twenty = range(1,21)
+ props = session.delivery_properties(routing_key="routing_key")
+ for count in twenty:
+ body = "Purge Message %d" % count
+ msg = Message(props, body)
+ session.message_transfer(destination="amq.direct", message=msg)
+
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
+
+ "Purge top message from purge-queue"
+ result = pq.purge(1)
+ self.assertEqual (result.status, 0)
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
+ self.assertEqual (pq.msgDepth,19)
+
+ "Purge top 9 messages from purge-queue"
+ result = pq.purge(9)
+ self.assertEqual (result.status, 0)
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
+ self.assertEqual (pq.msgDepth,10)
+
+ "Purge all messages from purge-queue"
+ result = pq.purge(0)
+ self.assertEqual (result.status, 0)
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
+ self.assertEqual (pq.msgDepth,0)
+
+ def test_reroute_priority_queue(self):
+ self.startQmf()
+ session = self.session
+
+ #setup test queue supporting multiple priority levels
+ session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True, arguments={'x-qpid-priorities':10})
+
+ #send some messages of varying priority to that queue:
+ for i in range(0, 5):
+ deliveryProps = session.delivery_properties(routing_key="test-queue", priority=i+5)
+ session.message_transfer(message=Message(deliveryProps, "Message %d" % (i+1)))
+
+
+ #declare and bind a queue to amq.fanout through which rerouted
+ #messages can be verified:
+ session.queue_declare(queue="rerouted", exclusive=True, auto_delete=True, arguments={'x-qpid-priorities':10})
+ session.exchange_bind(queue="rerouted", exchange="amq.fanout")
+
+ #reroute messages from test queue to amq.fanout (and hence to
+ #rerouted queue):
+ pq = self.qmf.getObjects(_class="queue", name="test-queue")[0]
+ result = pq.reroute(0, False, "amq.fanout")
+ self.assertEqual(result.status, 0)
+
+ #verify messages are all rerouted:
+ self.subscribe(destination="incoming", queue="rerouted")
+ incoming = session.incoming("incoming")
+ for i in range(0, 5):
+ msg = incoming.get(timeout=1)
+ self.assertEqual("Message %d" % (5-i), msg.body)
+
+
+ def test_reroute_queue(self):
+ """
+ Test ability to reroute messages from the head of a queue.
+ Need to test moving all, 1 (top message) and N messages.
+ """
+ self.startQmf()
+ session = self.session
+ "Set up test queue"
+ session.exchange_declare(exchange="alt.direct1", type="direct")
+ session.queue_declare(queue="alt-queue1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="alt-queue1", exchange="alt.direct1", binding_key="routing_key")
+ session.exchange_declare(exchange="alt.direct2", type="direct")
+ session.queue_declare(queue="alt-queue2", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="alt-queue2", exchange="alt.direct2", binding_key="routing_key")
+ session.queue_declare(queue="reroute-queue", exclusive=True, auto_delete=True, alternate_exchange="alt.direct1")
+ session.exchange_bind(queue="reroute-queue", exchange="amq.direct", binding_key="routing_key")
+
+ twenty = range(1,21)
+ props = session.delivery_properties(routing_key="routing_key")
+ for count in twenty:
+ body = "Reroute Message %d" % count
+ msg = Message(props, body)
+ session.message_transfer(destination="amq.direct", message=msg)
+
+ pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0]
+
+ "Reroute top message from reroute-queue to alternate exchange"
+ result = pq.reroute(1, True, "")
+ self.assertEqual(result.status, 0)
+ pq.update()
+ aq = self.qmf.getObjects(_class="queue", name="alt-queue1")[0]
+ self.assertEqual(pq.msgDepth,19)
+ self.assertEqual(aq.msgDepth,1)
+
+ "Reroute top 9 messages from reroute-queue to alt.direct2"
+ result = pq.reroute(9, False, "alt.direct2")
+ self.assertEqual(result.status, 0)
+ pq.update()
+ aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
+ self.assertEqual(pq.msgDepth,10)
+ self.assertEqual(aq.msgDepth,9)
+
+ "Reroute using a non-existent exchange"
+ result = pq.reroute(0, False, "amq.nosuchexchange")
+ self.assertEqual(result.status, 4)
+
+ "Reroute all messages from reroute-queue"
+ result = pq.reroute(0, False, "alt.direct2")
+ self.assertEqual(result.status, 0)
+ pq.update()
+ aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
+ self.assertEqual(pq.msgDepth,0)
+ self.assertEqual(aq.msgDepth,19)
+
+ "Make more messages"
+ twenty = range(1,21)
+ props = session.delivery_properties(routing_key="routing_key")
+ for count in twenty:
+ body = "Reroute Message %d" % count
+ msg = Message(props, body)
+ session.message_transfer(destination="amq.direct", message=msg)
+
+ "Reroute onto the same queue"
+ result = pq.reroute(0, False, "amq.direct")
+ self.assertEqual(result.status, 0)
+ pq.update()
+ self.assertEqual(pq.msgDepth,20)
+
+ def test_reroute_alternate_exchange(self):
+ """
+ Test that when rerouting, the alternate-exchange is considered if relevant
+ """
+ self.startQmf()
+ session = self.session
+ # 1. Create 2 exchanges A and B (fanout) where B is the
+ # alternate exchange for A
+ session.exchange_declare(exchange="B", type="fanout")
+ session.exchange_declare(exchange="A", type="fanout", alternate_exchange="B")
+
+ # 2. Bind queue X to B
+ session.queue_declare(queue="X", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="X", exchange="B")
+
+ # 3. Send 1 message to queue Y
+ session.queue_declare(queue="Y", exclusive=True, auto_delete=True)
+ props = session.delivery_properties(routing_key="Y")
+ session.message_transfer(message=Message(props, "reroute me!"))
+
+ # 4. Call reroute on queue Y and specify that messages should
+ # be sent to exchange A
+ y = self.qmf.getObjects(_class="queue", name="Y")[0]
+ result = y.reroute(1, False, "A")
+ self.assertEqual(result.status, 0)
+
+ # 5. verify that the message is rerouted through B (as A has
+ # no matching bindings) to X
+ self.subscribe(destination="x", queue="X")
+ self.assertEqual("reroute me!", session.incoming("x").get(timeout=1).body)
+
+ # Cleanup
+ for e in ["A", "B"]: session.exchange_delete(exchange=e)
+
+ def test_methods_async (self):
+ """
+ """
+ class Handler (qmf.console.Console):
+ def __init__(self):
+ self.cv = Condition()
+ self.xmtList = {}
+ self.rcvList = {}
+
+ def methodResponse(self, broker, seq, response):
+ self.cv.acquire()
+ try:
+ self.rcvList[seq] = response
+ finally:
+ self.cv.release()
+
+ def request(self, broker, count):
+ self.count = count
+ for idx in range(count):
+ self.cv.acquire()
+ try:
+ seq = broker.echo(idx, "Echo Message", _async = True)
+ self.xmtList[seq] = idx
+ finally:
+ self.cv.release()
+
+ def check(self):
+ if self.count != len(self.xmtList):
+ return "fail (attempted send=%d, actual sent=%d)" % (self.count, len(self.xmtList))
+ lost = 0
+ mismatched = 0
+ for seq in self.xmtList:
+ value = self.xmtList[seq]
+ if seq in self.rcvList:
+ result = self.rcvList.pop(seq)
+ if result.sequence != value:
+ mismatched += 1
+ else:
+ lost += 1
+ spurious = len(self.rcvList)
+ if lost == 0 and mismatched == 0 and spurious == 0:
+ return "pass"
+ else:
+ return "fail (lost=%d, mismatch=%d, spurious=%d)" % (lost, mismatched, spurious)
+
+ handler = Handler()
+ self.startQmf(handler)
+ brokers = self.qmf.getObjects(_class="broker")
+ self.assertEqual(len(brokers), 1)
+ broker = brokers[0]
+ handler.request(broker, 20)
+ sleep(1)
+ self.assertEqual(handler.check(), "pass")
+
+ def test_connection_close(self):
+ """
+ Test management method for closing connection
+ """
+ self.startQmf()
+ conn = self.connect()
+ session = conn.session("my-named-session")
+
+ #using qmf find named session and close the corresponding connection:
+ qmf_ssn_object = self.qmf.getObjects(_class="session", name="my-named-session")[0]
+ qmf_ssn_object._connectionRef_.close()
+
+ #check that connection is closed
+ try:
+ conn.session("another-session")
+ self.fail("Expected failure from closed connection")
+ except: None
+
+ #make sure that the named session has been closed and the name can be re-used
+ conn = self.connect()
+ session = conn.session("my-named-session")
+ session.queue_declare(queue="whatever", exclusive=True, auto_delete=True)
+
+ def test_immediate_method(self):
+ url = "%s://%s:%d" % (self.broker.scheme or "amqp", self.broker.host, self.broker.port)
+ conn = qpid.messaging.Connection(url)
+ conn.open()
+ sess = conn.session()
+ replyTo = "qmf.default.direct/reply_immediate_method_test;{node:{type:topic}}"
+ agent_sender = sess.sender("qmf.default.direct/broker")
+ agent_receiver = sess.receiver(replyTo)
+ queue_create = sess.sender("test-queue-imm-method;{create:always,delete:always,node:{type:queue,durable:False,x-declare:{auto-delete:True}}}")
+
+ method_request = {'_method_name':'reroute','_object_id':{'_object_name':'org.apache.qpid.broker:queue:test-queue-imm-method'}}
+ method_request['_arguments'] = {'request':0, 'useAltExchange':False, 'exchange':'amq.fanout'}
+
+ reroute_call = qpid.messaging.Message(method_request)
+ reroute_call.properties['qmf.opcode'] = '_method_request'
+ reroute_call.properties['x-amqp-0-10.app-id'] = 'qmf2'
+ reroute_call.reply_to = replyTo
+
+ agent_sender.send(reroute_call)
+ result = agent_receiver.fetch(3)
+ self.assertEqual(result.properties['qmf.opcode'], '_method_response')
+
+ conn.close()
+
+ def test_binding_count_on_queue(self):
+ self.startQmf()
+ conn = self.connect()
+ session = self.session
+
+ QUEUE = "binding_test_queue"
+ EX_DIR = "binding_test_exchange_direct"
+ EX_FAN = "binding_test_exchange_fanout"
+ EX_TOPIC = "binding_test_exchange_topic"
+ EX_HDR = "binding_test_exchange_headers"
+
+ #
+ # Create a test queue
+ #
+ session.queue_declare(queue=QUEUE, exclusive=True, auto_delete=True)
+ queue = self.qmf.getObjects(_class="queue", name=QUEUE)[0]
+ if not queue:
+ self.fail("Queue not found")
+ self.assertEqual(queue.bindingCount, 1, "wrong initial binding count")
+
+ #
+ # Create an exchange of each supported type
+ #
+ session.exchange_declare(exchange=EX_DIR, type="direct")
+ session.exchange_declare(exchange=EX_FAN, type="fanout")
+ session.exchange_declare(exchange=EX_TOPIC, type="topic")
+ session.exchange_declare(exchange=EX_HDR, type="headers")
+
+ #
+ # Bind each exchange to the test queue
+ #
+ match = {}
+ match['x-match'] = "all"
+ match['key'] = "value"
+ session.exchange_bind(exchange=EX_DIR, queue=QUEUE, binding_key="key1")
+ session.exchange_bind(exchange=EX_DIR, queue=QUEUE, binding_key="key2")
+ session.exchange_bind(exchange=EX_FAN, queue=QUEUE)
+ session.exchange_bind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key1.#")
+ session.exchange_bind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key2.#")
+ session.exchange_bind(exchange=EX_HDR, queue=QUEUE, binding_key="key1", arguments=match)
+ match['key2'] = "value2"
+ session.exchange_bind(exchange=EX_HDR, queue=QUEUE, binding_key="key2", arguments=match)
+
+ #
+ # Verify that the queue's binding count accounts for the new bindings
+ #
+ queue.update()
+ self.assertEqual(queue.bindingCount, 8,
+ "added bindings not accounted for (expected 8, got %d)" % queue.bindingCount)
+
+ #
+ # Remove some of the bindings
+ #
+ session.exchange_unbind(exchange=EX_DIR, queue=QUEUE, binding_key="key2")
+ session.exchange_unbind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key2.#")
+ session.exchange_unbind(exchange=EX_HDR, queue=QUEUE, binding_key="key2")
+
+ #
+ # Verify that the queue's binding count accounts for the deleted bindings
+ #
+ queue.update()
+ self.assertEqual(queue.bindingCount, 5,
+ "deleted bindings not accounted for (expected 5, got %d)" % queue.bindingCount)
+ #
+ # Delete the exchanges
+ #
+ session.exchange_delete(exchange=EX_DIR)
+ session.exchange_delete(exchange=EX_FAN)
+ session.exchange_delete(exchange=EX_TOPIC)
+ session.exchange_delete(exchange=EX_HDR)
+
+ #
+ # Verify that the queue's binding count accounts for the lost bindings
+ #
+ queue.update()
+ self.assertEqual(queue.bindingCount, 1,
+ "deleted bindings not accounted for (expected 1, got %d)" % queue.bindingCount)
+
+ def test_connection_stats(self):
+ """
+ Test message in/out stats for connection
+ """
+ self.startQmf()
+ conn = self.connect()
+ session = conn.session("stats-session")
+
+ #using qmf find named session and the corresponding connection:
+ conn_qmf = self.qmf.getObjects(_class="session", name="stats-session")[0]._connectionRef_
+
+ #send a message to a queue
+ session.queue_declare(queue="stats-q", exclusive=True, auto_delete=True)
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="stats-q"), "abc"))
+
+ #check the 'msgs sent from' stat for this connection
+ conn_qmf.update()
+ self.assertEqual(conn_qmf.msgsFromClient, 1)
+
+ #receive message from queue
+ session.message_subscribe(destination="d", queue="stats-q")
+ incoming = session.incoming("d")
+ incoming.start()
+ self.assertEqual("abc", incoming.get(timeout=1).body)
+
+ #check the 'msgs sent to' stat for this connection
+ conn_qmf.update()
+ self.assertEqual(conn_qmf.msgsToClient, 1)
+
+