diff options
Diffstat (limited to 'qpid/tests/src/py/qpid_tests/broker_0_10/stats.py')
-rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_10/stats.py | 519 |
1 files changed, 519 insertions, 0 deletions
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/stats.py b/qpid/tests/src/py/qpid_tests/broker_0_10/stats.py new file mode 100644 index 0000000000..4f3931b78b --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/stats.py @@ -0,0 +1,519 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.tests.messaging.implementation import * +from qpid.tests.messaging import Base +from time import sleep +from qpidtoollibs.broker import BrokerAgent + +# +# Tests the Broker's statistics reporting +# + +class BrokerStatsTests(Base): + """ + Tests of the broker's statistics + """ + + def assertEqual(self, left, right, text=None): + if not left == right: + print "assertEqual failure: %r != %r" % (left, right) + if text: + print " %r" % text + assert None + + def failUnless(self, value, text=None): + if value: + return + print "failUnless failure", + if text: + print ": %r" % text + else: + print + assert None + + def fail(self, text=None): + if text: + print "Fail: %r" % text + assert None + + def setup_connection(self): + return Connection.establish(self.broker, **self.connection_options()) + + def setup_session(self, tx=False): + return self.conn.session(transactional=tx) + + def setup_access(self): + return BrokerAgent(self.conn) + + def test_exchange_stats(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + agent.addExchange("direct", "stats-test-exchange") + try: + sess = self.setup_session() + tx_a = sess.sender("stats-test-exchange/a") + tx_b = sess.sender("stats-test-exchange/b") + rx_a = sess.receiver("stats-test-exchange/a") + + exchange = agent.getExchange("stats-test-exchange") + self.failUnless(exchange, "expected a valid exchange object") + self.assertEqual(exchange.msgReceives, 0, "msgReceives") + self.assertEqual(exchange.msgDrops, 0, "msgDrops") + self.assertEqual(exchange.msgRoutes, 0, "msgRoutes") + self.assertEqual(exchange.byteReceives, 0, "byteReceives") + self.assertEqual(exchange.byteDrops, 0, "byteDrops") + self.assertEqual(exchange.byteRoutes, 0, "byteRoutes") + + tx_a.send("0123456789") + tx_b.send("01234567890123456789") + tx_a.send("012345678901234567890123456789") + tx_b.send("0123456789012345678901234567890123456789") + + overhead = 63 #overhead added to message from headers + exchange.update() + self.assertEqual(exchange.msgReceives, 4, "msgReceives") + self.assertEqual(exchange.msgDrops, 2, "msgDrops") + self.assertEqual(exchange.msgRoutes, 2, "msgRoutes") + self.assertEqual(exchange.byteReceives, 100+(4*overhead), "byteReceives") + self.assertEqual(exchange.byteDrops, 60+(2*overhead), "byteDrops") + self.assertEqual(exchange.byteRoutes, 40+(2*overhead), "byteRoutes") + finally: + agent.delExchange("stats-test-exchange") + + def test_enqueues_dequeues(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session() + tx = sess.sender("enqueue_test;{create:always,delete:always}") + rx = sess.receiver("enqueue_test") + + queue = agent.getQueue("enqueue_test") + self.failUnless(queue, "expected a valid queue object") + self.assertEqual(queue.msgTotalEnqueues, 0, "msgTotalEnqueues") + self.assertEqual(queue.byteTotalEnqueues, 0, "byteTotalEnqueues") + self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues") + self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues") + self.assertEqual(queue.msgDepth, 0, "msgDepth") + self.assertEqual(queue.byteDepth, 0, "byteDepth") + + tx.send("0123456789") + tx.send("01234567890123456789") + tx.send("012345678901234567890123456789") + tx.send("0123456789012345678901234567890123456789") + overhead = 38 #overhead added to message from headers + + queue.update() + self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues") + self.assertEqual(queue.byteTotalEnqueues, 100+(4*overhead), "byteTotalEnqueues") + self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues") + self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues") + self.assertEqual(queue.msgDepth, 4, "msgDepth") + self.assertEqual(queue.byteDepth, 100+(4*overhead), "byteDepth") + + now_broker = agent.getBroker() + self.failUnless((now_broker.msgTotalEnqueues - start_broker.msgTotalEnqueues) >= 4, "broker msgTotalEnqueues") + self.failUnless((now_broker.byteTotalEnqueues - start_broker.byteTotalEnqueues) >= 100, "broker byteTotalEnqueues") + + m = rx.fetch() + m = rx.fetch() + sess.acknowledge() + + queue.update() + self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues") + self.assertEqual(queue.byteTotalEnqueues, 100+(4*overhead), "byteTotalEnqueues") + self.assertEqual(queue.msgTotalDequeues, 2, "msgTotalDequeues") + self.assertEqual(queue.byteTotalDequeues, 30+(2*overhead), "byteTotalDequeues") + self.assertEqual(queue.msgDepth, 2, "msgDepth") + self.assertEqual(queue.byteDepth, 70+(2*overhead), "byteDepth") + + now_broker = agent.getBroker() + self.failUnless((now_broker.msgTotalDequeues - start_broker.msgTotalDequeues) >= 2, "broker msgTotalDequeues") + self.failUnless((now_broker.byteTotalDequeues - start_broker.byteTotalDequeues) >= 30, "broker byteTotalDequeues") + + sess.close() + + now_broker = agent.getBroker() + self.assertEqual(now_broker.abandoned - start_broker.abandoned, 2, "expect 2 abandoned messages") + self.assertEqual(now_broker.msgDepth, start_broker.msgDepth, "expect broker message depth to be unchanged") + self.assertEqual(now_broker.byteDepth, start_broker.byteDepth, "expect broker byte depth to be unchanged") + + + def test_transactional_enqueues_dequeues(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session(True) + tx = sess.sender("tx_enqueue_test;{create:always,delete:always}") + + tx.send("0123456789") + tx.send("0123456789") + tx.send("0123456789") + tx.send("0123456789") + overhead = 41 #overhead added to message from headers + + queue = agent.getQueue("tx_enqueue_test") + self.failUnless(queue, "expected a valid queue object") + self.assertEqual(queue.msgTotalEnqueues, 0, "msgTotalEnqueues pre-tx-commit") + self.assertEqual(queue.byteTotalEnqueues, 0, "byteTotalEnqueues pre-tx-commit") + self.assertEqual(queue.msgTxnEnqueues, 0, "msgTxnEnqueues pre-tx-commit") + self.assertEqual(queue.byteTxnEnqueues, 0, "byteTxnEnqueues pre-tx-commit") + self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues pre-tx-commit") + self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues pre-tx-commit") + self.assertEqual(queue.msgTxnDequeues, 0, "msgTxnDequeues pre-tx-commit") + self.assertEqual(queue.byteTxnDequeues, 0, "byteTxnDequeues pre-tx-commit") + + sess.commit() + queue.update() + self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues post-tx-commit") + self.assertEqual(queue.byteTotalEnqueues, 40+(4*overhead), "byteTotalEnqueues post-tx-commit") + self.assertEqual(queue.msgTxnEnqueues, 4, "msgTxnEnqueues post-tx-commit") + self.assertEqual(queue.byteTxnEnqueues, 40+(4*overhead), "byteTxnEnqueues post-tx-commit") + self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues post-tx-commit") + self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues post-tx-commit") + self.assertEqual(queue.msgTxnDequeues, 0, "msgTxnDequeues post-tx-commit") + self.assertEqual(queue.byteTxnDequeues, 0, "byteTxnDequeues post-tx-commit") + + sess2 = self.setup_session(True) + rx = sess2.receiver("tx_enqueue_test") + + m = rx.fetch() + m = rx.fetch() + m = rx.fetch() + m = rx.fetch() + + queue.update() + self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues pre-rx-commit") + self.assertEqual(queue.byteTotalEnqueues, 40+(4*overhead), "byteTotalEnqueues pre-rx-commit") + self.assertEqual(queue.msgTxnEnqueues, 4, "msgTxnEnqueues pre-rx-commit") + self.assertEqual(queue.byteTxnEnqueues, 40+(4*overhead), "byteTxnEnqueues pre-rx-commit") + self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues pre-rx-commit") + self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues pre-rx-commit") + self.assertEqual(queue.msgTxnDequeues, 0, "msgTxnDequeues pre-rx-commit") + self.assertEqual(queue.byteTxnDequeues, 0, "byteTxnDequeues pre-rx-commit") + + sess2.acknowledge() + sess2.commit() + + queue.update() + self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues post-rx-commit") + self.assertEqual(queue.byteTotalEnqueues, 40+(4*overhead), "byteTotalEnqueues post-rx-commit") + self.assertEqual(queue.msgTxnEnqueues, 4, "msgTxnEnqueues post-rx-commit") + self.assertEqual(queue.byteTxnEnqueues, 40+(4*overhead), "byteTxnEnqueues post-rx-commit") + self.assertEqual(queue.msgTotalDequeues, 4, "msgTotalDequeues post-rx-commit") + self.assertEqual(queue.byteTotalDequeues, 40+(4*overhead), "byteTotalDequeues post-rx-commit") + self.assertEqual(queue.msgTxnDequeues, 4, "msgTxnDequeues post-rx-commit") + self.assertEqual(queue.byteTxnDequeues, 40+(4*overhead), "byteTxnDequeues post-rx-commit") + + sess.close() + sess2.close() + + now_broker = agent.getBroker() + self.assertEqual(now_broker.msgTxnEnqueues - start_broker.msgTxnEnqueues, 4, "broker msgTxnEnqueues") + self.assertEqual(now_broker.byteTxnEnqueues - start_broker.byteTxnEnqueues, 40+(4*overhead), "broker byteTxnEnqueues") + self.assertEqual(now_broker.msgTxnDequeues - start_broker.msgTxnDequeues, 4, "broker msgTxnDequeues") + self.assertEqual(now_broker.byteTxnDequeues - start_broker.byteTxnDequeues, 40+(4*overhead), "broker byteTxnDequeues") + + + def test_discards_no_route(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session() + tx = sess.sender("amq.topic/non.existing.key") + tx.send("NO_ROUTE") + tx.send("NO_ROUTE") + tx.send("NO_ROUTE") + tx.send("NO_ROUTE") + tx.send("NO_ROUTE") + + now_broker = agent.getBroker() + + self.failUnless((now_broker.discardsNoRoute - start_broker.discardsNoRoute) >= 5, "Expect at least 5 no-routes") + + sess.close() + + + def test_abandoned_alt(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session() + tx = sess.sender("abandon_alt;{create:always,delete:always,node:{x-declare:{alternate-exchange:'amq.fanout'}}}") + rx = sess.receiver("abandon_alt") + rx.capacity = 2 + + tx.send("ABANDON_ALT") + tx.send("ABANDON_ALT") + tx.send("ABANDON_ALT") + tx.send("ABANDON_ALT") + tx.send("ABANDON_ALT") + + rx.fetch() + + sess.close() + now_broker = agent.getBroker() + self.assertEqual(now_broker.abandonedViaAlt - start_broker.abandonedViaAlt, 5, "Expect 5 abandonedViaAlt") + self.assertEqual(now_broker.abandoned - start_broker.abandoned, 0, "Expect 0 abandoned") + + + def test_discards_ttl(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session() + tx = sess.sender("discards_ttl;{create:always,delete:always}") + msg = Message("TTL") + msg.ttl = 1 + + tx.send(msg) + tx.send(msg) + tx.send(msg) + tx.send(msg) + tx.send(msg) + tx.send(msg) + + sleep(2) + + rx = sess.receiver("discards_ttl") + try: + rx.fetch(0) + except: + pass + + now_broker = agent.getBroker() + queue = agent.getQueue("discards_ttl") + + self.failUnless(queue, "expected a valid queue object") + self.assertEqual(queue.discardsTtl, 6, "expect 6 TTL discards on queue") + self.assertEqual(now_broker.discardsTtl - start_broker.discardsTtl, 6, "expect 6 TTL discards on broker") + self.assertEqual(queue.msgTotalDequeues, 6, "expect 6 total dequeues on queue") + + sess.close() + + + def test_discards_limit_overflow(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session() + tx = sess.sender("discards_limit;{create:always,node:{x-declare:{arguments:{'qpid.max_count':3,'qpid.flow_stop_count':0}}}}") + tx.send("LIMIT") + tx.send("LIMIT") + tx.send("LIMIT") + try: + tx.send("LIMIT") + self.fail("expected to fail sending 4th message") + except: + pass + + now_broker = agent.getBroker() + queue = agent.getQueue("discards_limit") + + self.failUnless(queue, "expected a valid queue object") + self.assertEqual(queue.discardsOverflow, 1, "expect 1 overflow discard on queue") + self.assertEqual(now_broker.discardsOverflow - start_broker.discardsOverflow, 1, "expect 1 overflow discard on broker") + + ## + ## Shut down and restart the connection to clear the error condition. + ## + try: + self.conn.close(timeout=.1) + except: + pass + self.conn = self.setup_connection() + + ## + ## Re-create the session to delete the queue. + ## + sess = self.setup_session() + tx = sess.sender("discards_limit;{create:always,delete:always}") + sess.close() + + + def test_discards_ring_overflow(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session() + tx = sess.sender("discards_ring;{create:always,delete:always,node:{x-declare:{arguments:{'qpid.max_count':3,'qpid.flow_stop_count':0,'qpid.policy_type':ring}}}}") + + tx.send("RING") + tx.send("RING") + tx.send("RING") + tx.send("RING") + tx.send("RING") + + now_broker = agent.getBroker() + queue = agent.getQueue("discards_ring") + + self.failUnless(queue, "expected a valid queue object") + self.assertEqual(queue.discardsRing, 2, "expect 2 ring discards on queue") + self.assertEqual(now_broker.discardsRing - start_broker.discardsRing, 2, "expect 2 ring discards on broker") + self.assertEqual(queue.msgTotalDequeues, 2, "expect 2 total dequeues on queue") + + sess.close() + + + def test_discards_lvq_replace(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session() + tx = sess.sender("discards_lvq;{create:always,delete:always,node:{x-declare:{arguments:{'qpid.max_count':3,'qpid.flow_stop_count':0,'qpid.last_value_queue_key':key}}}}") + msgA = Message("LVQ_A") + msgA.properties['key'] = 'AAA' + msgB = Message("LVQ_B") + msgB.properties['key'] = 'BBB' + + tx.send(msgA) + tx.send(msgB) + tx.send(msgA) + tx.send(msgA) + tx.send(msgB) + + now_broker = agent.getBroker() + queue = agent.getQueue("discards_lvq") + + self.failUnless(queue, "expected a valid queue object") + self.assertEqual(queue.discardsLvq, 3, "expect 3 lvq discards on queue") + self.assertEqual(now_broker.discardsLvq - start_broker.discardsLvq, 3, "expect 3 lvq discards on broker") + self.assertEqual(queue.msgTotalDequeues, 3, "expect 3 total dequeues on queue") + + sess.close() + + + def test_discards_reject(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session() + tx = sess.sender("discards_reject;{create:always,delete:always}") + tx.send("REJECT") + tx.send("REJECT") + tx.send("REJECT") + + rx = sess.receiver("discards_reject") + m = rx.fetch() + sess.acknowledge() + m1 = rx.fetch() + m2 = rx.fetch() + sess.acknowledge(m1, Disposition(REJECTED)) + sess.acknowledge(m2, Disposition(REJECTED)) + + now_broker = agent.getBroker() + queue = agent.getQueue("discards_reject") + + self.failUnless(queue, "expected a valid queue object") + self.assertEqual(queue.discardsSubscriber, 2, "expect 2 reject discards on queue") + self.assertEqual(now_broker.discardsSubscriber - start_broker.discardsSubscriber, 2, "expect 2 reject discards on broker") + self.assertEqual(queue.msgTotalDequeues, 3, "expect 3 total dequeues on queue") + + sess.close() + + + def test_message_release(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session() + tx = sess.sender("message_release;{create:always,delete:always}") + tx.send("RELEASE") + tx.send("RELEASE") + tx.send("RELEASE") + tx.send("RELEASE") + tx.send("RELEASE") + + rx = sess.receiver("message_release") + m1 = rx.fetch() + m2 = rx.fetch() + sess.acknowledge(m1, Disposition(RELEASED)) + sess.acknowledge(m2, Disposition(RELEASED)) + + now_broker = agent.getBroker() + queue = agent.getQueue("message_release") + + self.failUnless(queue, "expected a valid queue object") + self.assertEqual(queue.acquires, 2, "expect 2 acquires on queue") + self.failUnless(now_broker.acquires - start_broker.acquires >= 2, "expect at least 2 acquires on broker") + self.assertEqual(queue.msgTotalDequeues, 0, "expect 0 total dequeues on queue") + + self.assertEqual(queue.releases, 2, "expect 2 releases on queue") + self.failUnless(now_broker.releases - start_broker.releases >= 2, "expect at least 2 releases on broker") + + sess.close() + + + def test_discards_purge(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session() + tx = sess.sender("discards_purge;{create:always,delete:always}") + tx.send("PURGE") + tx.send("PURGE") + tx.send("PURGE") + tx.send("PURGE") + tx.send("PURGE") + + queue = agent.getQueue("discards_purge") + self.failUnless(queue, "expected a valid queue object") + + queue.purge(3) + queue.update() + + now_broker = agent.getBroker() + self.assertEqual(queue.discardsPurge, 3, "expect 3 purge discards on queue") + self.assertEqual(now_broker.discardsPurge - start_broker.discardsPurge, 3, "expect 3 purge discards on broker") + self.assertEqual(queue.msgTotalDequeues, 3, "expect 3 total dequeues on queue") + + sess.close() + + + def test_reroutes(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session() + tx = sess.sender("reroute;{create:always,delete:always}") + tx.send("REROUTE") + tx.send("REROUTE") + tx.send("REROUTE") + tx.send("REROUTE") + tx.send("REROUTE") + tx.send("REROUTE") + tx.send("REROUTE") + tx.send("REROUTE") + + queue = agent.getQueue("reroute") + self.failUnless(queue, "expected a valid queue object") + + queue.reroute(5, False, 'amq.fanout') + queue.update() + + now_broker = agent.getBroker() + self.assertEqual(queue.reroutes, 5, "expect 5 reroutes on queue") + self.assertEqual(now_broker.reroutes - start_broker.reroutes, 5, "expect 5 reroutes on broker") + self.assertEqual(queue.msgTotalDequeues, 5, "expect 5 total dequeues on queue") + + sess.close() + + |