summaryrefslogtreecommitdiff
path: root/qpid/python/qpid_tests/broker_0_10/new_api.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python/qpid_tests/broker_0_10/new_api.py')
-rw-r--r--qpid/python/qpid_tests/broker_0_10/new_api.py358
1 files changed, 358 insertions, 0 deletions
diff --git a/qpid/python/qpid_tests/broker_0_10/new_api.py b/qpid/python/qpid_tests/broker_0_10/new_api.py
new file mode 100644
index 0000000000..4e94395121
--- /dev/null
+++ b/qpid/python/qpid_tests/broker_0_10/new_api.py
@@ -0,0 +1,358 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.tests.messaging.implementation import *
+from qpid.tests.messaging import Base
+from qpidtoollibs import BrokerAgent
+from time import sleep
+
+#
+# Broker tests using the new messaging API
+#
+
+class GeneralTests(Base):
+ """
+ Tests of the API and broker via the new API.
+ """
+
+ def assertEqual(self, left, right, text=None):
+ if not left == right:
+ print "assertEqual failure: %r != %r" % (left, right)
+ if text:
+ print " %r" % text
+ assert None
+
+ def fail(self, text=None):
+ if text:
+ print "Fail: %r" % text
+ assert None
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def test_not_found(self):
+ ssn = self.setup_session()
+ try:
+ ssn.receiver("does-not-exist")
+ self.fail("Expected non-existent node to cause NotFound exception")
+ except NotFound, e: None
+
+ def test_qpid_3481_acquired_to_alt_exchange(self):
+ """
+ Verify that acquired messages are routed to the alternate when the queue is deleted.
+ """
+ sess1 = self.setup_session()
+ sess2 = self.setup_session()
+
+ tx = sess1.sender("amq.direct/key")
+ rx_main = sess1.receiver("amq.direct/key;{link:{reliability:at-least-once,x-declare:{alternate-exchange:'amq.fanout'}}}")
+ rx_alt = sess2.receiver("amq.fanout")
+ rx_alt.capacity = 10
+
+ tx.send("DATA")
+ tx.send("DATA")
+ tx.send("DATA")
+ tx.send("DATA")
+ tx.send("DATA")
+
+ msg = rx_main.fetch()
+ msg = rx_main.fetch()
+ msg = rx_main.fetch()
+
+ self.assertEqual(rx_alt.available(), 0, "No messages should have been routed to the alt_exchange")
+
+ sess1.close()
+ sleep(1)
+ self.assertEqual(rx_alt.available(), 5, "All 5 messages should have been routed to the alt_exchange")
+
+ sess2.close()
+
+ def test_qpid_3481_acquired_to_alt_exchange_2_consumers(self):
+ """
+ Verify that acquired messages are routed to the alternate when the queue is deleted.
+ """
+ sess1 = self.setup_session()
+ sess2 = self.setup_session()
+ sess3 = self.setup_session()
+ sess4 = self.setup_session()
+
+ tx = sess1.sender("test_acquired;{create:always,delete:always,node:{x-declare:{alternate-exchange:'amq.fanout'}}}")
+ rx_main1 = sess2.receiver("test_acquired")
+ rx_main2 = sess3.receiver("test_acquired")
+ rx_alt = sess4.receiver("amq.fanout")
+ rx_alt.capacity = 10
+
+ tx.send("DATA")
+ tx.send("DATA")
+ tx.send("DATA")
+ tx.send("DATA")
+ tx.send("DATA")
+
+ msg = rx_main1.fetch()
+ msg = rx_main1.fetch()
+ msg = rx_main1.fetch()
+
+ self.assertEqual(rx_alt.available(), 0, "No messages should have been routed to the alt_exchange")
+
+ # Close sess1; This will cause the queue to be deleted and all its messages (including those acquired) to be reouted to the alternate exchange
+ sess1.close()
+ sleep(1)
+ self.assertEqual(rx_alt.available(), 5, "All the messages should have been routed to the alt_exchange")
+
+ # Close sess2; This will cause the acquired messages to be requeued and routed to the alternate
+ sess2.close()
+ for i in range(5):
+ try:
+ m = rx_alt.fetch(0)
+ except:
+ self.fail("failed to receive all 5 messages via alternate exchange")
+
+ sess3.close()
+ self.assertEqual(rx_alt.available(), 0, "No further messages should be received via the alternate exchange")
+
+ sess4.close()
+
+ def test_next_receiver(self):
+ keys = ["a", "b", "c"]
+ receivers = [self.ssn.receiver("amq.direct/%s" % k) for k in keys]
+ for r in receivers:
+ r.capacity = 10
+
+ snd = self.ssn.sender("amq.direct")
+
+ for k in keys:
+ snd.send(Message(subject=k, content=k))
+
+ expected = keys
+ while len(expected):
+ rcv = self.ssn.next_receiver(timeout=self.delay())
+ c = rcv.fetch().content
+ assert c in expected
+ expected.remove(c)
+ self.ssn.acknowledge()
+
+ def test_nolocal_rerouted(self):
+ conn2 = Connection.establish(self.broker, **self.connection_options())
+ ssn2 = conn2.session()
+
+ s1 = self.ssn.sender("holding_q; {create:always, delete:always, node:{x-declare:{alternate-exchange:'amq.fanout'}}}");
+ s2 = ssn2.sender("holding_q");
+
+ s2.send(Message("a"));
+ s1.send(Message("b"));
+ s2.send(Message("c"));
+
+ r = self.ssn.receiver("amq.fanout; {link:{x-declare:{arguments:{'no-local':True}}}}")
+
+ # close connection of one of the publishers
+ conn2.close()
+
+ # close sender which should cause the orphaned messages on
+ # holding_q to be rerouted through alternate exchange onto the
+ # subscription queue of the receiver above
+ s1.close()
+
+ received = []
+ try:
+ while True:
+ received.append(r.fetch(0).content)
+ except Empty: pass
+ self.assertEqual(received, ["a", "c"])
+
+ def _node_disambiguation_test(self, e, q, ambiguous_send=False):
+ s1 = self.ssn.sender("ambiguous; {node:{type:topic}}");
+ s2 = self.ssn.sender("ambiguous; {node:{type:queue}}");
+ s1.send(Message("a"))
+ s2.send(Message("b"))
+ if ambiguous_send:
+ # pure python client defaults to using the queue when the
+ # node name is ambiguous and no type is specified; the
+ # swigged version treats this as an error
+ s3 = self.ssn.sender("ambiguous");
+ s3.send(Message("c"))
+ self.assertEqual(e.fetch().content, "a")
+ self.assertEqual(q.fetch().content, "b")
+ if ambiguous_send:
+ self.assertEqual(q.fetch().content, "c")
+ for r in [e, q]:
+ try:
+ m = r.fetch(timeout=0)
+ self.fail("Found unexpected message %s")
+ except Empty: pass
+
+ def _node_disambiguation_precreated(self, ambiguous_send):
+ agent = BrokerAgent(self.conn)
+ agent.addExchange("fanout", "ambiguous")
+ agent.addQueue("ambiguous")
+ try:
+ r1 = self.ssn.receiver("ambiguous; {node:{type:topic}}")
+ r2 = self.ssn.receiver("ambiguous; {node:{type:queue}}")
+ self._node_disambiguation_test(r1, r2, ambiguous_send=ambiguous_send)
+ finally:
+ agent.delExchange("ambiguous")
+ agent.delQueue("ambiguous", False, False)
+
+ def test_node_disambiguation_1(self):
+ self._node_disambiguation_precreated(False)
+
+ def test_node_disambiguation_2(self):
+ self._node_disambiguation_precreated(True)
+
+ def test_ambiguous_create_1(self):
+ #create queue first, then exchange
+ r1 = self.ssn.receiver("ambiguous; {create:receiver, node:{type:queue}}")
+ r2 = self.ssn.receiver("ambiguous; {create:receiver, node:{type:topic}}")
+ agent = BrokerAgent(self.conn)
+ exchange = agent.getExchange("ambiguous")
+ queue = agent.getQueue("ambiguous")
+ try:
+ assert(exchange)
+ assert(queue)
+ self._node_disambiguation_test(r2, r1)
+ finally:
+ if exchange: agent.delExchange("ambiguous")
+ if queue: agent.delQueue("ambiguous", False, False)
+
+ def test_ambiguous_create_2(self):
+ #create exchange first, then queue
+ r1 = self.ssn.receiver("ambiguous; {create:receiver, node:{type:topic}}")
+ r2 = self.ssn.receiver("ambiguous; {create:receiver, node:{type:queue}}")
+ agent = BrokerAgent(self.conn)
+ exchange = agent.getExchange("ambiguous")
+ queue = agent.getQueue("ambiguous")
+ try:
+ assert(exchange)
+ assert(queue)
+ self._node_disambiguation_test(r1, r2)
+ finally:
+ if exchange: agent.delExchange("ambiguous")
+ if queue: agent.delQueue("ambiguous", False, False)
+
+ def test_ambiguous_delete_1(self):
+ agent = BrokerAgent(self.conn)
+ agent.addExchange("fanout", "ambiguous")
+ agent.addQueue("ambiguous")
+ self.ssn.receiver("ambiguous; {delete:receiver, node:{type:topic}}").close()
+ exchange = agent.getExchange("ambiguous")
+ queue = agent.getQueue("ambiguous")
+ try:
+ assert(not exchange)
+ assert(queue)
+ finally:
+ if exchange: agent.delExchange("ambiguous")
+ if queue: agent.delQueue("ambiguous", False, False)
+
+ def test_ambiguous_delete_2(self):
+ agent = BrokerAgent(self.conn)
+ agent.addExchange("fanout", "ambiguous")
+ agent.addQueue("ambiguous")
+ self.ssn.receiver("ambiguous; {delete:receiver, node:{type:queue}}").close()
+ exchange = agent.getExchange("ambiguous")
+ queue = agent.getQueue("ambiguous")
+ try:
+ assert(exchange)
+ assert(not queue)
+ finally:
+ if exchange: agent.delExchange("ambiguous")
+ if queue: agent.delQueue("ambiguous", False, False)
+
+
+class SequenceNumberTests(Base):
+ """
+ Tests of ring queue sequence number
+ """
+
+ def fail(self, text=None):
+ if text:
+ print "Fail: %r" % text
+ assert None
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def setup_sender(self, name="ring-sequence-queue", key="qpid.queue_msg_sequence"):
+ addr = "%s; {create:sender, node: {x-declare: {auto-delete: True, arguments: {'qpid.queue_msg_sequence':'%s', 'qpid.policy_type':'ring', 'qpid.max_count':4}}}}" % (name, key)
+ sender = self.ssn.sender(addr)
+ return sender
+
+ def test_create_sequence_queue(self):
+ """
+ Test a queue with sequencing can be created
+ """
+
+ #setup, declare a queue
+ try:
+ sender = self.setup_sender()
+ except:
+ self.fail("Unable to create ring queue with sequencing enabled")
+
+ def test_get_sequence_number(self):
+ """
+ Test retrieving sequence number for queues
+ """
+
+ key = "k"
+ sender = self.setup_sender("ring-sequence-queue2", key=key)
+
+ # send and receive 1 message and test the sequence number
+ msg = Message()
+ sender.send(msg)
+
+ receiver = self.ssn.receiver("ring-sequence-queue2")
+ msg = receiver.fetch(1)
+ try:
+ seqNo = msg.properties[key]
+ if int(seqNo) != 1:
+ txt = "Unexpected sequence number. Should be 1. Received (%s)" % seqNo
+ self.fail(txt)
+ except:
+ txt = "Unable to get key (%s) from message properties" % key
+ self.fail(txt)
+ receiver.close()
+
+ def test_sequence_number_gap(self):
+ """
+ Test that sequence number for ring queues shows gaps when queue
+ messages are overwritten
+ """
+ key = "qpid.seq"
+ sender = self.setup_sender("ring-sequence-queue3", key=key)
+ receiver = self.ssn.receiver("ring-sequence-queue3")
+
+ msg = Message()
+ sender.send(msg)
+ msg = receiver.fetch(1)
+
+ # send 5 more messages to overflow the queue
+ for i in range(5):
+ sender.send(msg)
+
+ msg = receiver.fetch(1)
+ seqNo = msg.properties[key]
+ if int(seqNo) != 3:
+ txt = "Unexpected sequence number. Should be 3. Received (%s)" % seqNo
+ self.fail(txt)
+ receiver.close()
+