summaryrefslogtreecommitdiff
path: root/python/tests_0-10/management.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/tests_0-10/management.py')
-rw-r--r--python/tests_0-10/management.py339
1 files changed, 315 insertions, 24 deletions
diff --git a/python/tests_0-10/management.py b/python/tests_0-10/management.py
index f1360a1902..9dd03bbda4 100644
--- a/python/tests_0-10/management.py
+++ b/python/tests_0-10/management.py
@@ -20,19 +20,22 @@
from qpid.datatypes import Message, RangedSet
from qpid.testlib import TestBase010
from qpid.management import managementChannel, managementClient
+from threading import Condition
+from time import sleep
+import qmf.console
class ManagementTest (TestBase010):
"""
Tests for the management hooks
"""
- def test_broker_connectivity (self):
+ def test_broker_connectivity_oldAPI (self):
"""
Call the "echo" method on the broker to verify it is alive and talking.
"""
session = self.session
- mc = managementClient (session.spec)
+ mc = managementClient ()
mch = mc.addChannel (session)
mc.syncWaitForStable (mch)
@@ -52,40 +55,62 @@ class ManagementTest (TestBase010):
self.assertEqual (res.body, body)
mc.removeChannel (mch)
- def test_system_object (self):
+ def test_methods_sync (self):
+ """
+ Call the "echo" method on the broker to verify it is alive and talking.
+ """
session = self.session
+ self.startQmf()
- mc = managementClient (session.spec)
- mch = mc.addChannel (session)
+ brokers = self.qmf.getObjects(_class="broker")
+ self.assertEqual(len(brokers), 1)
+ broker = brokers[0]
- mc.syncWaitForStable (mch)
- systems = mc.syncGetObjects (mch, "system")
- self.assertEqual (len (systems), 1)
- mc.removeChannel (mch)
+ body = "Echo Message Body"
+ for seq in range(1, 20):
+ res = broker.echo(seq, body)
+ self.assertEqual(res.status, 0)
+ self.assertEqual(res.text, "OK")
+ self.assertEqual(res.sequence, seq)
+ self.assertEqual(res.body, body)
+
+ def test_get_objects(self):
+ self.startQmf()
+
+ # get the package list, verify that the qpid broker package is there
+ packages = self.qmf.getPackages()
+ assert 'org.apache.qpid.broker' in packages
+
+ # get the schema class keys for the broker, verify the broker table and link-down event
+ keys = self.qmf.getClasses('org.apache.qpid.broker')
+ broker = None
+ linkDown = None
+ for key in keys:
+ if key.getClassName() == "broker": broker = key
+ if key.getClassName() == "brokerLinkDown" : linkDown = key
+ assert broker
+ assert linkDown
+
+ brokerObjs = self.qmf.getObjects(_class="broker")
+ assert len(brokerObjs) == 1
+ brokerObjs = self.qmf.getObjects(_key=broker)
+ assert len(brokerObjs) == 1
def test_self_session_id (self):
- session = self.session
-
- mc = managementClient (session.spec)
- mch = mc.addChannel (session)
+ self.startQmf()
+ sessionId = self.qmf_broker.getSessionId()
+ brokerSessions = self.qmf.getObjects(_class="session")
- info = mc.syncWaitForStable (mch)
- brokerSessions = mc.syncGetObjects (mch, "session")
found = False
for bs in brokerSessions:
- if bs.name == info.sessionId:
+ if bs.name == sessionId:
found = True
self.assertEqual (found, True)
- mc.removeChannel (mch)
def test_standard_exchanges (self):
- session = self.session
-
- mc = managementClient (session.spec)
- mch = mc.addChannel (session)
+ self.startQmf()
- mc.syncWaitForStable (mch)
- exchanges = mc.syncGetObjects (mch, "exchange")
+ exchanges = self.qmf.getObjects(_class="exchange")
exchange = self.findExchange (exchanges, "")
self.assertEqual (exchange.type, "direct")
exchange = self.findExchange (exchanges, "amq.direct")
@@ -98,10 +123,276 @@ class ManagementTest (TestBase010):
self.assertEqual (exchange.type, "headers")
exchange = self.findExchange (exchanges, "qpid.management")
self.assertEqual (exchange.type, "topic")
- mc.removeChannel (mch)
def findExchange (self, exchanges, name):
for exchange in exchanges:
if exchange.name == name:
return exchange
return None
+
+ def test_move_queued_messages(self):
+ """
+ Test ability to move messages from the head of one queue to another.
+ Need to test moveing all and N messages.
+ """
+ self.startQmf()
+ session = self.session
+ "Set up source queue"
+ session.queue_declare(queue="src-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="src-queue", exchange="amq.direct", binding_key="routing_key")
+
+ twenty = range(1,21)
+ props = session.delivery_properties(routing_key="routing_key")
+ for count in twenty:
+ body = "Move Message %d" % count
+ src_msg = Message(props, body)
+ session.message_transfer(destination="amq.direct", message=src_msg)
+
+ "Set up destination queue"
+ session.queue_declare(queue="dest-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="dest-queue", exchange="amq.direct")
+
+ queues = self.qmf.getObjects(_class="queue")
+
+ "Move 10 messages from src-queue to dest-queue"
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10)
+ self.assertEqual (result.status, 0)
+
+ sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+ dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
+
+ self.assertEqual (sq.msgDepth,10)
+ self.assertEqual (dq.msgDepth,10)
+
+ "Move all remaining messages to destination"
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0)
+ self.assertEqual (result.status,0)
+
+ sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+ dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
+
+ self.assertEqual (sq.msgDepth,0)
+ self.assertEqual (dq.msgDepth,20)
+
+ "Use a bad source queue name"
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0)
+ self.assertEqual (result.status,4)
+
+ "Use a bad destination queue name"
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0)
+ self.assertEqual (result.status,4)
+
+ " Use a large qty (40) to move from dest-queue back to "
+ " src-queue- should move all "
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40)
+ self.assertEqual (result.status,0)
+
+ sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+ dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
+
+ self.assertEqual (sq.msgDepth,20)
+ self.assertEqual (dq.msgDepth,0)
+
+ "Consume the messages of the queue and check they are all there in order"
+ session.message_subscribe(queue="src-queue", destination="tag")
+ session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ queue = session.incoming("tag")
+ for count in twenty:
+ consumed_msg = queue.get(timeout=1)
+ body = "Move Message %d" % count
+ self.assertEqual(body, consumed_msg.body)
+
+ def test_purge_queue(self):
+ """
+ Test ability to purge messages from the head of a queue.
+ Need to test moveing all, 1 (top message) and N messages.
+ """
+ self.startQmf()
+ session = self.session
+ "Set up purge queue"
+ session.queue_declare(queue="purge-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="purge-queue", exchange="amq.direct", binding_key="routing_key")
+
+ twenty = range(1,21)
+ props = session.delivery_properties(routing_key="routing_key")
+ for count in twenty:
+ body = "Purge Message %d" % count
+ msg = Message(props, body)
+ session.message_transfer(destination="amq.direct", message=msg)
+
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
+
+ "Purge top message from purge-queue"
+ result = pq.purge(1)
+ self.assertEqual (result.status, 0)
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
+ self.assertEqual (pq.msgDepth,19)
+
+ "Purge top 9 messages from purge-queue"
+ result = pq.purge(9)
+ self.assertEqual (result.status, 0)
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
+ self.assertEqual (pq.msgDepth,10)
+
+ "Purge all messages from purge-queue"
+ result = pq.purge(0)
+ self.assertEqual (result.status, 0)
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
+ self.assertEqual (pq.msgDepth,0)
+
+ def test_methods_async (self):
+ """
+ """
+ class Handler (qmf.console.Console):
+ def __init__(self):
+ self.cv = Condition()
+ self.xmtList = {}
+ self.rcvList = {}
+
+ def methodResponse(self, broker, seq, response):
+ self.cv.acquire()
+ try:
+ self.rcvList[seq] = response
+ finally:
+ self.cv.release()
+
+ def request(self, broker, count):
+ self.count = count
+ for idx in range(count):
+ self.cv.acquire()
+ try:
+ seq = broker.echo(idx, "Echo Message", _async = True)
+ self.xmtList[seq] = idx
+ finally:
+ self.cv.release()
+
+ def check(self):
+ if self.count != len(self.xmtList):
+ return "fail (attempted send=%d, actual sent=%d)" % (self.count, len(self.xmtList))
+ lost = 0
+ mismatched = 0
+ for seq in self.xmtList:
+ value = self.xmtList[seq]
+ if seq in self.rcvList:
+ result = self.rcvList.pop(seq)
+ if result.sequence != value:
+ mismatched += 1
+ else:
+ lost += 1
+ spurious = len(self.rcvList)
+ if lost == 0 and mismatched == 0 and spurious == 0:
+ return "pass"
+ else:
+ return "fail (lost=%d, mismatch=%d, spurious=%d)" % (lost, mismatched, spurious)
+
+ handler = Handler()
+ self.startQmf(handler)
+ brokers = self.qmf.getObjects(_class="broker")
+ self.assertEqual(len(brokers), 1)
+ broker = brokers[0]
+ handler.request(broker, 20)
+ sleep(1)
+ self.assertEqual(handler.check(), "pass")
+
+ def test_connection_close(self):
+ """
+ Test management method for closing connection
+ """
+ self.startQmf()
+ conn = self.connect()
+ session = conn.session("my-named-session")
+
+ #using qmf find named session and close the corresponding connection:
+ qmf_ssn_object = self.qmf.getObjects(_class="session", name="my-named-session")[0]
+ qmf_ssn_object._connectionRef_.close()
+
+ #check that connection is closed
+ try:
+ conn.session("another-session")
+ self.fail("Expected failure from closed connection")
+ except: None
+
+ #make sure that the named session has been closed and the name can be re-used
+ conn = self.connect()
+ session = conn.session("my-named-session")
+ session.queue_declare(queue="whatever", exclusive=True, auto_delete=True)
+
+ def test_binding_count_on_queue(self):
+ self.startQmf()
+ conn = self.connect()
+ session = self.session
+
+ QUEUE = "binding_test_queue"
+ EX_DIR = "binding_test_exchange_direct"
+ EX_FAN = "binding_test_exchange_fanout"
+ EX_TOPIC = "binding_test_exchange_topic"
+ EX_HDR = "binding_test_exchange_headers"
+
+ #
+ # Create a test queue
+ #
+ session.queue_declare(queue=QUEUE, exclusive=True, auto_delete=True)
+ queue = self.qmf.getObjects(_class="queue", name=QUEUE)[0]
+ if not queue:
+ self.fail("Queue not found")
+ self.assertEqual(queue.bindingCount, 1, "wrong initial binding count")
+
+ #
+ # Create an exchange of each supported type
+ #
+ session.exchange_declare(exchange=EX_DIR, type="direct")
+ session.exchange_declare(exchange=EX_FAN, type="fanout")
+ session.exchange_declare(exchange=EX_TOPIC, type="topic")
+ session.exchange_declare(exchange=EX_HDR, type="headers")
+
+ #
+ # Bind each exchange to the test queue
+ #
+ match = {}
+ match['x-match'] = "all"
+ match['key'] = "value"
+ session.exchange_bind(exchange=EX_DIR, queue=QUEUE, binding_key="key1")
+ session.exchange_bind(exchange=EX_DIR, queue=QUEUE, binding_key="key2")
+ session.exchange_bind(exchange=EX_FAN, queue=QUEUE)
+ session.exchange_bind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key1.#")
+ session.exchange_bind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key2.#")
+ session.exchange_bind(exchange=EX_HDR, queue=QUEUE, binding_key="key1", arguments=match)
+ match['key2'] = "value2"
+ session.exchange_bind(exchange=EX_HDR, queue=QUEUE, binding_key="key2", arguments=match)
+
+ #
+ # Verify that the queue's binding count accounts for the new bindings
+ #
+ queue.update()
+ self.assertEqual(queue.bindingCount, 8,
+ "added bindings not accounted for (expected 8, got %d)" % queue.bindingCount)
+
+ #
+ # Remove some of the bindings
+ #
+ session.exchange_unbind(exchange=EX_DIR, queue=QUEUE, binding_key="key2")
+ session.exchange_unbind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key2.#")
+ session.exchange_unbind(exchange=EX_HDR, queue=QUEUE, binding_key="key2")
+
+ #
+ # Verify that the queue's binding count accounts for the deleted bindings
+ #
+ queue.update()
+ self.assertEqual(queue.bindingCount, 5,
+ "deleted bindings not accounted for (expected 5, got %d)" % queue.bindingCount)
+ #
+ # Delete the exchanges
+ #
+ session.exchange_delete(exchange=EX_DIR)
+ session.exchange_delete(exchange=EX_FAN)
+ session.exchange_delete(exchange=EX_TOPIC)
+ session.exchange_delete(exchange=EX_HDR)
+
+ #
+ # Verify that the queue's binding count accounts for the lost bindings
+ #
+ queue.update()
+ self.assertEqual(queue.bindingCount, 1,
+ "deleted bindings not accounted for (expected 1, got %d)" % queue.bindingCount)
+