summaryrefslogtreecommitdiff
path: root/qpid/tests/src/py/qpid_tests/broker_0_10/stats.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/tests/src/py/qpid_tests/broker_0_10/stats.py')
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/stats.py519
1 files changed, 519 insertions, 0 deletions
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/stats.py b/qpid/tests/src/py/qpid_tests/broker_0_10/stats.py
new file mode 100644
index 0000000000..4f3931b78b
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/stats.py
@@ -0,0 +1,519 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.tests.messaging.implementation import *
+from qpid.tests.messaging import Base
+from time import sleep
+from qpidtoollibs.broker import BrokerAgent
+
+#
+# Tests the Broker's statistics reporting
+#
+
+class BrokerStatsTests(Base):
+ """
+ Tests of the broker's statistics
+ """
+
+ def assertEqual(self, left, right, text=None):
+ if not left == right:
+ print "assertEqual failure: %r != %r" % (left, right)
+ if text:
+ print " %r" % text
+ assert None
+
+ def failUnless(self, value, text=None):
+ if value:
+ return
+ print "failUnless failure",
+ if text:
+ print ": %r" % text
+ else:
+ print
+ assert None
+
+ def fail(self, text=None):
+ if text:
+ print "Fail: %r" % text
+ assert None
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self, tx=False):
+ return self.conn.session(transactional=tx)
+
+ def setup_access(self):
+ return BrokerAgent(self.conn)
+
+ def test_exchange_stats(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ agent.addExchange("direct", "stats-test-exchange")
+ try:
+ sess = self.setup_session()
+ tx_a = sess.sender("stats-test-exchange/a")
+ tx_b = sess.sender("stats-test-exchange/b")
+ rx_a = sess.receiver("stats-test-exchange/a")
+
+ exchange = agent.getExchange("stats-test-exchange")
+ self.failUnless(exchange, "expected a valid exchange object")
+ self.assertEqual(exchange.msgReceives, 0, "msgReceives")
+ self.assertEqual(exchange.msgDrops, 0, "msgDrops")
+ self.assertEqual(exchange.msgRoutes, 0, "msgRoutes")
+ self.assertEqual(exchange.byteReceives, 0, "byteReceives")
+ self.assertEqual(exchange.byteDrops, 0, "byteDrops")
+ self.assertEqual(exchange.byteRoutes, 0, "byteRoutes")
+
+ tx_a.send("0123456789")
+ tx_b.send("01234567890123456789")
+ tx_a.send("012345678901234567890123456789")
+ tx_b.send("0123456789012345678901234567890123456789")
+
+ overhead = 63 #overhead added to message from headers
+ exchange.update()
+ self.assertEqual(exchange.msgReceives, 4, "msgReceives")
+ self.assertEqual(exchange.msgDrops, 2, "msgDrops")
+ self.assertEqual(exchange.msgRoutes, 2, "msgRoutes")
+ self.assertEqual(exchange.byteReceives, 100+(4*overhead), "byteReceives")
+ self.assertEqual(exchange.byteDrops, 60+(2*overhead), "byteDrops")
+ self.assertEqual(exchange.byteRoutes, 40+(2*overhead), "byteRoutes")
+ finally:
+ agent.delExchange("stats-test-exchange")
+
+ def test_enqueues_dequeues(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session()
+ tx = sess.sender("enqueue_test;{create:always,delete:always}")
+ rx = sess.receiver("enqueue_test")
+
+ queue = agent.getQueue("enqueue_test")
+ self.failUnless(queue, "expected a valid queue object")
+ self.assertEqual(queue.msgTotalEnqueues, 0, "msgTotalEnqueues")
+ self.assertEqual(queue.byteTotalEnqueues, 0, "byteTotalEnqueues")
+ self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues")
+ self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues")
+ self.assertEqual(queue.msgDepth, 0, "msgDepth")
+ self.assertEqual(queue.byteDepth, 0, "byteDepth")
+
+ tx.send("0123456789")
+ tx.send("01234567890123456789")
+ tx.send("012345678901234567890123456789")
+ tx.send("0123456789012345678901234567890123456789")
+ overhead = 38 #overhead added to message from headers
+
+ queue.update()
+ self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues")
+ self.assertEqual(queue.byteTotalEnqueues, 100+(4*overhead), "byteTotalEnqueues")
+ self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues")
+ self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues")
+ self.assertEqual(queue.msgDepth, 4, "msgDepth")
+ self.assertEqual(queue.byteDepth, 100+(4*overhead), "byteDepth")
+
+ now_broker = agent.getBroker()
+ self.failUnless((now_broker.msgTotalEnqueues - start_broker.msgTotalEnqueues) >= 4, "broker msgTotalEnqueues")
+ self.failUnless((now_broker.byteTotalEnqueues - start_broker.byteTotalEnqueues) >= 100, "broker byteTotalEnqueues")
+
+ m = rx.fetch()
+ m = rx.fetch()
+ sess.acknowledge()
+
+ queue.update()
+ self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues")
+ self.assertEqual(queue.byteTotalEnqueues, 100+(4*overhead), "byteTotalEnqueues")
+ self.assertEqual(queue.msgTotalDequeues, 2, "msgTotalDequeues")
+ self.assertEqual(queue.byteTotalDequeues, 30+(2*overhead), "byteTotalDequeues")
+ self.assertEqual(queue.msgDepth, 2, "msgDepth")
+ self.assertEqual(queue.byteDepth, 70+(2*overhead), "byteDepth")
+
+ now_broker = agent.getBroker()
+ self.failUnless((now_broker.msgTotalDequeues - start_broker.msgTotalDequeues) >= 2, "broker msgTotalDequeues")
+ self.failUnless((now_broker.byteTotalDequeues - start_broker.byteTotalDequeues) >= 30, "broker byteTotalDequeues")
+
+ sess.close()
+
+ now_broker = agent.getBroker()
+ self.assertEqual(now_broker.abandoned - start_broker.abandoned, 2, "expect 2 abandoned messages")
+ self.assertEqual(now_broker.msgDepth, start_broker.msgDepth, "expect broker message depth to be unchanged")
+ self.assertEqual(now_broker.byteDepth, start_broker.byteDepth, "expect broker byte depth to be unchanged")
+
+
+ def test_transactional_enqueues_dequeues(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session(True)
+ tx = sess.sender("tx_enqueue_test;{create:always,delete:always}")
+
+ tx.send("0123456789")
+ tx.send("0123456789")
+ tx.send("0123456789")
+ tx.send("0123456789")
+ overhead = 41 #overhead added to message from headers
+
+ queue = agent.getQueue("tx_enqueue_test")
+ self.failUnless(queue, "expected a valid queue object")
+ self.assertEqual(queue.msgTotalEnqueues, 0, "msgTotalEnqueues pre-tx-commit")
+ self.assertEqual(queue.byteTotalEnqueues, 0, "byteTotalEnqueues pre-tx-commit")
+ self.assertEqual(queue.msgTxnEnqueues, 0, "msgTxnEnqueues pre-tx-commit")
+ self.assertEqual(queue.byteTxnEnqueues, 0, "byteTxnEnqueues pre-tx-commit")
+ self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues pre-tx-commit")
+ self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues pre-tx-commit")
+ self.assertEqual(queue.msgTxnDequeues, 0, "msgTxnDequeues pre-tx-commit")
+ self.assertEqual(queue.byteTxnDequeues, 0, "byteTxnDequeues pre-tx-commit")
+
+ sess.commit()
+ queue.update()
+ self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues post-tx-commit")
+ self.assertEqual(queue.byteTotalEnqueues, 40+(4*overhead), "byteTotalEnqueues post-tx-commit")
+ self.assertEqual(queue.msgTxnEnqueues, 4, "msgTxnEnqueues post-tx-commit")
+ self.assertEqual(queue.byteTxnEnqueues, 40+(4*overhead), "byteTxnEnqueues post-tx-commit")
+ self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues post-tx-commit")
+ self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues post-tx-commit")
+ self.assertEqual(queue.msgTxnDequeues, 0, "msgTxnDequeues post-tx-commit")
+ self.assertEqual(queue.byteTxnDequeues, 0, "byteTxnDequeues post-tx-commit")
+
+ sess2 = self.setup_session(True)
+ rx = sess2.receiver("tx_enqueue_test")
+
+ m = rx.fetch()
+ m = rx.fetch()
+ m = rx.fetch()
+ m = rx.fetch()
+
+ queue.update()
+ self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues pre-rx-commit")
+ self.assertEqual(queue.byteTotalEnqueues, 40+(4*overhead), "byteTotalEnqueues pre-rx-commit")
+ self.assertEqual(queue.msgTxnEnqueues, 4, "msgTxnEnqueues pre-rx-commit")
+ self.assertEqual(queue.byteTxnEnqueues, 40+(4*overhead), "byteTxnEnqueues pre-rx-commit")
+ self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues pre-rx-commit")
+ self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues pre-rx-commit")
+ self.assertEqual(queue.msgTxnDequeues, 0, "msgTxnDequeues pre-rx-commit")
+ self.assertEqual(queue.byteTxnDequeues, 0, "byteTxnDequeues pre-rx-commit")
+
+ sess2.acknowledge()
+ sess2.commit()
+
+ queue.update()
+ self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues post-rx-commit")
+ self.assertEqual(queue.byteTotalEnqueues, 40+(4*overhead), "byteTotalEnqueues post-rx-commit")
+ self.assertEqual(queue.msgTxnEnqueues, 4, "msgTxnEnqueues post-rx-commit")
+ self.assertEqual(queue.byteTxnEnqueues, 40+(4*overhead), "byteTxnEnqueues post-rx-commit")
+ self.assertEqual(queue.msgTotalDequeues, 4, "msgTotalDequeues post-rx-commit")
+ self.assertEqual(queue.byteTotalDequeues, 40+(4*overhead), "byteTotalDequeues post-rx-commit")
+ self.assertEqual(queue.msgTxnDequeues, 4, "msgTxnDequeues post-rx-commit")
+ self.assertEqual(queue.byteTxnDequeues, 40+(4*overhead), "byteTxnDequeues post-rx-commit")
+
+ sess.close()
+ sess2.close()
+
+ now_broker = agent.getBroker()
+ self.assertEqual(now_broker.msgTxnEnqueues - start_broker.msgTxnEnqueues, 4, "broker msgTxnEnqueues")
+ self.assertEqual(now_broker.byteTxnEnqueues - start_broker.byteTxnEnqueues, 40+(4*overhead), "broker byteTxnEnqueues")
+ self.assertEqual(now_broker.msgTxnDequeues - start_broker.msgTxnDequeues, 4, "broker msgTxnDequeues")
+ self.assertEqual(now_broker.byteTxnDequeues - start_broker.byteTxnDequeues, 40+(4*overhead), "broker byteTxnDequeues")
+
+
+ def test_discards_no_route(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session()
+ tx = sess.sender("amq.topic/non.existing.key")
+ tx.send("NO_ROUTE")
+ tx.send("NO_ROUTE")
+ tx.send("NO_ROUTE")
+ tx.send("NO_ROUTE")
+ tx.send("NO_ROUTE")
+
+ now_broker = agent.getBroker()
+
+ self.failUnless((now_broker.discardsNoRoute - start_broker.discardsNoRoute) >= 5, "Expect at least 5 no-routes")
+
+ sess.close()
+
+
+ def test_abandoned_alt(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session()
+ tx = sess.sender("abandon_alt;{create:always,delete:always,node:{x-declare:{alternate-exchange:'amq.fanout'}}}")
+ rx = sess.receiver("abandon_alt")
+ rx.capacity = 2
+
+ tx.send("ABANDON_ALT")
+ tx.send("ABANDON_ALT")
+ tx.send("ABANDON_ALT")
+ tx.send("ABANDON_ALT")
+ tx.send("ABANDON_ALT")
+
+ rx.fetch()
+
+ sess.close()
+ now_broker = agent.getBroker()
+ self.assertEqual(now_broker.abandonedViaAlt - start_broker.abandonedViaAlt, 5, "Expect 5 abandonedViaAlt")
+ self.assertEqual(now_broker.abandoned - start_broker.abandoned, 0, "Expect 0 abandoned")
+
+
+ def test_discards_ttl(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session()
+ tx = sess.sender("discards_ttl;{create:always,delete:always}")
+ msg = Message("TTL")
+ msg.ttl = 1
+
+ tx.send(msg)
+ tx.send(msg)
+ tx.send(msg)
+ tx.send(msg)
+ tx.send(msg)
+ tx.send(msg)
+
+ sleep(2)
+
+ rx = sess.receiver("discards_ttl")
+ try:
+ rx.fetch(0)
+ except:
+ pass
+
+ now_broker = agent.getBroker()
+ queue = agent.getQueue("discards_ttl")
+
+ self.failUnless(queue, "expected a valid queue object")
+ self.assertEqual(queue.discardsTtl, 6, "expect 6 TTL discards on queue")
+ self.assertEqual(now_broker.discardsTtl - start_broker.discardsTtl, 6, "expect 6 TTL discards on broker")
+ self.assertEqual(queue.msgTotalDequeues, 6, "expect 6 total dequeues on queue")
+
+ sess.close()
+
+
+ def test_discards_limit_overflow(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session()
+ tx = sess.sender("discards_limit;{create:always,node:{x-declare:{arguments:{'qpid.max_count':3,'qpid.flow_stop_count':0}}}}")
+ tx.send("LIMIT")
+ tx.send("LIMIT")
+ tx.send("LIMIT")
+ try:
+ tx.send("LIMIT")
+ self.fail("expected to fail sending 4th message")
+ except:
+ pass
+
+ now_broker = agent.getBroker()
+ queue = agent.getQueue("discards_limit")
+
+ self.failUnless(queue, "expected a valid queue object")
+ self.assertEqual(queue.discardsOverflow, 1, "expect 1 overflow discard on queue")
+ self.assertEqual(now_broker.discardsOverflow - start_broker.discardsOverflow, 1, "expect 1 overflow discard on broker")
+
+ ##
+ ## Shut down and restart the connection to clear the error condition.
+ ##
+ try:
+ self.conn.close(timeout=.1)
+ except:
+ pass
+ self.conn = self.setup_connection()
+
+ ##
+ ## Re-create the session to delete the queue.
+ ##
+ sess = self.setup_session()
+ tx = sess.sender("discards_limit;{create:always,delete:always}")
+ sess.close()
+
+
+ def test_discards_ring_overflow(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session()
+ tx = sess.sender("discards_ring;{create:always,delete:always,node:{x-declare:{arguments:{'qpid.max_count':3,'qpid.flow_stop_count':0,'qpid.policy_type':ring}}}}")
+
+ tx.send("RING")
+ tx.send("RING")
+ tx.send("RING")
+ tx.send("RING")
+ tx.send("RING")
+
+ now_broker = agent.getBroker()
+ queue = agent.getQueue("discards_ring")
+
+ self.failUnless(queue, "expected a valid queue object")
+ self.assertEqual(queue.discardsRing, 2, "expect 2 ring discards on queue")
+ self.assertEqual(now_broker.discardsRing - start_broker.discardsRing, 2, "expect 2 ring discards on broker")
+ self.assertEqual(queue.msgTotalDequeues, 2, "expect 2 total dequeues on queue")
+
+ sess.close()
+
+
+ def test_discards_lvq_replace(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session()
+ tx = sess.sender("discards_lvq;{create:always,delete:always,node:{x-declare:{arguments:{'qpid.max_count':3,'qpid.flow_stop_count':0,'qpid.last_value_queue_key':key}}}}")
+ msgA = Message("LVQ_A")
+ msgA.properties['key'] = 'AAA'
+ msgB = Message("LVQ_B")
+ msgB.properties['key'] = 'BBB'
+
+ tx.send(msgA)
+ tx.send(msgB)
+ tx.send(msgA)
+ tx.send(msgA)
+ tx.send(msgB)
+
+ now_broker = agent.getBroker()
+ queue = agent.getQueue("discards_lvq")
+
+ self.failUnless(queue, "expected a valid queue object")
+ self.assertEqual(queue.discardsLvq, 3, "expect 3 lvq discards on queue")
+ self.assertEqual(now_broker.discardsLvq - start_broker.discardsLvq, 3, "expect 3 lvq discards on broker")
+ self.assertEqual(queue.msgTotalDequeues, 3, "expect 3 total dequeues on queue")
+
+ sess.close()
+
+
+ def test_discards_reject(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session()
+ tx = sess.sender("discards_reject;{create:always,delete:always}")
+ tx.send("REJECT")
+ tx.send("REJECT")
+ tx.send("REJECT")
+
+ rx = sess.receiver("discards_reject")
+ m = rx.fetch()
+ sess.acknowledge()
+ m1 = rx.fetch()
+ m2 = rx.fetch()
+ sess.acknowledge(m1, Disposition(REJECTED))
+ sess.acknowledge(m2, Disposition(REJECTED))
+
+ now_broker = agent.getBroker()
+ queue = agent.getQueue("discards_reject")
+
+ self.failUnless(queue, "expected a valid queue object")
+ self.assertEqual(queue.discardsSubscriber, 2, "expect 2 reject discards on queue")
+ self.assertEqual(now_broker.discardsSubscriber - start_broker.discardsSubscriber, 2, "expect 2 reject discards on broker")
+ self.assertEqual(queue.msgTotalDequeues, 3, "expect 3 total dequeues on queue")
+
+ sess.close()
+
+
+ def test_message_release(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session()
+ tx = sess.sender("message_release;{create:always,delete:always}")
+ tx.send("RELEASE")
+ tx.send("RELEASE")
+ tx.send("RELEASE")
+ tx.send("RELEASE")
+ tx.send("RELEASE")
+
+ rx = sess.receiver("message_release")
+ m1 = rx.fetch()
+ m2 = rx.fetch()
+ sess.acknowledge(m1, Disposition(RELEASED))
+ sess.acknowledge(m2, Disposition(RELEASED))
+
+ now_broker = agent.getBroker()
+ queue = agent.getQueue("message_release")
+
+ self.failUnless(queue, "expected a valid queue object")
+ self.assertEqual(queue.acquires, 2, "expect 2 acquires on queue")
+ self.failUnless(now_broker.acquires - start_broker.acquires >= 2, "expect at least 2 acquires on broker")
+ self.assertEqual(queue.msgTotalDequeues, 0, "expect 0 total dequeues on queue")
+
+ self.assertEqual(queue.releases, 2, "expect 2 releases on queue")
+ self.failUnless(now_broker.releases - start_broker.releases >= 2, "expect at least 2 releases on broker")
+
+ sess.close()
+
+
+ def test_discards_purge(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session()
+ tx = sess.sender("discards_purge;{create:always,delete:always}")
+ tx.send("PURGE")
+ tx.send("PURGE")
+ tx.send("PURGE")
+ tx.send("PURGE")
+ tx.send("PURGE")
+
+ queue = agent.getQueue("discards_purge")
+ self.failUnless(queue, "expected a valid queue object")
+
+ queue.purge(3)
+ queue.update()
+
+ now_broker = agent.getBroker()
+ self.assertEqual(queue.discardsPurge, 3, "expect 3 purge discards on queue")
+ self.assertEqual(now_broker.discardsPurge - start_broker.discardsPurge, 3, "expect 3 purge discards on broker")
+ self.assertEqual(queue.msgTotalDequeues, 3, "expect 3 total dequeues on queue")
+
+ sess.close()
+
+
+ def test_reroutes(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session()
+ tx = sess.sender("reroute;{create:always,delete:always}")
+ tx.send("REROUTE")
+ tx.send("REROUTE")
+ tx.send("REROUTE")
+ tx.send("REROUTE")
+ tx.send("REROUTE")
+ tx.send("REROUTE")
+ tx.send("REROUTE")
+ tx.send("REROUTE")
+
+ queue = agent.getQueue("reroute")
+ self.failUnless(queue, "expected a valid queue object")
+
+ queue.reroute(5, False, 'amq.fanout')
+ queue.update()
+
+ now_broker = agent.getBroker()
+ self.assertEqual(queue.reroutes, 5, "expect 5 reroutes on queue")
+ self.assertEqual(now_broker.reroutes - start_broker.reroutes, 5, "expect 5 reroutes on broker")
+ self.assertEqual(queue.msgTotalDequeues, 5, "expect 5 total dequeues on queue")
+
+ sess.close()
+
+