# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # from qpid.datatypes import Message, RangedSet from qpid.testlib import TestBase010 from qpid.management import managementChannel, managementClient from threading import Condition from time import sleep import qmf.console import qpid.messaging class ManagementTest (TestBase010): """ Tests for the management hooks """ def test_broker_connectivity_oldAPI (self): """ Call the "echo" method on the broker to verify it is alive and talking. """ session = self.session mc = managementClient () mch = mc.addChannel (session) mc.syncWaitForStable (mch) brokers = mc.syncGetObjects (mch, "broker") self.assertEqual (len (brokers), 1) broker = brokers[0] args = {} body = "Echo Message Body" args["body"] = body for seq in range (1, 5): args["sequence"] = seq res = mc.syncCallMethod (mch, broker.id, broker.classKey, "echo", args) self.assertEqual (res.status, 0) self.assertEqual (res.statusText, "OK") self.assertEqual (res.sequence, seq) self.assertEqual (res.body, body) mc.removeChannel (mch) def test_methods_sync (self): """ Call the "echo" method on the broker to verify it is alive and talking. """ session = self.session self.startQmf() brokers = self.qmf.getObjects(_class="broker") self.assertEqual(len(brokers), 1) broker = brokers[0] body = "Echo Message Body" for seq in range(1, 20): res = broker.echo(seq, body) self.assertEqual(res.status, 0) self.assertEqual(res.text, "OK") self.assertEqual(res.sequence, seq) self.assertEqual(res.body, body) def test_get_objects(self): self.startQmf() # get the package list, verify that the qpid broker package is there packages = self.qmf.getPackages() assert 'org.apache.qpid.broker' in packages # get the schema class keys for the broker, verify the broker table and link-down event keys = self.qmf.getClasses('org.apache.qpid.broker') broker = None linkDown = None for key in keys: if key.getClassName() == "broker": broker = key if key.getClassName() == "brokerLinkDown" : linkDown = key assert broker assert linkDown brokerObjs = self.qmf.getObjects(_class="broker") assert len(brokerObjs) == 1 brokerObjs = self.qmf.getObjects(_key=broker) assert len(brokerObjs) == 1 def test_self_session_id (self): self.startQmf() sessionId = self.qmf_broker.getSessionId() brokerSessions = self.qmf.getObjects(_class="session") found = False for bs in brokerSessions: if bs.name == sessionId: found = True self.assertEqual (found, True) def test_standard_exchanges (self): self.startQmf() exchanges = self.qmf.getObjects(_class="exchange") exchange = self.findExchange (exchanges, "") self.assertEqual (exchange.type, "direct") exchange = self.findExchange (exchanges, "amq.direct") self.assertEqual (exchange.type, "direct") exchange = self.findExchange (exchanges, "amq.topic") self.assertEqual (exchange.type, "topic") exchange = self.findExchange (exchanges, "amq.fanout") self.assertEqual (exchange.type, "fanout") exchange = self.findExchange (exchanges, "amq.match") self.assertEqual (exchange.type, "headers") exchange = self.findExchange (exchanges, "qpid.management") self.assertEqual (exchange.type, "topic") def findExchange (self, exchanges, name): for exchange in exchanges: if exchange.name == name: return exchange return None def test_move_queued_messages(self): """ Test ability to move messages from the head of one queue to another. Need to test moveing all and N messages. """ self.startQmf() session = self.session "Set up source queue" session.queue_declare(queue="src-queue", exclusive=True, auto_delete=True) session.exchange_bind(queue="src-queue", exchange="amq.direct", binding_key="routing_key") twenty = range(1,21) props = session.delivery_properties(routing_key="routing_key") for count in twenty: body = "Move Message %d" % count src_msg = Message(props, body) session.message_transfer(destination="amq.direct", message=src_msg) "Set up destination queue" session.queue_declare(queue="dest-queue", exclusive=True, auto_delete=True) session.exchange_bind(queue="dest-queue", exchange="amq.direct") queues = self.qmf.getObjects(_class="queue") "Move 10 messages from src-queue to dest-queue" result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10) self.assertEqual (result.status, 0) sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0] self.assertEqual (sq.msgDepth,10) self.assertEqual (dq.msgDepth,10) "Move all remaining messages to destination" result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0) self.assertEqual (result.status,0) sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0] self.assertEqual (sq.msgDepth,0) self.assertEqual (dq.msgDepth,20) "Use a bad source queue name" result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0) self.assertEqual (result.status,4) "Use a bad destination queue name" result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0) self.assertEqual (result.status,4) " Use a large qty (40) to move from dest-queue back to " " src-queue- should move all " result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40) self.assertEqual (result.status,0) sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0] self.assertEqual (sq.msgDepth,20) self.assertEqual (dq.msgDepth,0) "Consume the messages of the queue and check they are all there in order" session.message_subscribe(queue="src-queue", destination="tag") session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFFL) session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL) queue = session.incoming("tag") for count in twenty: consumed_msg = queue.get(timeout=1) body = "Move Message %d" % count self.assertEqual(body, consumed_msg.body) def test_purge_queue(self): """ Test ability to purge messages from the head of a queue. Need to test moveing all, 1 (top message) and N messages. """ self.startQmf() session = self.session "Set up purge queue" session.queue_declare(queue="purge-queue", exclusive=True, auto_delete=True) session.exchange_bind(queue="purge-queue", exchange="amq.direct", binding_key="routing_key") twenty = range(1,21) props = session.delivery_properties(routing_key="routing_key") for count in twenty: body = "Purge Message %d" % count msg = Message(props, body) session.message_transfer(destination="amq.direct", message=msg) pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] "Purge top message from purge-queue" result = pq.purge(1) self.assertEqual (result.status, 0) pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] self.assertEqual (pq.msgDepth,19) "Purge top 9 messages from purge-queue" result = pq.purge(9) self.assertEqual (result.status, 0) pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] self.assertEqual (pq.msgDepth,10) "Purge all messages from purge-queue" result = pq.purge(0) self.assertEqual (result.status, 0) pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] self.assertEqual (pq.msgDepth,0) def test_reroute_priority_queue(self): self.startQmf() session = self.session #setup test queue supporting multiple priority levels session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True, arguments={'x-qpid-priorities':10}) #send some messages of varying priority to that queue: for i in range(0, 5): deliveryProps = session.delivery_properties(routing_key="test-queue", priority=i+5) session.message_transfer(message=Message(deliveryProps, "Message %d" % (i+1))) #declare and bind a queue to amq.fanout through which rerouted #messages can be verified: session.queue_declare(queue="rerouted", exclusive=True, auto_delete=True, arguments={'x-qpid-priorities':10}) session.exchange_bind(queue="rerouted", exchange="amq.fanout") #reroute messages from test queue to amq.fanout (and hence to #rerouted queue): pq = self.qmf.getObjects(_class="queue", name="test-queue")[0] result = pq.reroute(0, False, "amq.fanout") self.assertEqual(result.status, 0) #verify messages are all rerouted: self.subscribe(destination="incoming", queue="rerouted") incoming = session.incoming("incoming") for i in range(0, 5): msg = incoming.get(timeout=1) self.assertEqual("Message %d" % (5-i), msg.body) def test_reroute_queue(self): """ Test ability to reroute messages from the head of a queue. Need to test moving all, 1 (top message) and N messages. """ self.startQmf() session = self.session "Set up test queue" session.exchange_declare(exchange="alt.direct1", type="direct") session.queue_declare(queue="alt-queue1", exclusive=True, auto_delete=True) session.exchange_bind(queue="alt-queue1", exchange="alt.direct1", binding_key="routing_key") session.exchange_declare(exchange="alt.direct2", type="direct") session.queue_declare(queue="alt-queue2", exclusive=True, auto_delete=True) session.exchange_bind(queue="alt-queue2", exchange="alt.direct2", binding_key="routing_key") session.queue_declare(queue="reroute-queue", exclusive=True, auto_delete=True, alternate_exchange="alt.direct1") session.exchange_bind(queue="reroute-queue", exchange="amq.direct", binding_key="routing_key") twenty = range(1,21) props = session.delivery_properties(routing_key="routing_key") for count in twenty: body = "Reroute Message %d" % count msg = Message(props, body) session.message_transfer(destination="amq.direct", message=msg) pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0] "Reroute top message from reroute-queue to alternate exchange" result = pq.reroute(1, True, "") self.assertEqual(result.status, 0) pq.update() aq = self.qmf.getObjects(_class="queue", name="alt-queue1")[0] self.assertEqual(pq.msgDepth,19) self.assertEqual(aq.msgDepth,1) "Reroute top 9 messages from reroute-queue to alt.direct2" result = pq.reroute(9, False, "alt.direct2") self.assertEqual(result.status, 0) pq.update() aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0] self.assertEqual(pq.msgDepth,10) self.assertEqual(aq.msgDepth,9) "Reroute using a non-existent exchange" result = pq.reroute(0, False, "amq.nosuchexchange") self.assertEqual(result.status, 4) "Reroute all messages from reroute-queue" result = pq.reroute(0, False, "alt.direct2") self.assertEqual(result.status, 0) pq.update() aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0] self.assertEqual(pq.msgDepth,0) self.assertEqual(aq.msgDepth,19) "Make more messages" twenty = range(1,21) props = session.delivery_properties(routing_key="routing_key") for count in twenty: body = "Reroute Message %d" % count msg = Message(props, body) session.message_transfer(destination="amq.direct", message=msg) "Reroute onto the same queue" result = pq.reroute(0, False, "amq.direct") self.assertEqual(result.status, 0) pq.update() self.assertEqual(pq.msgDepth,20) def test_reroute_alternate_exchange(self): """ Test that when rerouting, the alternate-exchange is considered if relevant """ self.startQmf() session = self.session # 1. Create 2 exchanges A and B (fanout) where B is the # alternate exchange for A session.exchange_declare(exchange="B", type="fanout") session.exchange_declare(exchange="A", type="fanout", alternate_exchange="B") # 2. Bind queue X to B session.queue_declare(queue="X", exclusive=True, auto_delete=True) session.exchange_bind(queue="X", exchange="B") # 3. Send 1 message to queue Y session.queue_declare(queue="Y", exclusive=True, auto_delete=True) props = session.delivery_properties(routing_key="Y") session.message_transfer(message=Message(props, "reroute me!")) # 4. Call reroute on queue Y and specify that messages should # be sent to exchange A y = self.qmf.getObjects(_class="queue", name="Y")[0] result = y.reroute(1, False, "A") self.assertEqual(result.status, 0) # 5. verify that the message is rerouted through B (as A has # no matching bindings) to X self.subscribe(destination="x", queue="X") self.assertEqual("reroute me!", session.incoming("x").get(timeout=1).body) # Cleanup for e in ["A", "B"]: session.exchange_delete(exchange=e) def test_methods_async (self): """ """ class Handler (qmf.console.Console): def __init__(self): self.cv = Condition() self.xmtList = {} self.rcvList = {} def methodResponse(self, broker, seq, response): self.cv.acquire() try: self.rcvList[seq] = response finally: self.cv.release() def request(self, broker, count): self.count = count for idx in range(count): self.cv.acquire() try: seq = broker.echo(idx, "Echo Message", _async = True) self.xmtList[seq] = idx finally: self.cv.release() def check(self): if self.count != len(self.xmtList): return "fail (attempted send=%d, actual sent=%d)" % (self.count, len(self.xmtList)) lost = 0 mismatched = 0 for seq in self.xmtList: value = self.xmtList[seq] if seq in self.rcvList: result = self.rcvList.pop(seq) if result.sequence != value: mismatched += 1 else: lost += 1 spurious = len(self.rcvList) if lost == 0 and mismatched == 0 and spurious == 0: return "pass" else: return "fail (lost=%d, mismatch=%d, spurious=%d)" % (lost, mismatched, spurious) handler = Handler() self.startQmf(handler) brokers = self.qmf.getObjects(_class="broker") self.assertEqual(len(brokers), 1) broker = brokers[0] handler.request(broker, 20) sleep(1) self.assertEqual(handler.check(), "pass") def test_connection_close(self): """ Test management method for closing connection """ self.startQmf() conn = self.connect() session = conn.session("my-named-session") #using qmf find named session and close the corresponding connection: qmf_ssn_object = self.qmf.getObjects(_class="session", name="my-named-session")[0] qmf_ssn_object._connectionRef_.close() #check that connection is closed try: conn.session("another-session") self.fail("Expected failure from closed connection") except: None #make sure that the named session has been closed and the name can be re-used conn = self.connect() session = conn.session("my-named-session") session.queue_declare(queue="whatever", exclusive=True, auto_delete=True) def test_immediate_method(self): url = "%s://%s:%d" % (self.broker.scheme or "amqp", self.broker.host, self.broker.port) conn = qpid.messaging.Connection(url) conn.open() sess = conn.session() replyTo = "qmf.default.direct/reply_immediate_method_test;{node:{type:topic}}" agent_sender = sess.sender("qmf.default.direct/broker") agent_receiver = sess.receiver(replyTo) queue_create = sess.sender("test-queue-imm-method;{create:always,delete:always,node:{type:queue,durable:False,x-declare:{auto-delete:True}}}") method_request = {'_method_name':'reroute','_object_id':{'_object_name':'org.apache.qpid.broker:queue:test-queue-imm-method'}} method_request['_arguments'] = {'request':0, 'useAltExchange':False, 'exchange':'amq.fanout'} reroute_call = qpid.messaging.Message(method_request) reroute_call.properties['qmf.opcode'] = '_method_request' reroute_call.properties['x-amqp-0-10.app-id'] = 'qmf2' reroute_call.reply_to = replyTo agent_sender.send(reroute_call) result = agent_receiver.fetch(3) self.assertEqual(result.properties['qmf.opcode'], '_method_response') conn.close() def test_binding_count_on_queue(self): self.startQmf() conn = self.connect() session = self.session QUEUE = "binding_test_queue" EX_DIR = "binding_test_exchange_direct" EX_FAN = "binding_test_exchange_fanout" EX_TOPIC = "binding_test_exchange_topic" EX_HDR = "binding_test_exchange_headers" # # Create a test queue # session.queue_declare(queue=QUEUE, exclusive=True, auto_delete=True) queue = self.qmf.getObjects(_class="queue", name=QUEUE)[0] if not queue: self.fail("Queue not found") self.assertEqual(queue.bindingCount, 1, "wrong initial binding count") # # Create an exchange of each supported type # session.exchange_declare(exchange=EX_DIR, type="direct") session.exchange_declare(exchange=EX_FAN, type="fanout") session.exchange_declare(exchange=EX_TOPIC, type="topic") session.exchange_declare(exchange=EX_HDR, type="headers") # # Bind each exchange to the test queue # match = {} match['x-match'] = "all" match['key'] = "value" session.exchange_bind(exchange=EX_DIR, queue=QUEUE, binding_key="key1") session.exchange_bind(exchange=EX_DIR, queue=QUEUE, binding_key="key2") session.exchange_bind(exchange=EX_FAN, queue=QUEUE) session.exchange_bind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key1.#") session.exchange_bind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key2.#") session.exchange_bind(exchange=EX_HDR, queue=QUEUE, binding_key="key1", arguments=match) match['key2'] = "value2" session.exchange_bind(exchange=EX_HDR, queue=QUEUE, binding_key="key2", arguments=match) # # Verify that the queue's binding count accounts for the new bindings # queue.update() self.assertEqual(queue.bindingCount, 8, "added bindings not accounted for (expected 8, got %d)" % queue.bindingCount) # # Remove some of the bindings # session.exchange_unbind(exchange=EX_DIR, queue=QUEUE, binding_key="key2") session.exchange_unbind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key2.#") session.exchange_unbind(exchange=EX_HDR, queue=QUEUE, binding_key="key2") # # Verify that the queue's binding count accounts for the deleted bindings # queue.update() self.assertEqual(queue.bindingCount, 5, "deleted bindings not accounted for (expected 5, got %d)" % queue.bindingCount) # # Delete the exchanges # session.exchange_delete(exchange=EX_DIR) session.exchange_delete(exchange=EX_FAN) session.exchange_delete(exchange=EX_TOPIC) session.exchange_delete(exchange=EX_HDR) # # Verify that the queue's binding count accounts for the lost bindings # queue.update() self.assertEqual(queue.bindingCount, 1, "deleted bindings not accounted for (expected 1, got %d)" % queue.bindingCount) def test_connection_stats(self): """ Test message in/out stats for connection """ self.startQmf() conn = self.connect() session = conn.session("stats-session") #using qmf find named session and the corresponding connection: conn_qmf = self.qmf.getObjects(_class="session", name="stats-session")[0]._connectionRef_ #send a message to a queue session.queue_declare(queue="stats-q", exclusive=True, auto_delete=True) session.message_transfer(message=Message(session.delivery_properties(routing_key="stats-q"), "abc")) #check the 'msgs sent from' stat for this connection conn_qmf.update() self.assertEqual(conn_qmf.msgsFromClient, 1) #receive message from queue session.message_subscribe(destination="d", queue="stats-q") incoming = session.incoming("d") incoming.start() self.assertEqual("abc", incoming.get(timeout=1).body) #check the 'msgs sent to' stat for this connection conn_qmf.update() self.assertEqual(conn_qmf.msgsToClient, 1)