diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-02-05 15:08:44 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-02-05 15:08:44 +0000 |
commit | 3df1424a43244ee1c347ff9163650753601c6893 (patch) | |
tree | 333c4228d660a6d3d175ddfe76403f1f37e75af1 /qpid/tests | |
parent | 5b4dc91393493685c6688d0b69334333413616fe (diff) | |
download | qpid-python-3df1424a43244ee1c347ff9163650753601c6893.tar.gz |
moved protocol tests from qpid/python to qpid/tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@906961 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/tests')
24 files changed, 5858 insertions, 0 deletions
diff --git a/qpid/tests/src/py/qpid_tests/__init__.py b/qpid/tests/src/py/qpid_tests/__init__.py new file mode 100644 index 0000000000..7b522f59af --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/__init__.py @@ -0,0 +1,22 @@ +# Do not delete - marks this directory as a python package. + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import broker_0_10, broker_0_9, broker_0_8 diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py b/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py new file mode 100644 index 0000000000..f9315a6f90 --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py @@ -0,0 +1,31 @@ +# Do not delete - marks this directory as a python package. + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from alternate_exchange import * +from broker import * +from dtx import * +from example import * +from exchange import * +from management import * +from message import * +from query import * +from queue import * +from tx import * diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py b/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py new file mode 100644 index 0000000000..4d8617eb8e --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py @@ -0,0 +1,204 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import traceback +from qpid.queue import Empty +from qpid.datatypes import Message +from qpid.testlib import TestBase010 +from qpid.session import SessionException + +class AlternateExchangeTests(TestBase010): + """ + Tests for the new mechanism for message returns introduced in 0-10 + and available in 0-9 for preview + """ + + def test_unroutable(self): + """ + Test that unroutable messages are delivered to the alternate-exchange if specified + """ + session = self.session + #create an exchange with an alternate defined + session.exchange_declare(exchange="secondary", type="fanout") + session.exchange_declare(exchange="primary", type="direct", alternate_exchange="secondary") + + #declare, bind (to the alternate exchange) and consume from a queue for 'returned' messages + session.queue_declare(queue="returns", exclusive=True, auto_delete=True) + session.exchange_bind(queue="returns", exchange="secondary") + session.message_subscribe(destination="a", queue="returns") + session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + returned = session.incoming("a") + + #declare, bind (to the primary exchange) and consume from a queue for 'processed' messages + session.queue_declare(queue="processed", exclusive=True, auto_delete=True) + session.exchange_bind(queue="processed", exchange="primary", binding_key="my-key") + session.message_subscribe(destination="b", queue="processed") + session.message_flow(destination="b", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="b", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + processed = session.incoming("b") + + #publish to the primary exchange + #...one message that makes it to the 'processed' queue: + dp=self.session.delivery_properties(routing_key="my-key") + session.message_transfer(destination="primary", message=Message(dp, "Good")) + #...and one that does not: + dp=self.session.delivery_properties(routing_key="unused-key") + session.message_transfer(destination="primary", message=Message(dp, "Bad")) + + #delete the exchanges + session.exchange_delete(exchange="primary") + session.exchange_delete(exchange="secondary") + + #verify behaviour + self.assertEqual("Good", processed.get(timeout=1).body) + self.assertEqual("Bad", returned.get(timeout=1).body) + self.assertEmpty(processed) + self.assertEmpty(returned) + + def test_queue_delete(self): + """ + Test that messages in a queue being deleted are delivered to the alternate-exchange if specified + """ + session = self.session + #set up a 'dead letter queue': + session.exchange_declare(exchange="dlq", type="fanout") + session.queue_declare(queue="deleted", exclusive=True, auto_delete=True) + session.exchange_bind(exchange="dlq", queue="deleted") + session.message_subscribe(destination="dlq", queue="deleted") + session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + dlq = session.incoming("dlq") + + #create a queue using the dlq as its alternate exchange: + session.queue_declare(queue="delete-me", alternate_exchange="dlq") + #send it some messages: + dp=self.session.delivery_properties(routing_key="delete-me") + session.message_transfer(message=Message(dp, "One")) + session.message_transfer(message=Message(dp, "Two")) + session.message_transfer(message=Message(dp, "Three")) + #delete it: + session.queue_delete(queue="delete-me") + #delete the dlq exchange: + session.exchange_delete(exchange="dlq") + + #check the messages were delivered to the dlq: + self.assertEqual("One", dlq.get(timeout=1).body) + self.assertEqual("Two", dlq.get(timeout=1).body) + self.assertEqual("Three", dlq.get(timeout=1).body) + self.assertEmpty(dlq) + + def test_delete_while_used_by_queue(self): + """ + Ensure an exchange still in use as an alternate-exchange for a + queue can't be deleted + """ + session = self.session + session.exchange_declare(exchange="alternate", type="fanout") + + session2 = self.conn.session("alternate", 2) + session2.queue_declare(queue="q", alternate_exchange="alternate") + try: + session2.exchange_delete(exchange="alternate") + self.fail("Expected deletion of in-use alternate-exchange to fail") + except SessionException, e: + session = self.session + session.queue_delete(queue="q") + session.exchange_delete(exchange="alternate") + self.assertEquals(530, e.args[0].error_code) + + + def test_delete_while_used_by_exchange(self): + """ + Ensure an exchange still in use as an alternate-exchange for + another exchange can't be deleted + """ + session = self.session + session.exchange_declare(exchange="alternate", type="fanout") + + session = self.conn.session("alternate", 2) + session.exchange_declare(exchange="e", type="fanout", alternate_exchange="alternate") + try: + session.exchange_delete(exchange="alternate") + self.fail("Expected deletion of in-use alternate-exchange to fail") + except SessionException, e: + session = self.session + session.exchange_delete(exchange="e") + session.exchange_delete(exchange="alternate") + self.assertEquals(530, e.args[0].error_code) + + + def test_modify_existing_exchange_alternate(self): + """ + Ensure that attempting to modify an exhange to change + the alternate throws an exception + """ + session = self.session + session.exchange_declare(exchange="alt1", type="direct") + session.exchange_declare(exchange="alt2", type="direct") + session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt1") + try: + # attempt to change the alternate on an already existing exchange + session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt2") + self.fail("Expected changing an alternate on an existing exchange to fail") + except SessionException, e: + self.assertEquals(530, e.args[0].error_code) + session = self.conn.session("alternate", 2) + session.exchange_delete(exchange="onealternate") + session.exchange_delete(exchange="alt2") + session.exchange_delete(exchange="alt1") + + + def test_add_alternate_to_exchange(self): + """ + Ensure that attempting to modify an exhange by adding + an alternate throws an exception + """ + session = self.session + session.exchange_declare(exchange="alt1", type="direct") + session.exchange_declare(exchange="noalternate", type="fanout") + try: + # attempt to add an alternate on an already existing exchange + session.exchange_declare(exchange="noalternate", type="fanout", alternate_exchange="alt1") + self.fail("Expected adding an alternate on an existing exchange to fail") + except SessionException, e: + self.assertEquals(530, e.args[0].error_code) + session = self.conn.session("alternate", 2) + session.exchange_delete(exchange="noalternate") + session.exchange_delete(exchange="alt1") + + + def test_del_alternate_to_exchange(self): + """ + Ensure that attempting to modify an exhange by declaring + it again without an alternate does nothing + """ + session = self.session + session.exchange_declare(exchange="alt1", type="direct") + session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt1") + # attempt to re-declare without an alternate - silently ignore + session.exchange_declare(exchange="onealternate", type="fanout" ) + session.exchange_delete(exchange="onealternate") + session.exchange_delete(exchange="alt1") + + + def assertEmpty(self, queue): + try: + msg = queue.get(timeout=1) + self.fail("Queue not empty: " + msg) + except Empty: None diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/broker.py b/qpid/tests/src/py/qpid_tests/broker_0_10/broker.py new file mode 100644 index 0000000000..81d723e322 --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/broker.py @@ -0,0 +1,93 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Closed +from qpid.queue import Empty +from qpid.testlib import TestBase010 +from qpid.datatypes import Message, RangedSet + +class BrokerTests(TestBase010): + """Tests for basic Broker functionality""" + + def test_ack_and_no_ack(self): + """ + First, this test tries to receive a message with a no-ack + consumer. Second, this test tries to explicitly receive and + acknowledge a message with an acknowledging consumer. + """ + session = self.session + session.queue_declare(queue = "myqueue", exclusive=True, auto_delete=True) + + # No ack consumer + ctag = "tag1" + session.message_subscribe(queue = "myqueue", destination = ctag) + session.message_flow(destination=ctag, unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination=ctag, unit=session.credit_unit.byte, value=0xFFFFFFFFL) + body = "test no-ack" + session.message_transfer(message=Message(session.delivery_properties(routing_key="myqueue"), body)) + msg = session.incoming(ctag).get(timeout = 5) + self.assert_(msg.body == body) + + # Acknowledging consumer + session.queue_declare(queue = "otherqueue", exclusive=True, auto_delete=True) + ctag = "tag2" + session.message_subscribe(queue = "otherqueue", destination = ctag, accept_mode = 1) + session.message_flow(destination=ctag, unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination=ctag, unit=session.credit_unit.byte, value=0xFFFFFFFFL) + body = "test ack" + session.message_transfer(message=Message(session.delivery_properties(routing_key="otherqueue"), body)) + msg = session.incoming(ctag).get(timeout = 5) + session.message_accept(RangedSet(msg.id)) + self.assert_(msg.body == body) + + def test_simple_delivery_immediate(self): + """ + Test simple message delivery where consume is issued before publish + """ + session = self.session + session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) + session.exchange_bind(queue="test-queue", exchange="amq.fanout") + consumer_tag = "tag1" + session.message_subscribe(queue="test-queue", destination=consumer_tag) + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = consumer_tag) + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag) + queue = session.incoming(consumer_tag) + + body = "Immediate Delivery" + session.message_transfer("amq.fanout", None, None, Message(body)) + msg = queue.get(timeout=5) + self.assert_(msg.body == body) + + def test_simple_delivery_queued(self): + """ + Test basic message delivery where publish is issued before consume + (i.e. requires queueing of the message) + """ + session = self.session + session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) + session.exchange_bind(queue="test-queue", exchange="amq.fanout") + body = "Queued Delivery" + session.message_transfer("amq.fanout", None, None, Message(body)) + + consumer_tag = "tag1" + session.message_subscribe(queue="test-queue", destination=consumer_tag) + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = consumer_tag) + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag) + queue = session.incoming(consumer_tag) + msg = queue.get(timeout=5) + self.assert_(msg.body == body) diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py b/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py new file mode 100644 index 0000000000..2823385a3b --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py @@ -0,0 +1,775 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.datatypes import Message, RangedSet +from qpid.session import SessionException +from qpid.testlib import TestBase010 +from qpid.compat import set +from struct import pack, unpack +from time import sleep + +class DtxTests(TestBase010): + """ + Tests for the amqp dtx related classes. + + Tests of the form test_simple_xxx test the basic transactional + behaviour. The approach here is to 'swap' a message from one queue + to another by consuming and re-publishing in the same + transaction. That transaction is then completed in different ways + and the appropriate result verified. + + The other tests enforce more specific rules and behaviour on a + per-method or per-field basis. + """ + + XA_RBROLLBACK = 1 + XA_RBTIMEOUT = 2 + XA_OK = 0 + tx_counter = 0 + + def reset_channel(self): + self.session.close() + self.session = self.conn.session("dtx-session", 1) + + def test_simple_commit(self): + """ + Test basic one-phase commit behaviour. + """ + guard = self.keepQueuesAlive(["queue-a", "queue-b"]) + session = self.session + tx = self.xid("my-xid") + self.txswap(tx, "commit") + + #neither queue should have any messages accessible + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(0, "queue-b") + + #commit + self.assertEqual(self.XA_OK, session.dtx_commit(xid=tx, one_phase=True).status) + + #should close and reopen session to ensure no unacked messages are held + self.reset_channel() + + #check result + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(1, "queue-b") + self.assertMessageId("commit", "queue-b") + + def test_simple_prepare_commit(self): + """ + Test basic two-phase commit behaviour. + """ + guard = self.keepQueuesAlive(["queue-a", "queue-b"]) + session = self.session + tx = self.xid("my-xid") + self.txswap(tx, "prepare-commit") + + #prepare + self.assertEqual(self.XA_OK, session.dtx_prepare(xid=tx).status) + + #neither queue should have any messages accessible + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(0, "queue-b") + + #commit + self.assertEqual(self.XA_OK, session.dtx_commit(xid=tx, one_phase=False).status) + + self.reset_channel() + + #check result + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(1, "queue-b") + self.assertMessageId("prepare-commit", "queue-b") + + + def test_simple_rollback(self): + """ + Test basic rollback behaviour. + """ + guard = self.keepQueuesAlive(["queue-a", "queue-b"]) + session = self.session + tx = self.xid("my-xid") + self.txswap(tx, "rollback") + + #neither queue should have any messages accessible + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(0, "queue-b") + + #rollback + self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status) + + self.reset_channel() + + #check result + self.assertMessageCount(1, "queue-a") + self.assertMessageCount(0, "queue-b") + self.assertMessageId("rollback", "queue-a") + + def test_simple_prepare_rollback(self): + """ + Test basic rollback behaviour after the transaction has been prepared. + """ + guard = self.keepQueuesAlive(["queue-a", "queue-b"]) + session = self.session + tx = self.xid("my-xid") + self.txswap(tx, "prepare-rollback") + + #prepare + self.assertEqual(self.XA_OK, session.dtx_prepare(xid=tx).status) + + #neither queue should have any messages accessible + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(0, "queue-b") + + #rollback + self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status) + + self.reset_channel() + + #check result + self.assertMessageCount(1, "queue-a") + self.assertMessageCount(0, "queue-b") + self.assertMessageId("prepare-rollback", "queue-a") + + def test_select_required(self): + """ + check that an error is flagged if select is not issued before + start or end + """ + session = self.session + tx = self.xid("dummy") + try: + session.dtx_start(xid=tx) + + #if we get here we have failed, but need to do some cleanup: + session.dtx_end(xid=tx) + session.dtx_rollback(xid=tx) + self.fail("Session not selected for use with dtx, expected exception!") + except SessionException, e: + self.assertEquals(503, e.args[0].error_code) + + def test_start_already_known(self): + """ + Verify that an attempt to start an association with a + transaction that is already known is not allowed (unless the + join flag is set). + """ + #create two sessions on different connection & select them for use with dtx: + session1 = self.session + session1.dtx_select() + + other = self.connect() + session2 = other.session("other", 0) + session2.dtx_select() + + #create a xid + tx = self.xid("dummy") + #start work on one session under that xid: + session1.dtx_start(xid=tx) + #then start on the other without the join set + failed = False + try: + session2.dtx_start(xid=tx) + except SessionException, e: + failed = True + error = e + + #cleanup: + if not failed: + session2.dtx_end(xid=tx) + other.close() + session1.dtx_end(xid=tx) + session1.dtx_rollback(xid=tx) + + #verification: + if failed: self.assertEquals(530, error.args[0].error_code) + else: self.fail("Xid already known, expected exception!") + + def test_forget_xid_on_completion(self): + """ + Verify that a xid is 'forgotten' - and can therefore be used + again - once it is completed. + """ + #do some transactional work & complete the transaction + self.test_simple_commit() + # session has been reset, so reselect for use with dtx + self.session.dtx_select() + + #start association for the same xid as the previously completed txn + tx = self.xid("my-xid") + self.session.dtx_start(xid=tx) + self.session.dtx_end(xid=tx) + self.session.dtx_rollback(xid=tx) + + def test_start_join_and_resume(self): + """ + Ensure the correct error is signalled when both the join and + resume flags are set on starting an association between a + session and a transcation. + """ + session = self.session + session.dtx_select() + tx = self.xid("dummy") + try: + session.dtx_start(xid=tx, join=True, resume=True) + #failed, but need some cleanup: + session.dtx_end(xid=tx) + session.dtx_rollback(xid=tx) + self.fail("Join and resume both set, expected exception!") + except SessionException, e: + self.assertEquals(503, e.args[0].error_code) + + def test_start_join(self): + """ + Verify 'join' behaviour, where a session is associated with a + transaction that is already associated with another session. + """ + guard = self.keepQueuesAlive(["one", "two"]) + #create two sessions & select them for use with dtx: + session1 = self.session + session1.dtx_select() + + session2 = self.conn.session("second", 2) + session2.dtx_select() + + #setup + session1.queue_declare(queue="one", auto_delete=True) + session1.queue_declare(queue="two", auto_delete=True) + session1.message_transfer(self.createMessage(session1, "one", "a", "DtxMessage")) + session1.message_transfer(self.createMessage(session1, "two", "b", "DtxMessage")) + + #create a xid + tx = self.xid("dummy") + #start work on one session under that xid: + session1.dtx_start(xid=tx) + #then start on the other with the join flag set + session2.dtx_start(xid=tx, join=True) + + #do work through each session + self.swap(session1, "one", "two")#swap 'a' from 'one' to 'two' + self.swap(session2, "two", "one")#swap 'b' from 'two' to 'one' + + #mark end on both sessions + session1.dtx_end(xid=tx) + session2.dtx_end(xid=tx) + + #commit and check + session1.dtx_commit(xid=tx, one_phase=True) + self.assertMessageCount(1, "one") + self.assertMessageCount(1, "two") + self.assertMessageId("a", "two") + self.assertMessageId("b", "one") + + + def test_suspend_resume(self): + """ + Test suspension and resumption of an association + """ + session = self.session + session.dtx_select() + + #setup + session.queue_declare(queue="one", exclusive=True, auto_delete=True) + session.queue_declare(queue="two", exclusive=True, auto_delete=True) + session.message_transfer(self.createMessage(session, "one", "a", "DtxMessage")) + session.message_transfer(self.createMessage(session, "two", "b", "DtxMessage")) + + tx = self.xid("dummy") + + session.dtx_start(xid=tx) + self.swap(session, "one", "two")#swap 'a' from 'one' to 'two' + session.dtx_end(xid=tx, suspend=True) + + session.dtx_start(xid=tx, resume=True) + self.swap(session, "two", "one")#swap 'b' from 'two' to 'one' + session.dtx_end(xid=tx) + + #commit and check + session.dtx_commit(xid=tx, one_phase=True) + self.assertMessageCount(1, "one") + self.assertMessageCount(1, "two") + self.assertMessageId("a", "two") + self.assertMessageId("b", "one") + + def test_suspend_start_end_resume(self): + """ + Test suspension and resumption of an association with work + done on another transaction when the first transaction is + suspended + """ + session = self.session + session.dtx_select() + + #setup + session.queue_declare(queue="one", exclusive=True, auto_delete=True) + session.queue_declare(queue="two", exclusive=True, auto_delete=True) + session.message_transfer(self.createMessage(session, "one", "a", "DtxMessage")) + session.message_transfer(self.createMessage(session, "two", "b", "DtxMessage")) + + tx = self.xid("dummy") + + session.dtx_start(xid=tx) + self.swap(session, "one", "two")#swap 'a' from 'one' to 'two' + session.dtx_end(xid=tx, suspend=True) + + session.dtx_start(xid=tx, resume=True) + self.swap(session, "two", "one")#swap 'b' from 'two' to 'one' + session.dtx_end(xid=tx) + + #commit and check + session.dtx_commit(xid=tx, one_phase=True) + self.assertMessageCount(1, "one") + self.assertMessageCount(1, "two") + self.assertMessageId("a", "two") + self.assertMessageId("b", "one") + + def test_end_suspend_and_fail(self): + """ + Verify that the correct error is signalled if the suspend and + fail flag are both set when disassociating a transaction from + the session + """ + session = self.session + session.dtx_select() + tx = self.xid("suspend_and_fail") + session.dtx_start(xid=tx) + try: + session.dtx_end(xid=tx, suspend=True, fail=True) + self.fail("Suspend and fail both set, expected exception!") + except SessionException, e: + self.assertEquals(503, e.args[0].error_code) + + #cleanup + other = self.connect() + session = other.session("cleanup", 1) + session.dtx_rollback(xid=tx) + session.close() + other.close() + + + def test_end_unknown_xid(self): + """ + Verifies that the correct exception is thrown when an attempt + is made to end the association for a xid not previously + associated with the session + """ + session = self.session + session.dtx_select() + tx = self.xid("unknown-xid") + try: + session.dtx_end(xid=tx) + self.fail("Attempted to end association with unknown xid, expected exception!") + except SessionException, e: + self.assertEquals(409, e.args[0].error_code) + + def test_end(self): + """ + Verify that the association is terminated by end and subsequent + operations are non-transactional + """ + guard = self.keepQueuesAlive(["tx-queue"]) + session = self.conn.session("alternate", 1) + session.queue_declare(queue="tx-queue", exclusive=True, auto_delete=True) + + #publish a message under a transaction + session.dtx_select() + tx = self.xid("dummy") + session.dtx_start(xid=tx) + session.message_transfer(self.createMessage(session, "tx-queue", "one", "DtxMessage")) + session.dtx_end(xid=tx) + + #now that association with txn is ended, publish another message + session.message_transfer(self.createMessage(session, "tx-queue", "two", "DtxMessage")) + + #check the second message is available, but not the first + self.assertMessageCount(1, "tx-queue") + self.subscribe(session, queue="tx-queue", destination="results") + msg = session.incoming("results").get(timeout=1) + self.assertEqual("two", self.getMessageProperty(msg, 'correlation_id')) + session.message_cancel(destination="results") + #ack the message then close the session + session.message_accept(RangedSet(msg.id)) + session.close() + + session = self.session + #commit the transaction and check that the first message (and + #only the first message) is then delivered + session.dtx_commit(xid=tx, one_phase=True) + self.assertMessageCount(1, "tx-queue") + self.assertMessageId("one", "tx-queue") + + def test_invalid_commit_one_phase_true(self): + """ + Test that a commit with one_phase = True is rejected if the + transaction in question has already been prepared. + """ + other = self.connect() + tester = other.session("tester", 1) + tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True) + tester.dtx_select() + tx = self.xid("dummy") + tester.dtx_start(xid=tx) + tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever")) + tester.dtx_end(xid=tx) + tester.dtx_prepare(xid=tx) + failed = False + try: + tester.dtx_commit(xid=tx, one_phase=True) + except SessionException, e: + failed = True + error = e + + if failed: + self.session.dtx_rollback(xid=tx) + self.assertEquals(409, error.args[0].error_code) + else: + tester.close() + other.close() + self.fail("Invalid use of one_phase=True, expected exception!") + + def test_invalid_commit_one_phase_false(self): + """ + Test that a commit with one_phase = False is rejected if the + transaction in question has not yet been prepared. + """ + other = self.connect() + tester = other.session("tester", 1) + tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True) + tester.dtx_select() + tx = self.xid("dummy") + tester.dtx_start(xid=tx) + tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever")) + tester.dtx_end(xid=tx) + failed = False + try: + tester.dtx_commit(xid=tx, one_phase=False) + except SessionException, e: + failed = True + error = e + + if failed: + self.session.dtx_rollback(xid=tx) + self.assertEquals(409, error.args[0].error_code) + else: + tester.close() + other.close() + self.fail("Invalid use of one_phase=False, expected exception!") + + def test_invalid_commit_not_ended(self): + """ + Test that a commit fails if the xid is still associated with a session. + """ + other = self.connect() + tester = other.session("tester", 1) + self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True) + self.session.dtx_select() + tx = self.xid("dummy") + self.session.dtx_start(xid=tx) + self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever")) + + failed = False + try: + tester.dtx_commit(xid=tx, one_phase=False) + except SessionException, e: + failed = True + error = e + + if failed: + self.session.dtx_end(xid=tx) + self.session.dtx_rollback(xid=tx) + self.assertEquals(409, error.args[0].error_code) + else: + tester.close() + other.close() + self.fail("Commit should fail as xid is still associated!") + + def test_invalid_rollback_not_ended(self): + """ + Test that a rollback fails if the xid is still associated with a session. + """ + other = self.connect() + tester = other.session("tester", 1) + self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True) + self.session.dtx_select() + tx = self.xid("dummy") + self.session.dtx_start(xid=tx) + self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever")) + + failed = False + try: + tester.dtx_rollback(xid=tx) + except SessionException, e: + failed = True + error = e + + if failed: + self.session.dtx_end(xid=tx) + self.session.dtx_rollback(xid=tx) + self.assertEquals(409, error.args[0].error_code) + else: + tester.close() + other.close() + self.fail("Rollback should fail as xid is still associated!") + + + def test_invalid_prepare_not_ended(self): + """ + Test that a prepare fails if the xid is still associated with a session. + """ + other = self.connect() + tester = other.session("tester", 1) + self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True) + self.session.dtx_select() + tx = self.xid("dummy") + self.session.dtx_start(xid=tx) + self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever")) + + failed = False + try: + tester.dtx_prepare(xid=tx) + except SessionException, e: + failed = True + error = e + + if failed: + self.session.dtx_end(xid=tx) + self.session.dtx_rollback(xid=tx) + self.assertEquals(409, error.args[0].error_code) + else: + tester.close() + other.close() + self.fail("Rollback should fail as xid is still associated!") + + def test_implicit_end(self): + """ + Test that an association is implicitly ended when the session + is closed (whether by exception or explicit client request) + and the transaction in question is marked as rollback only. + """ + session1 = self.session + session2 = self.conn.session("other", 2) + + #setup: + session2.queue_declare(queue="dummy", exclusive=True, auto_delete=True) + session2.message_transfer(self.createMessage(session2, "dummy", "a", "whatever")) + tx = self.xid("dummy") + + session2.dtx_select() + session2.dtx_start(xid=tx) + session2.message_subscribe(queue="dummy", destination="dummy") + session2.message_flow(destination="dummy", unit=session2.credit_unit.message, value=1) + session2.message_flow(destination="dummy", unit=session2.credit_unit.byte, value=0xFFFFFFFFL) + msg = session2.incoming("dummy").get(timeout=1) + session2.message_accept(RangedSet(msg.id)) + session2.message_cancel(destination="dummy") + session2.message_transfer(self.createMessage(session2, "dummy", "b", "whatever")) + session2.close() + + self.assertEqual(self.XA_RBROLLBACK, session1.dtx_prepare(xid=tx).status) + session1.dtx_rollback(xid=tx) + + def test_get_timeout(self): + """ + Check that get-timeout returns the correct value, (and that a + transaction with a timeout can complete normally) + """ + session = self.session + tx = self.xid("dummy") + + session.dtx_select() + session.dtx_start(xid=tx) + self.assertEqual(0, session.dtx_get_timeout(xid=tx).timeout) + session.dtx_set_timeout(xid=tx, timeout=60) + self.assertEqual(60, session.dtx_get_timeout(xid=tx).timeout) + self.assertEqual(self.XA_OK, session.dtx_end(xid=tx).status) + self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status) + + def test_set_timeout(self): + """ + Test the timeout of a transaction results in the expected + behaviour + """ + + guard = self.keepQueuesAlive(["queue-a", "queue-b"]) + #open new session to allow self.session to be used in checking the queue + session = self.conn.session("worker", 1) + #setup: + tx = self.xid("dummy") + session.queue_declare(queue="queue-a", auto_delete=True) + session.queue_declare(queue="queue-b", auto_delete=True) + session.message_transfer(self.createMessage(session, "queue-a", "timeout", "DtxMessage")) + + session.dtx_select() + session.dtx_start(xid=tx) + self.swap(session, "queue-a", "queue-b") + session.dtx_set_timeout(xid=tx, timeout=2) + sleep(3) + #check that the work has been rolled back already + self.assertMessageCount(1, "queue-a") + self.assertMessageCount(0, "queue-b") + self.assertMessageId("timeout", "queue-a") + #check the correct codes are returned when we try to complete the txn + self.assertEqual(self.XA_RBTIMEOUT, session.dtx_end(xid=tx).status) + self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status) + + + + def test_recover(self): + """ + Test basic recover behaviour + """ + session = self.session + + session.dtx_select() + session.queue_declare(queue="dummy", exclusive=True, auto_delete=True) + + prepared = [] + for i in range(1, 10): + tx = self.xid("tx%s" % (i)) + session.dtx_start(xid=tx) + session.message_transfer(self.createMessage(session, "dummy", "message%s" % (i), "message%s" % (i))) + session.dtx_end(xid=tx) + if i in [2, 5, 6, 8]: + session.dtx_prepare(xid=tx) + prepared.append(tx) + else: + session.dtx_rollback(xid=tx) + + xids = session.dtx_recover().in_doubt + + #rollback the prepared transactions returned by recover + for x in xids: + session.dtx_rollback(xid=x) + + #validate against the expected list of prepared transactions + actual = set([x.global_id for x in xids]) #TODO: come up with nicer way to test these + expected = set([x.global_id for x in prepared]) + intersection = actual.intersection(expected) + + if intersection != expected: + missing = expected.difference(actual) + extra = actual.difference(expected) + self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra)) + + def test_bad_resume(self): + """ + Test that a resume on a session not selected for use with dtx fails + """ + session = self.session + try: + session.dtx_start(resume=True) + except SessionException, e: + self.assertEquals(503, e.args[0].error_code) + + def test_prepare_unknown(self): + session = self.session + try: + session.dtx_prepare(xid=self.xid("unknown")) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + def test_commit_unknown(self): + session = self.session + try: + session.dtx_commit(xid=self.xid("unknown")) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + def test_rollback_unknown(self): + session = self.session + try: + session.dtx_rollback(xid=self.xid("unknown")) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + def test_get_timeout_unknown(self): + session = self.session + try: + session.dtx_get_timeout(xid=self.xid("unknown")) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + def xid(self, txid): + DtxTests.tx_counter += 1 + branchqual = "v%s" % DtxTests.tx_counter + return self.session.xid(format=0, global_id=txid, branch_id=branchqual) + + def txswap(self, tx, id): + session = self.session + #declare two queues: + session.queue_declare(queue="queue-a", auto_delete=True) + session.queue_declare(queue="queue-b", auto_delete=True) + + #put message with specified id on one queue: + dp=session.delivery_properties(routing_key="queue-a") + mp=session.message_properties(correlation_id=id) + session.message_transfer(message=Message(dp, mp, "DtxMessage")) + + #start the transaction: + session.dtx_select() + self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status) + + #'swap' the message from one queue to the other, under that transaction: + self.swap(self.session, "queue-a", "queue-b") + + #mark the end of the transactional work: + self.assertEqual(self.XA_OK, self.session.dtx_end(xid=tx).status) + + def swap(self, session, src, dest): + #consume from src: + session.message_subscribe(destination="temp-swap", queue=src) + session.message_flow(destination="temp-swap", unit=session.credit_unit.message, value=1) + session.message_flow(destination="temp-swap", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + msg = session.incoming("temp-swap").get(timeout=1) + session.message_cancel(destination="temp-swap") + session.message_accept(RangedSet(msg.id)) + #todo: also complete at this point? + + #re-publish to dest: + dp=session.delivery_properties(routing_key=dest) + mp=session.message_properties(correlation_id=self.getMessageProperty(msg, 'correlation_id')) + session.message_transfer(message=Message(dp, mp, msg.body)) + + def assertMessageCount(self, expected, queue): + self.assertEqual(expected, self.session.queue_query(queue=queue).message_count) + + def assertMessageId(self, expected, queue): + self.session.message_subscribe(queue=queue, destination="results") + self.session.message_flow(destination="results", unit=self.session.credit_unit.message, value=1) + self.session.message_flow(destination="results", unit=self.session.credit_unit.byte, value=0xFFFFFFFFL) + self.assertEqual(expected, self.getMessageProperty(self.session.incoming("results").get(timeout=1), 'correlation_id')) + self.session.message_cancel(destination="results") + + def getMessageProperty(self, msg, prop): + for h in msg.headers: + if hasattr(h, prop): return getattr(h, prop) + return None + + def keepQueuesAlive(self, names): + session = self.conn.session("nasty", 99) + for n in names: + session.queue_declare(queue=n, auto_delete=True) + session.message_subscribe(destination=n, queue=n) + return session + + def createMessage(self, session, key, id, body): + dp=session.delivery_properties(routing_key=key) + mp=session.message_properties(correlation_id=id) + session.message_transfer(message=Message(dp, mp, body)) diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/example.py b/qpid/tests/src/py/qpid_tests/broker_0_10/example.py new file mode 100644 index 0000000000..e36907d501 --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/example.py @@ -0,0 +1,95 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.datatypes import Message, RangedSet +from qpid.testlib import TestBase010 + +class ExampleTest (TestBase010): + """ + An example Qpid test, illustrating the unittest framework and the + python Qpid client. The test class must inherit TestBase. The + test code uses the Qpid client to interact with a qpid broker and + verify it behaves as expected. + """ + + def test_example(self): + """ + An example test. Note that test functions must start with 'test_' + to be recognized by the test framework. + """ + + # By inheriting TestBase, self.client is automatically connected + # and self.session is automatically opened as session(1) + # Other session methods mimic the protocol. + session = self.session + + # Now we can send regular commands. If you want to see what the method + # arguments mean or what other commands are available, you can use the + # python builtin help() method. For example: + #help(chan) + #help(chan.exchange_declare) + + # If you want browse the available protocol methods without being + # connected to a live server you can use the amqp-doc utility: + # + # Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>] + # + # Options: + # -e, --regexp use regex instead of glob when matching + + # Now that we know what commands are available we can use them to + # interact with the server. + + # Here we use ordinal arguments. + session.exchange_declare("test", "direct") + + # Here we use keyword arguments. + session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) + session.exchange_bind(queue="test-queue", exchange="test", binding_key="key") + + # Call Session.subscribe to register as a consumer. + # All the protocol methods return a message object. The message object + # has fields corresponding to the reply method fields, plus a content + # field that is filled if the reply includes content. In this case the + # interesting field is the consumer_tag. + session.message_subscribe(queue="test-queue", destination="consumer_tag") + session.message_flow(destination="consumer_tag", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="consumer_tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + + # We can use the session.incoming(...) method to access the messages + # delivered for our consumer_tag. + queue = session.incoming("consumer_tag") + + # Now lets publish a message and see if our consumer gets it. To do + # this we need to import the Message class. + delivery_properties = session.delivery_properties(routing_key="key") + sent = Message(delivery_properties, "Hello World!") + session.message_transfer(destination="test", message=sent) + + # Now we'll wait for the message to arrive. We can use the timeout + # argument in case the server hangs. By default queue.get() will wait + # until a message arrives or the connection to the server dies. + msg = queue.get(timeout=10) + + # And check that we got the right response with assertEqual + self.assertEqual(sent.body, msg.body) + + # Now acknowledge the message. + session.message_accept(RangedSet(msg.id)) + diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py b/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py new file mode 100644 index 0000000000..0ac78a4799 --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py @@ -0,0 +1,461 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +Tests for exchange behaviour. + +Test classes ending in 'RuleTests' are derived from rules in amqp.xml. +""" + +import Queue, logging, traceback +from qpid.testlib import TestBase010 +from qpid.datatypes import Message +from qpid.client import Closed +from qpid.session import SessionException + + +class TestHelper(TestBase010): + def setUp(self): + TestBase010.setUp(self) + self.queues = [] + self.exchanges = [] + self.subscriptions = [] + + def tearDown(self): + try: + for s in self.subscriptions: + self.session.message_cancel(destination=s) + for ssn, q in self.queues: + ssn.queue_delete(queue=q) + for ssn, ex in self.exchanges: + ssn.exchange_delete(exchange=ex) + except: + print "Error on tearDown:" + print traceback.print_exc() + TestBase010.tearDown(self) + + def createMessage(self, key="", body=""): + return Message(self.session.delivery_properties(routing_key=key), body) + + def getApplicationHeaders(self, msg): + for h in msg.headers: + if hasattr(h, 'application_headers'): return getattr(h, 'application_headers') + return None + + def assertPublishGet(self, queue, exchange="", routing_key="", properties=None): + """ + Publish to exchange and assert queue.get() returns the same message. + """ + body = self.uniqueString() + dp=self.session.delivery_properties(routing_key=routing_key) + mp=self.session.message_properties(application_headers=properties) + self.session.message_transfer(destination=exchange, message=Message(dp, mp, body)) + msg = queue.get(timeout=1) + self.assertEqual(body, msg.body) + if (properties): + self.assertEqual(properties, self.getApplicationHeaders(msg)) + + def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None): + """ + Publish a message and consume it, assert it comes back intact. + Return the Queue object used to consume. + """ + self.assertPublishGet(self.consume(queue), exchange, routing_key, properties) + + def assertEmpty(self, queue): + """Assert that the queue is empty""" + try: + queue.get(timeout=1) + self.fail("Queue is not empty.") + except Queue.Empty: None # Ignore + + def queue_declare(self, session=None, *args, **keys): + session = session or self.session + reply = session.queue_declare(*args, **keys) + self.queues.append((session, keys["queue"])) + return reply + + def exchange_declare(self, session=None, ticket=0, exchange='', + type='', passive=False, durable=False, + auto_delete=False, + arguments={}): + session = session or self.session + reply = session.exchange_declare(exchange=exchange, type=type, passive=passive,durable=durable, auto_delete=auto_delete, arguments=arguments) + self.exchanges.append((session,exchange)) + return reply + + def uniqueString(self): + """Generate a unique string, unique for this TestBase instance""" + if not "uniqueCounter" in dir(self): self.uniqueCounter = 1; + return "Test Message " + str(self.uniqueCounter) + + def consume(self, queueName): + """Consume from named queue returns the Queue object.""" + if not "uniqueTag" in dir(self): self.uniqueTag = 1 + else: self.uniqueTag += 1 + consumer_tag = "tag" + str(self.uniqueTag) + self.session.message_subscribe(queue=queueName, destination=consumer_tag) + self.session.message_flow(destination=consumer_tag, unit=self.session.credit_unit.message, value=0xFFFFFFFFL) + self.session.message_flow(destination=consumer_tag, unit=self.session.credit_unit.byte, value=0xFFFFFFFFL) + self.subscriptions.append(consumer_tag) + return self.session.incoming(consumer_tag) + + +class StandardExchangeVerifier: + """Verifies standard exchange behavior. + + Used as base class for classes that test standard exchanges.""" + + def verifyDirectExchange(self, ex): + """Verify that ex behaves like a direct exchange.""" + self.queue_declare(queue="q") + self.session.exchange_bind(queue="q", exchange=ex, binding_key="k") + self.assertPublishConsume(exchange=ex, queue="q", routing_key="k") + try: + self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk") + self.fail("Expected Empty exception") + except Queue.Empty: None # Expected + + def verifyFanOutExchange(self, ex): + """Verify that ex behaves like a fanout exchange.""" + self.queue_declare(queue="q") + self.session.exchange_bind(queue="q", exchange=ex) + self.queue_declare(queue="p") + self.session.exchange_bind(queue="p", exchange=ex) + for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex) + + def verifyTopicExchange(self, ex): + """Verify that ex behaves like a topic exchange""" + self.queue_declare(queue="a") + self.session.exchange_bind(queue="a", exchange=ex, binding_key="a.#.b.*") + q = self.consume("a") + self.assertPublishGet(q, ex, "a.b.x") + self.assertPublishGet(q, ex, "a.x.b.x") + self.assertPublishGet(q, ex, "a.x.x.b.x") + # Shouldn't match + self.session.message_transfer(destination=ex, message=self.createMessage("a.b")) + self.session.message_transfer(destination=ex, message=self.createMessage("a.b.x.y")) + self.session.message_transfer(destination=ex, message=self.createMessage("x.a.b.x")) + self.session.message_transfer(destination=ex, message=self.createMessage("a.b")) + self.assert_(q.empty()) + + def verifyHeadersExchange(self, ex): + """Verify that ex is a headers exchange""" + self.queue_declare(queue="q") + self.session.exchange_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} ) + q = self.consume("q") + headers = {"name":"fred", "age":3} + self.assertPublishGet(q, exchange=ex, properties=headers) + self.session.message_transfer(destination=ex) # No headers, won't deliver + self.assertEmpty(q); + + +class RecommendedTypesRuleTests(TestHelper, StandardExchangeVerifier): + """ + The server SHOULD implement these standard exchange types: topic, headers. + + Client attempts to declare an exchange with each of these standard types. + """ + + def testDirect(self): + """Declare and test a direct exchange""" + self.exchange_declare(0, exchange="d", type="direct") + self.verifyDirectExchange("d") + + def testFanout(self): + """Declare and test a fanout exchange""" + self.exchange_declare(0, exchange="f", type="fanout") + self.verifyFanOutExchange("f") + + def testTopic(self): + """Declare and test a topic exchange""" + self.exchange_declare(0, exchange="t", type="topic") + self.verifyTopicExchange("t") + + def testHeaders(self): + """Declare and test a headers exchange""" + self.exchange_declare(0, exchange="h", type="headers") + self.verifyHeadersExchange("h") + + +class RequiredInstancesRuleTests(TestHelper, StandardExchangeVerifier): + """ + The server MUST, in each virtual host, pre-declare an exchange instance + for each standard exchange type that it implements, where the name of the + exchange instance is amq. followed by the exchange type name. + + Client creates a temporary queue and attempts to bind to each required + exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if + those types are defined). + """ + def testAmqDirect(self): self.verifyDirectExchange("amq.direct") + + def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout") + + def testAmqTopic(self): self.verifyTopicExchange("amq.topic") + + def testAmqMatch(self): self.verifyHeadersExchange("amq.match") + +class DefaultExchangeRuleTests(TestHelper, StandardExchangeVerifier): + """ + The server MUST predeclare a direct exchange to act as the default exchange + for content Publish methods and for default queue bindings. + + Client checks that the default exchange is active by specifying a queue + binding with no exchange name, and publishing a message with a suitable + routing key but without specifying the exchange name, then ensuring that + the message arrives in the queue correctly. + """ + def testDefaultExchange(self): + # Test automatic binding by queue name. + self.queue_declare(queue="d") + self.assertPublishConsume(queue="d", routing_key="d") + # Test explicit bind to default queue + self.verifyDirectExchange("") + + +# TODO aconway 2006-09-27: Fill in empty tests: + +class DefaultAccessRuleTests(TestHelper): + """ + The server MUST NOT allow clients to access the default exchange except + by specifying an empty exchange name in the Queue.Bind and content Publish + methods. + """ + +class ExtensionsRuleTests(TestHelper): + """ + The server MAY implement other exchange types as wanted. + """ + + +class DeclareMethodMinimumRuleTests(TestHelper): + """ + The server SHOULD support a minimum of 16 exchanges per virtual host and + ideally, impose no limit except as defined by available resources. + + The client creates as many exchanges as it can until the server reports + an error; the number of exchanges successfuly created must be at least + sixteen. + """ + + +class DeclareMethodTicketFieldValidityRuleTests(TestHelper): + """ + The client MUST provide a valid access ticket giving "active" access to + the realm in which the exchange exists or will be created, or "passive" + access if the if-exists flag is set. + + Client creates access ticket with wrong access rights and attempts to use + in this method. + """ + + +class DeclareMethodExchangeFieldReservedRuleTests(TestHelper): + """ + Exchange names starting with "amq." are reserved for predeclared and + standardised exchanges. The client MUST NOT attempt to create an exchange + starting with "amq.". + + Similarly, exchanges starting with "qpid." are reserved for Qpid + implementation-specific system exchanges (such as the management exchange). + The client must not attempt to create an exchange starting with the string + "qpid.". + """ + def template(self, reservedString, exchangeType): + try: + self.session.exchange_declare(exchange=reservedString, type=exchangeType) + self.fail("Expected not allowed error (530) for exchanges starting with \"" + reservedString + "\".") + except SessionException, e: + self.assertEquals(e.args[0].error_code, 530) + # connection closed, reopen it + self.tearDown() + self.setUp() + try: + self.session.exchange_declare(exchange=reservedString + "abc123", type=exchangeType) + self.fail("Expected not allowed error (530) for exchanges starting with \"" + reservedString + "\".") + except SessionException, e: + self.assertEquals(e.args[0].error_code, 530) + # connection closed, reopen it + self.tearDown() + self.setUp() + # The following should be legal: + self.session.exchange_declare(exchange=reservedString[:-1], type=exchangeType) + self.session.exchange_delete(exchange=reservedString[:-1]) + self.session.exchange_declare(exchange=reservedString[1:], type=exchangeType) + self.session.exchange_delete(exchange=reservedString[1:]) + self.session.exchange_declare(exchange="." + reservedString, type=exchangeType) + self.session.exchange_delete(exchange="." + reservedString) + self.session.exchange_declare(exchange="abc." + reservedString, type=exchangeType) + self.session.exchange_delete(exchange="abc." + reservedString) + self.session.exchange_declare(exchange="abc." + reservedString + "def", type=exchangeType) + self.session.exchange_delete(exchange="abc." + reservedString + "def") + + def test_amq(self): + self.template("amq.", "direct") + self.template("amq.", "topic") + self.template("amq.", "fanout") + + def test_qpid(self): + self.template("qpid.", "direct") + self.template("qpid.", "topic") + self.template("qpid.", "fanout") + + +class DeclareMethodTypeFieldTypedRuleTests(TestHelper): + """ + Exchanges cannot be redeclared with different types. The client MUST not + attempt to redeclare an existing exchange with a different type than used + in the original Exchange.Declare method. + + + """ + + +class DeclareMethodTypeFieldSupportRuleTests(TestHelper): + """ + The client MUST NOT attempt to create an exchange with a type that the + server does not support. + + + """ + + +class DeclareMethodPassiveFieldNotFoundRuleTests(TestHelper): + """ + If set, and the exchange does not already exist, the server MUST raise a + channel exception with reply code 404 (not found). + """ + def test(self): + try: + self.session.exchange_declare(exchange="humpty_dumpty", passive=True) + self.fail("Expected 404 for passive declaration of unknown exchange.") + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + +class DeclareMethodDurableFieldSupportRuleTests(TestHelper): + """ + The server MUST support both durable and transient exchanges. + + + """ + + +class DeclareMethodDurableFieldStickyRuleTests(TestHelper): + """ + The server MUST ignore the durable field if the exchange already exists. + + + """ + + +class DeclareMethodAutoDeleteFieldStickyRuleTests(TestHelper): + """ + The server MUST ignore the auto-delete field if the exchange already + exists. + + + """ + + +class DeleteMethodTicketFieldValidityRuleTests(TestHelper): + """ + The client MUST provide a valid access ticket giving "active" access + rights to the exchange's access realm. + + Client creates access ticket with wrong access rights and attempts to use + in this method. + """ + + +class DeleteMethodExchangeFieldExistsRuleTests(TestHelper): + """ + The client MUST NOT attempt to delete an exchange that does not exist. + """ + + +class HeadersExchangeTests(TestHelper): + """ + Tests for headers exchange functionality. + """ + def setUp(self): + TestHelper.setUp(self) + self.queue_declare(queue="q") + self.q = self.consume("q") + + def myAssertPublishGet(self, headers): + self.assertPublishGet(self.q, exchange="amq.match", properties=headers) + + def myBasicPublish(self, headers): + mp=self.session.message_properties(application_headers=headers) + self.session.message_transfer(destination="amq.match", message=Message(mp, "foobar")) + + def testMatchAll(self): + self.session.exchange_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3}) + self.myAssertPublishGet({"name":"fred", "age":3}) + self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"}) + + # None of these should match + self.myBasicPublish({}) + self.myBasicPublish({"name":"barney"}) + self.myBasicPublish({"name":10}) + self.myBasicPublish({"name":"fred", "age":2}) + self.assertEmpty(self.q) + + def testMatchAny(self): + self.session.exchange_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3}) + self.myAssertPublishGet({"name":"fred"}) + self.myAssertPublishGet({"name":"fred", "ignoreme":10}) + self.myAssertPublishGet({"ignoreme":10, "age":3}) + + # Wont match + self.myBasicPublish({}) + self.myBasicPublish({"irrelevant":0}) + self.assertEmpty(self.q) + + +class MiscellaneousErrorsTests(TestHelper): + """ + Test some miscellaneous error conditions + """ + def testTypeNotKnown(self): + try: + self.session.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type") + self.fail("Expected 503 for declaration of unknown exchange type.") + except SessionException, e: + self.assertEquals(503, e.args[0].error_code) + + def testDifferentDeclaredType(self): + self.exchange_declare(exchange="test_different_declared_type_exchange", type="direct") + try: + session = self.conn.session("alternate", 2) + session.exchange_declare(exchange="test_different_declared_type_exchange", type="topic") + self.fail("Expected 530 for redeclaration of exchange with different type.") + except SessionException, e: + self.assertEquals(530, e.args[0].error_code) + +class ExchangeTests(TestHelper): + def testHeadersBindNoMatchArg(self): + self.session.queue_declare(queue="q", exclusive=True, auto_delete=True) + try: + self.session.exchange_bind(queue="q", exchange="amq.match", arguments={"name":"fred" , "age":3} ) + self.fail("Expected failure for missing x-match arg.") + except SessionException, e: + self.assertEquals(541, e.args[0].error_code) diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/management.py b/qpid/tests/src/py/qpid_tests/broker_0_10/management.py new file mode 100644 index 0000000000..677645fa2c --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/management.py @@ -0,0 +1,467 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.datatypes import Message, RangedSet +from qpid.testlib import TestBase010 +from qpid.management import managementChannel, managementClient +from threading import Condition +from time import sleep +import qmf.console + +class ManagementTest (TestBase010): + """ + Tests for the management hooks + """ + + def test_broker_connectivity_oldAPI (self): + """ + Call the "echo" method on the broker to verify it is alive and talking. + """ + session = self.session + + mc = managementClient () + mch = mc.addChannel (session) + + mc.syncWaitForStable (mch) + brokers = mc.syncGetObjects (mch, "broker") + self.assertEqual (len (brokers), 1) + broker = brokers[0] + args = {} + body = "Echo Message Body" + args["body"] = body + + for seq in range (1, 5): + args["sequence"] = seq + res = mc.syncCallMethod (mch, broker.id, broker.classKey, "echo", args) + self.assertEqual (res.status, 0) + self.assertEqual (res.statusText, "OK") + self.assertEqual (res.sequence, seq) + self.assertEqual (res.body, body) + mc.removeChannel (mch) + + def test_methods_sync (self): + """ + Call the "echo" method on the broker to verify it is alive and talking. + """ + session = self.session + self.startQmf() + + brokers = self.qmf.getObjects(_class="broker") + self.assertEqual(len(brokers), 1) + broker = brokers[0] + + body = "Echo Message Body" + for seq in range(1, 20): + res = broker.echo(seq, body) + self.assertEqual(res.status, 0) + self.assertEqual(res.text, "OK") + self.assertEqual(res.sequence, seq) + self.assertEqual(res.body, body) + + def test_get_objects(self): + self.startQmf() + + # get the package list, verify that the qpid broker package is there + packages = self.qmf.getPackages() + assert 'org.apache.qpid.broker' in packages + + # get the schema class keys for the broker, verify the broker table and link-down event + keys = self.qmf.getClasses('org.apache.qpid.broker') + broker = None + linkDown = None + for key in keys: + if key.getClassName() == "broker": broker = key + if key.getClassName() == "brokerLinkDown" : linkDown = key + assert broker + assert linkDown + + brokerObjs = self.qmf.getObjects(_class="broker") + assert len(brokerObjs) == 1 + brokerObjs = self.qmf.getObjects(_key=broker) + assert len(brokerObjs) == 1 + + def test_self_session_id (self): + self.startQmf() + sessionId = self.qmf_broker.getSessionId() + brokerSessions = self.qmf.getObjects(_class="session") + + found = False + for bs in brokerSessions: + if bs.name == sessionId: + found = True + self.assertEqual (found, True) + + def test_standard_exchanges (self): + self.startQmf() + + exchanges = self.qmf.getObjects(_class="exchange") + exchange = self.findExchange (exchanges, "") + self.assertEqual (exchange.type, "direct") + exchange = self.findExchange (exchanges, "amq.direct") + self.assertEqual (exchange.type, "direct") + exchange = self.findExchange (exchanges, "amq.topic") + self.assertEqual (exchange.type, "topic") + exchange = self.findExchange (exchanges, "amq.fanout") + self.assertEqual (exchange.type, "fanout") + exchange = self.findExchange (exchanges, "amq.match") + self.assertEqual (exchange.type, "headers") + exchange = self.findExchange (exchanges, "qpid.management") + self.assertEqual (exchange.type, "topic") + + def findExchange (self, exchanges, name): + for exchange in exchanges: + if exchange.name == name: + return exchange + return None + + def test_move_queued_messages(self): + """ + Test ability to move messages from the head of one queue to another. + Need to test moveing all and N messages. + """ + self.startQmf() + session = self.session + "Set up source queue" + session.queue_declare(queue="src-queue", exclusive=True, auto_delete=True) + session.exchange_bind(queue="src-queue", exchange="amq.direct", binding_key="routing_key") + + twenty = range(1,21) + props = session.delivery_properties(routing_key="routing_key") + for count in twenty: + body = "Move Message %d" % count + src_msg = Message(props, body) + session.message_transfer(destination="amq.direct", message=src_msg) + + "Set up destination queue" + session.queue_declare(queue="dest-queue", exclusive=True, auto_delete=True) + session.exchange_bind(queue="dest-queue", exchange="amq.direct") + + queues = self.qmf.getObjects(_class="queue") + + "Move 10 messages from src-queue to dest-queue" + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10) + self.assertEqual (result.status, 0) + + sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] + dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0] + + self.assertEqual (sq.msgDepth,10) + self.assertEqual (dq.msgDepth,10) + + "Move all remaining messages to destination" + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0) + self.assertEqual (result.status,0) + + sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] + dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0] + + self.assertEqual (sq.msgDepth,0) + self.assertEqual (dq.msgDepth,20) + + "Use a bad source queue name" + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0) + self.assertEqual (result.status,4) + + "Use a bad destination queue name" + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0) + self.assertEqual (result.status,4) + + " Use a large qty (40) to move from dest-queue back to " + " src-queue- should move all " + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40) + self.assertEqual (result.status,0) + + sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] + dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0] + + self.assertEqual (sq.msgDepth,20) + self.assertEqual (dq.msgDepth,0) + + "Consume the messages of the queue and check they are all there in order" + session.message_subscribe(queue="src-queue", destination="tag") + session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + queue = session.incoming("tag") + for count in twenty: + consumed_msg = queue.get(timeout=1) + body = "Move Message %d" % count + self.assertEqual(body, consumed_msg.body) + + def test_purge_queue(self): + """ + Test ability to purge messages from the head of a queue. + Need to test moveing all, 1 (top message) and N messages. + """ + self.startQmf() + session = self.session + "Set up purge queue" + session.queue_declare(queue="purge-queue", exclusive=True, auto_delete=True) + session.exchange_bind(queue="purge-queue", exchange="amq.direct", binding_key="routing_key") + + twenty = range(1,21) + props = session.delivery_properties(routing_key="routing_key") + for count in twenty: + body = "Purge Message %d" % count + msg = Message(props, body) + session.message_transfer(destination="amq.direct", message=msg) + + pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] + + "Purge top message from purge-queue" + result = pq.purge(1) + self.assertEqual (result.status, 0) + pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] + self.assertEqual (pq.msgDepth,19) + + "Purge top 9 messages from purge-queue" + result = pq.purge(9) + self.assertEqual (result.status, 0) + pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] + self.assertEqual (pq.msgDepth,10) + + "Purge all messages from purge-queue" + result = pq.purge(0) + self.assertEqual (result.status, 0) + pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] + self.assertEqual (pq.msgDepth,0) + + def test_reroute_queue(self): + """ + Test ability to reroute messages from the head of a queue. + Need to test moving all, 1 (top message) and N messages. + """ + self.startQmf() + session = self.session + "Set up test queue" + session.exchange_declare(exchange="alt.direct1", type="direct") + session.queue_declare(queue="alt-queue1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="alt-queue1", exchange="alt.direct1", binding_key="routing_key") + session.exchange_declare(exchange="alt.direct2", type="direct") + session.queue_declare(queue="alt-queue2", exclusive=True, auto_delete=True) + session.exchange_bind(queue="alt-queue2", exchange="alt.direct2", binding_key="routing_key") + session.queue_declare(queue="reroute-queue", exclusive=True, auto_delete=True, alternate_exchange="alt.direct1") + session.exchange_bind(queue="reroute-queue", exchange="amq.direct", binding_key="routing_key") + + twenty = range(1,21) + props = session.delivery_properties(routing_key="routing_key") + for count in twenty: + body = "Reroute Message %d" % count + msg = Message(props, body) + session.message_transfer(destination="amq.direct", message=msg) + + pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0] + + "Reroute top message from reroute-queue to alternate exchange" + result = pq.reroute(1, True, "") + self.assertEqual(result.status, 0) + pq.update() + aq = self.qmf.getObjects(_class="queue", name="alt-queue1")[0] + self.assertEqual(pq.msgDepth,19) + self.assertEqual(aq.msgDepth,1) + + "Reroute top 9 messages from reroute-queue to alt.direct2" + result = pq.reroute(9, False, "alt.direct2") + self.assertEqual(result.status, 0) + pq.update() + aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0] + self.assertEqual(pq.msgDepth,10) + self.assertEqual(aq.msgDepth,9) + + "Reroute using a non-existent exchange" + result = pq.reroute(0, False, "amq.nosuchexchange") + self.assertEqual(result.status, 4) + + "Reroute all messages from reroute-queue" + result = pq.reroute(0, False, "alt.direct2") + self.assertEqual(result.status, 0) + pq.update() + aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0] + self.assertEqual(pq.msgDepth,0) + self.assertEqual(aq.msgDepth,19) + + "Make more messages" + twenty = range(1,21) + props = session.delivery_properties(routing_key="routing_key") + for count in twenty: + body = "Reroute Message %d" % count + msg = Message(props, body) + session.message_transfer(destination="amq.direct", message=msg) + + "Reroute onto the same queue" + result = pq.reroute(0, False, "amq.direct") + self.assertEqual(result.status, 0) + pq.update() + self.assertEqual(pq.msgDepth,20) + + + def test_methods_async (self): + """ + """ + class Handler (qmf.console.Console): + def __init__(self): + self.cv = Condition() + self.xmtList = {} + self.rcvList = {} + + def methodResponse(self, broker, seq, response): + self.cv.acquire() + try: + self.rcvList[seq] = response + finally: + self.cv.release() + + def request(self, broker, count): + self.count = count + for idx in range(count): + self.cv.acquire() + try: + seq = broker.echo(idx, "Echo Message", _async = True) + self.xmtList[seq] = idx + finally: + self.cv.release() + + def check(self): + if self.count != len(self.xmtList): + return "fail (attempted send=%d, actual sent=%d)" % (self.count, len(self.xmtList)) + lost = 0 + mismatched = 0 + for seq in self.xmtList: + value = self.xmtList[seq] + if seq in self.rcvList: + result = self.rcvList.pop(seq) + if result.sequence != value: + mismatched += 1 + else: + lost += 1 + spurious = len(self.rcvList) + if lost == 0 and mismatched == 0 and spurious == 0: + return "pass" + else: + return "fail (lost=%d, mismatch=%d, spurious=%d)" % (lost, mismatched, spurious) + + handler = Handler() + self.startQmf(handler) + brokers = self.qmf.getObjects(_class="broker") + self.assertEqual(len(brokers), 1) + broker = brokers[0] + handler.request(broker, 20) + sleep(1) + self.assertEqual(handler.check(), "pass") + + def test_connection_close(self): + """ + Test management method for closing connection + """ + self.startQmf() + conn = self.connect() + session = conn.session("my-named-session") + + #using qmf find named session and close the corresponding connection: + qmf_ssn_object = self.qmf.getObjects(_class="session", name="my-named-session")[0] + qmf_ssn_object._connectionRef_.close() + + #check that connection is closed + try: + conn.session("another-session") + self.fail("Expected failure from closed connection") + except: None + + #make sure that the named session has been closed and the name can be re-used + conn = self.connect() + session = conn.session("my-named-session") + session.queue_declare(queue="whatever", exclusive=True, auto_delete=True) + + def test_binding_count_on_queue(self): + self.startQmf() + conn = self.connect() + session = self.session + + QUEUE = "binding_test_queue" + EX_DIR = "binding_test_exchange_direct" + EX_FAN = "binding_test_exchange_fanout" + EX_TOPIC = "binding_test_exchange_topic" + EX_HDR = "binding_test_exchange_headers" + + # + # Create a test queue + # + session.queue_declare(queue=QUEUE, exclusive=True, auto_delete=True) + queue = self.qmf.getObjects(_class="queue", name=QUEUE)[0] + if not queue: + self.fail("Queue not found") + self.assertEqual(queue.bindingCount, 1, "wrong initial binding count") + + # + # Create an exchange of each supported type + # + session.exchange_declare(exchange=EX_DIR, type="direct") + session.exchange_declare(exchange=EX_FAN, type="fanout") + session.exchange_declare(exchange=EX_TOPIC, type="topic") + session.exchange_declare(exchange=EX_HDR, type="headers") + + # + # Bind each exchange to the test queue + # + match = {} + match['x-match'] = "all" + match['key'] = "value" + session.exchange_bind(exchange=EX_DIR, queue=QUEUE, binding_key="key1") + session.exchange_bind(exchange=EX_DIR, queue=QUEUE, binding_key="key2") + session.exchange_bind(exchange=EX_FAN, queue=QUEUE) + session.exchange_bind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key1.#") + session.exchange_bind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key2.#") + session.exchange_bind(exchange=EX_HDR, queue=QUEUE, binding_key="key1", arguments=match) + match['key2'] = "value2" + session.exchange_bind(exchange=EX_HDR, queue=QUEUE, binding_key="key2", arguments=match) + + # + # Verify that the queue's binding count accounts for the new bindings + # + queue.update() + self.assertEqual(queue.bindingCount, 8, + "added bindings not accounted for (expected 8, got %d)" % queue.bindingCount) + + # + # Remove some of the bindings + # + session.exchange_unbind(exchange=EX_DIR, queue=QUEUE, binding_key="key2") + session.exchange_unbind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key2.#") + session.exchange_unbind(exchange=EX_HDR, queue=QUEUE, binding_key="key2") + + # + # Verify that the queue's binding count accounts for the deleted bindings + # + queue.update() + self.assertEqual(queue.bindingCount, 5, + "deleted bindings not accounted for (expected 5, got %d)" % queue.bindingCount) + # + # Delete the exchanges + # + session.exchange_delete(exchange=EX_DIR) + session.exchange_delete(exchange=EX_FAN) + session.exchange_delete(exchange=EX_TOPIC) + session.exchange_delete(exchange=EX_HDR) + + # + # Verify that the queue's binding count accounts for the lost bindings + # + queue.update() + self.assertEqual(queue.bindingCount, 1, + "deleted bindings not accounted for (expected 1, got %d)" % queue.bindingCount) + diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/message.py b/qpid/tests/src/py/qpid_tests/broker_0_10/message.py new file mode 100644 index 0000000000..e80333a1e6 --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/message.py @@ -0,0 +1,918 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.testlib import TestBase010 +from qpid.datatypes import Message, RangedSet +from qpid.session import SessionException + +from qpid.content import Content +from time import sleep + +class MessageTests(TestBase010): + """Tests for 'methods' on the amqp message 'class'""" + + def test_no_local(self): + """ + NOTE: this is a test of a QPID specific feature + + Test that the qpid specific no_local arg is honoured. + """ + session = self.session + #setup, declare two queues one of which excludes delivery of locally sent messages + session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True) + session.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True, arguments={'no-local':'true'}) + #establish two consumers + self.subscribe(destination="local_included", queue="test-queue-1a") + self.subscribe(destination="local_excluded", queue="test-queue-1b") + + #send a message + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"), "deliver-me")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me")) + + #send a message from another session on the same connection to each queue + session2 = self.conn.session("my-local-session") + session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1a"), "deliver-me-as-well")) + session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me-either")) + + #send a message from a session on another connection to each queue + for q in ["test-queue-1a", "test-queue-1b"]: + session.exchange_bind(queue=q, exchange="amq.fanout", binding_key="my-key") + other = self.connect() + session3 = other.session("my-other-session") + session3.message_transfer(destination="amq.fanout", message=Message("i-am-not-local")) + other.close() + + #check the queues of the two consumers + excluded = session.incoming("local_excluded") + included = session.incoming("local_included") + for b in ["deliver-me", "deliver-me-as-well", "i-am-not-local"]: + msg = included.get(timeout=1) + self.assertEqual(b, msg.body) + msg = excluded.get(timeout=1) + self.assertEqual("i-am-not-local", msg.body) + try: + excluded.get(timeout=1) + self.fail("Received locally published message though no_local=true") + except Empty: None + + def test_no_local_awkward(self): + + """ + NOTE: this is a test of a QPID specific feature + + Check that messages which will be excluded through no-local + processing will not block subsequent deliveries + """ + + session = self.session + #setup: + session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True, arguments={'no-local':'true'}) + #establish consumer which excludes delivery of locally sent messages + self.subscribe(destination="local_excluded", queue="test-queue") + + #send a 'local' message + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "local")) + + #send a non local message + other = self.connect() + session2 = other.session("my-session", 1) + session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue"), "foreign")) + session2.close() + other.close() + + #check that the second message only is delivered + excluded = session.incoming("local_excluded") + msg = excluded.get(timeout=1) + self.assertEqual("foreign", msg.body) + try: + excluded.get(timeout=1) + self.fail("Received extra message") + except Empty: None + #check queue is empty + self.assertEqual(0, session.queue_query(queue="test-queue").message_count) + + def test_no_local_exclusive_subscribe(self): + """ + NOTE: this is a test of a QPID specific feature + + Test that the no_local processing works on queues not declared + as exclusive, but with an exclusive subscription + """ + session = self.session + + #setup, declare two queues one of which excludes delivery of + #locally sent messages but is not declared as exclusive + session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True) + session.queue_declare(queue="test-queue-1b", auto_delete=True, arguments={'no-local':'true'}) + #establish two consumers + self.subscribe(destination="local_included", queue="test-queue-1a") + self.subscribe(destination="local_excluded", queue="test-queue-1b", exclusive=True) + + #send a message from the same session to each queue + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"), "deliver-me")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me")) + + #send a message from another session on the same connection to each queue + session2 = self.conn.session("my-session") + session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1a"), "deliver-me-as-well")) + session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me-either")) + + #send a message from a session on another connection to each queue + for q in ["test-queue-1a", "test-queue-1b"]: + session.exchange_bind(queue=q, exchange="amq.fanout", binding_key="my-key") + other = self.connect() + session3 = other.session("my-other-session") + session3.message_transfer(destination="amq.fanout", message=Message("i-am-not-local")) + other.close() + + #check the queues of the two consumers + excluded = session.incoming("local_excluded") + included = session.incoming("local_included") + for b in ["deliver-me", "deliver-me-as-well", "i-am-not-local"]: + msg = included.get(timeout=1) + self.assertEqual(b, msg.body) + msg = excluded.get(timeout=1) + self.assertEqual("i-am-not-local", msg.body) + try: + excluded.get(timeout=1) + self.fail("Received locally published message though no_local=true") + except Empty: None + + + def test_consume_exclusive(self): + """ + Test an exclusive consumer prevents other consumer being created + """ + session = self.session + session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True) + session.message_subscribe(destination="first", queue="test-queue-2", exclusive=True) + try: + session.message_subscribe(destination="second", queue="test-queue-2") + self.fail("Expected consume request to fail due to previous exclusive consumer") + except SessionException, e: + self.assertEquals(405, e.args[0].error_code) + + def test_consume_exclusive2(self): + """ + Check that an exclusive consumer cannot be created if a consumer already exists: + """ + session = self.session + session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True) + session.message_subscribe(destination="first", queue="test-queue-2") + try: + session.message_subscribe(destination="second", queue="test-queue-2", exclusive=True) + self.fail("Expected exclusive consume request to fail due to previous consumer") + except SessionException, e: + self.assertEquals(405, e.args[0].error_code) + + def test_consume_queue_not_found(self): + """ + Test error conditions associated with the queue field of the consume method: + """ + session = self.session + try: + #queue specified but doesn't exist: + session.message_subscribe(queue="invalid-queue", destination="a") + self.fail("Expected failure when consuming from non-existent queue") + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + def test_consume_queue_not_specified(self): + session = self.session + try: + #queue not specified and none previously declared for channel: + session.message_subscribe(destination="a") + self.fail("Expected failure when consuming from unspecified queue") + except SessionException, e: + self.assertEquals(531, e.args[0].error_code) + + def test_consume_unique_consumers(self): + """ + Ensure unique consumer tags are enforced + """ + session = self.session + #setup, declare a queue: + session.queue_declare(queue="test-queue-3", exclusive=True, auto_delete=True) + + #check that attempts to use duplicate tags are detected and prevented: + session.message_subscribe(destination="first", queue="test-queue-3") + try: + session.message_subscribe(destination="first", queue="test-queue-3") + self.fail("Expected consume request to fail due to non-unique tag") + except SessionException, e: + self.assertEquals(530, e.args[0].error_code) + + def test_cancel(self): + """ + Test compliance of the basic.cancel method + """ + session = self.session + #setup, declare a queue: + session.queue_declare(queue="test-queue-4", exclusive=True, auto_delete=True) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "One")) + + session.message_subscribe(destination="my-consumer", queue="test-queue-4") + myqueue = session.incoming("my-consumer") + session.message_flow(destination="my-consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="my-consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + + #should flush here + + #cancel should stop messages being delivered + session.message_cancel(destination="my-consumer") + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "Two")) + msg = myqueue.get(timeout=1) + self.assertEqual("One", msg.body) + try: + msg = myqueue.get(timeout=1) + self.fail("Got message after cancellation: " + msg) + except Empty: None + + #cancellation of non-existant consumers should be handled without error + session.message_cancel(destination="my-consumer") + session.message_cancel(destination="this-never-existed") + + + def test_ack(self): + """ + Test basic ack/recover behaviour + """ + session = self.conn.session("alternate-session", timeout=10) + session.queue_declare(queue="test-ack-queue", auto_delete=True) + + session.message_subscribe(queue = "test-ack-queue", destination = "consumer") + session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + queue = session.incoming("consumer") + + delivery_properties = session.delivery_properties(routing_key="test-ack-queue") + for i in ["One", "Two", "Three", "Four", "Five"]: + session.message_transfer(message=Message(delivery_properties, i)) + + msg1 = queue.get(timeout=1) + msg2 = queue.get(timeout=1) + msg3 = queue.get(timeout=1) + msg4 = queue.get(timeout=1) + msg5 = queue.get(timeout=1) + + self.assertEqual("One", msg1.body) + self.assertEqual("Two", msg2.body) + self.assertEqual("Three", msg3.body) + self.assertEqual("Four", msg4.body) + self.assertEqual("Five", msg5.body) + + session.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four + + #subscribe from second session here to ensure queue is not + #auto-deleted when alternate session closes (no need to ack on these): + self.session.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1) + + #now close the session, and see that the unacked messages are + #then redelivered to another subscriber: + session.close(timeout=10) + + session = self.session + session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + queue = session.incoming("checker") + + msg3b = queue.get(timeout=1) + msg5b = queue.get(timeout=1) + + self.assertEqual("Three", msg3b.body) + self.assertEqual("Five", msg5b.body) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + def test_reject(self): + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout") + session.queue_declare(queue = "r", exclusive=True, auto_delete=True) + session.exchange_bind(queue = "r", exchange = "amq.fanout") + + session.message_subscribe(queue = "q", destination = "consumer") + session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "blah, blah")) + msg = session.incoming("consumer").get(timeout = 1) + self.assertEquals(msg.body, "blah, blah") + session.message_reject(RangedSet(msg.id)) + + session.message_subscribe(queue = "r", destination = "checker") + session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + msg = session.incoming("checker").get(timeout = 1) + self.assertEquals(msg.body, "blah, blah") + + def test_credit_flow_messages(self): + """ + Test basic credit based flow control with unit = message + """ + #declare an exclusive queue + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + #create consumer (for now that defaults to infinite credit) + session.message_subscribe(queue = "q", destination = "c") + session.message_set_flow_mode(flow_mode = 0, destination = "c") + #send batch of messages to queue + for i in range(1, 11): + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i)) + + #set message credit to finite amount (less than enough for all messages) + session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c") + #set infinite byte credit + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c") + #check that expected number were received + q = session.incoming("c") + for i in range(1, 6): + self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) + self.assertEmpty(q) + + #increase credit again and check more are received + for i in range(6, 11): + session.message_flow(unit = session.credit_unit.message, value = 1, destination = "c") + self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) + self.assertEmpty(q) + + def test_credit_flow_bytes(self): + """ + Test basic credit based flow control with unit = bytes + """ + #declare an exclusive queue + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + #create consumer (for now that defaults to infinite credit) + session.message_subscribe(queue = "q", destination = "c") + session.message_set_flow_mode(flow_mode = 0, destination = "c") + #send batch of messages to queue + for i in range(10): + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh")) + + #each message is currently interpreted as requiring msg_size bytes of credit + msg_size = 19 + + #set byte credit to finite amount (less than enough for all messages) + session.message_flow(unit = session.credit_unit.byte, value = msg_size*5, destination = "c") + #set infinite message credit + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "c") + #check that expected number were received + q = session.incoming("c") + for i in range(5): + self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") + self.assertEmpty(q) + + #increase credit again and check more are received + for i in range(5): + session.message_flow(unit = session.credit_unit.byte, value = msg_size, destination = "c") + self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") + self.assertEmpty(q) + + + def test_window_flow_messages(self): + """ + Test basic window based flow control with unit = message + """ + #declare an exclusive queue + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + #create consumer (for now that defaults to infinite credit) + session.message_subscribe(queue = "q", destination = "c") + session.message_set_flow_mode(flow_mode = 1, destination = "c") + #send batch of messages to queue + for i in range(1, 11): + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i)) + + #set message credit to finite amount (less than enough for all messages) + session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c") + #set infinite byte credit + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c") + #check that expected number were received + q = session.incoming("c") + for i in range(1, 6): + msg = q.get(timeout = 1) + session.receiver._completed.add(msg.id)#TODO: this may be done automatically + self.assertDataEquals(session, msg, "Message %d" % i) + self.assertEmpty(q) + + #acknowledge messages and check more are received + #TODO: there may be a nicer way of doing this + session.channel.session_completed(session.receiver._completed) + + for i in range(6, 11): + self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) + self.assertEmpty(q) + + + def test_window_flow_bytes(self): + """ + Test basic window based flow control with unit = bytes + """ + #declare an exclusive queue + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + #create consumer (for now that defaults to infinite credit) + session.message_subscribe(queue = "q", destination = "c") + session.message_set_flow_mode(flow_mode = 1, destination = "c") + #send batch of messages to queue + for i in range(10): + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh")) + + #each message is currently interpreted as requiring msg_size bytes of credit + msg_size = 19 + + #set byte credit to finite amount (less than enough for all messages) + session.message_flow(unit = session.credit_unit.byte, value = msg_size*5, destination = "c") + #set infinite message credit + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "c") + #check that expected number were received + q = session.incoming("c") + msgs = [] + for i in range(5): + msg = q.get(timeout = 1) + msgs.append(msg) + self.assertDataEquals(session, msg, "abcdefgh") + self.assertEmpty(q) + + #ack each message individually and check more are received + for i in range(5): + msg = msgs.pop() + #TODO: there may be a nicer way of doing this + session.receiver._completed.add(msg.id) + session.channel.session_completed(session.receiver._completed) + self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") + self.assertEmpty(q) + + def test_window_flush_ack_flow(self): + """ + Test basic window based flow control with unit = bytes + """ + #declare an exclusive queue + ssn = self.session + ssn.queue_declare(queue = "q", exclusive=True, auto_delete=True) + #create consumer + ssn.message_subscribe(queue = "q", destination = "c", + accept_mode=ssn.accept_mode.explicit) + ssn.message_set_flow_mode(flow_mode = ssn.flow_mode.window, destination = "c") + + #send message A + ssn.message_transfer(message=Message(ssn.delivery_properties(routing_key="q"), "A")) + + for unit in ssn.credit_unit.VALUES: + ssn.message_flow("c", unit, 0xFFFFFFFFL) + + q = ssn.incoming("c") + msgA = q.get(timeout=10) + + ssn.message_flush(destination="c") + + # XXX + ssn.receiver._completed.add(msgA.id) + ssn.channel.session_completed(ssn.receiver._completed) + ssn.message_accept(RangedSet(msgA.id)) + + for unit in ssn.credit_unit.VALUES: + ssn.message_flow("c", unit, 0xFFFFFFFFL) + + #send message B + ssn.message_transfer(message=Message(ssn.delivery_properties(routing_key="q"), "B")) + + msgB = q.get(timeout=10) + + def test_subscribe_not_acquired(self): + """ + Test the not-acquired modes works as expected for a simple case + """ + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + for i in range(1, 6): + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i)) + + session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + session.message_subscribe(queue = "q", destination = "b", acquire_mode = 1) + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "b") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b") + + for i in range(6, 11): + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i)) + + #both subscribers should see all messages + qA = session.incoming("a") + qB = session.incoming("b") + for i in range(1, 11): + for q in [qA, qB]: + msg = q.get(timeout = 1) + self.assertEquals("Message %s" % i, msg.body) + #TODO: tidy up completion + session.receiver._completed.add(msg.id) + + #TODO: tidy up completion + session.channel.session_completed(session.receiver._completed) + #messages should still be on the queue: + self.assertEquals(10, session.queue_query(queue = "q").message_count) + + def test_acquire_with_no_accept_and_credit_flow(self): + """ + Test that messages recieved unacquired, with accept not + required in windowing mode can be acquired. + """ + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me")) + + session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1, accept_mode = 1) + session.message_set_flow_mode(flow_mode = session.flow_mode.credit, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + msg = session.incoming("a").get(timeout = 1) + self.assertEquals("acquire me", msg.body) + #message should still be on the queue: + self.assertEquals(1, session.queue_query(queue = "q").message_count) + + transfers = RangedSet(msg.id) + response = session.message_acquire(transfers) + #check that we get notification (i.e. message_acquired) + self.assert_(msg.id in response.transfers) + #message should have been removed from the queue: + self.assertEquals(0, session.queue_query(queue = "q").message_count) + + def test_acquire(self): + """ + Test explicit acquire function + """ + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me")) + + session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) + session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + msg = session.incoming("a").get(timeout = 1) + self.assertEquals("acquire me", msg.body) + #message should still be on the queue: + self.assertEquals(1, session.queue_query(queue = "q").message_count) + + transfers = RangedSet(msg.id) + response = session.message_acquire(transfers) + #check that we get notification (i.e. message_acquired) + self.assert_(msg.id in response.transfers) + #message should have been removed from the queue: + self.assertEquals(0, session.queue_query(queue = "q").message_count) + session.message_accept(transfers) + + + def test_release(self): + """ + Test explicit release function + """ + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "release me")) + + session.message_subscribe(queue = "q", destination = "a") + session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + msg = session.incoming("a").get(timeout = 1) + self.assertEquals("release me", msg.body) + session.message_cancel(destination = "a") + session.message_release(RangedSet(msg.id)) + + #message should not have been removed from the queue: + self.assertEquals(1, session.queue_query(queue = "q").message_count) + + def test_release_ordering(self): + """ + Test order of released messages is as expected + """ + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + for i in range (1, 11): + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "released message %s" % (i))) + + session.message_subscribe(queue = "q", destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + queue = session.incoming("a") + first = queue.get(timeout = 1) + for i in range(2, 10): + msg = queue.get(timeout = 1) + self.assertEquals("released message %s" % (i), msg.body) + + last = queue.get(timeout = 1) + self.assertEmpty(queue) + released = RangedSet() + released.add(first.id, last.id) + session.message_release(released) + + #TODO: may want to clean this up... + session.receiver._completed.add(first.id, last.id) + session.channel.session_completed(session.receiver._completed) + + for i in range(1, 11): + self.assertEquals("released message %s" % (i), queue.get(timeout = 1).body) + + def test_ranged_ack(self): + """ + Test acking of messages ranges + """ + session = self.conn.session("alternate-session", timeout=10) + + session.queue_declare(queue = "q", auto_delete=True) + delivery_properties = session.delivery_properties(routing_key="q") + for i in range (1, 11): + session.message_transfer(message=Message(delivery_properties, "message %s" % (i))) + + session.message_subscribe(queue = "q", destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + queue = session.incoming("a") + ids = [] + for i in range (1, 11): + msg = queue.get(timeout = 1) + self.assertEquals("message %s" % (i), msg.body) + ids.append(msg.id) + + self.assertEmpty(queue) + + #ack all but the fourth message (command id 2) + accepted = RangedSet() + accepted.add(ids[0], ids[2]) + accepted.add(ids[4], ids[9]) + session.message_accept(accepted) + + #subscribe from second session here to ensure queue is not + #auto-deleted when alternate session closes (no need to ack on these): + self.session.message_subscribe(queue = "q", destination = "checker") + + #now close the session, and see that the unacked messages are + #then redelivered to another subscriber: + session.close(timeout=10) + + session = self.session + session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + queue = session.incoming("checker") + + self.assertEquals("message 4", queue.get(timeout = 1).body) + self.assertEmpty(queue) + + def test_subscribe_not_acquired_2(self): + session = self.session + + #publish some messages + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + for i in range(1, 11): + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) + + #consume some of them + session.message_subscribe(queue = "q", destination = "a") + session.message_set_flow_mode(flow_mode = 0, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + + queue = session.incoming("a") + for i in range(1, 6): + msg = queue.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.body) + #complete and accept + session.message_accept(RangedSet(msg.id)) + #TODO: tidy up completion + session.receiver._completed.add(msg.id) + session.channel.session_completed(session.receiver._completed) + self.assertEmpty(queue) + + #now create a not-acquired subscriber + session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b") + + #check it gets those not consumed + queue = session.incoming("b") + session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") + for i in range(6, 11): + msg = queue.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.body) + session.message_release(RangedSet(msg.id)) + #TODO: tidy up completion + session.receiver._completed.add(msg.id) + session.channel.session_completed(session.receiver._completed) + session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") + self.assertEmpty(queue) + + #check all 'browsed' messages are still on the queue + self.assertEqual(5, session.queue_query(queue="q").message_count) + + def test_subscribe_not_acquired_3(self): + session = self.session + + #publish some messages + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + for i in range(1, 11): + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) + + #create a not-acquired subscriber + session.message_subscribe(queue = "q", destination = "a", acquire_mode=1) + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") + + #browse through messages + queue = session.incoming("a") + for i in range(1, 11): + msg = queue.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.body) + if (i % 2): + #try to acquire every second message + response = session.message_acquire(RangedSet(msg.id)) + #check that acquire succeeds + self.assert_(msg.id in response.transfers) + session.message_accept(RangedSet(msg.id)) + else: + session.message_release(RangedSet(msg.id)) + session.receiver._completed.add(msg.id) + session.channel.session_completed(session.receiver._completed) + self.assertEmpty(queue) + + #create a second not-acquired subscriber + session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b") + session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") + #check it gets those not consumed + queue = session.incoming("b") + for i in [2,4,6,8,10]: + msg = queue.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.body) + session.message_release(RangedSet(msg.id)) + session.receiver._completed.add(msg.id) + session.channel.session_completed(session.receiver._completed) + session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") + self.assertEmpty(queue) + + #check all 'browsed' messages are still on the queue + self.assertEqual(5, session.queue_query(queue="q").message_count) + + def test_release_unacquired(self): + session = self.session + + #create queue + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + + #send message + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "my-message")) + + #create two 'browsers' + session.message_subscribe(queue = "q", destination = "a", acquire_mode=1) + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") + queueA = session.incoming("a") + + session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b") + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "b") + queueB = session.incoming("b") + + #have each browser release the message + msgA = queueA.get(timeout = 1) + session.message_release(RangedSet(msgA.id)) + + msgB = queueB.get(timeout = 1) + session.message_release(RangedSet(msgB.id)) + + #cancel browsers + session.message_cancel(destination = "a") + session.message_cancel(destination = "b") + + #create consumer + session.message_subscribe(queue = "q", destination = "c") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c") + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "c") + queueC = session.incoming("c") + #consume the message then ack it + msgC = queueC.get(timeout = 1) + session.message_accept(RangedSet(msgC.id)) + #ensure there are no other messages + self.assertEmpty(queueC) + + def test_release_order(self): + session = self.session + + #create queue + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + + #send messages + for i in range(1, 11): + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) + + #subscribe: + session.message_subscribe(queue="q", destination="a") + a = session.incoming("a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") + + for i in range(1, 11): + msg = a.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.body) + if (i % 2): + #accept all odd messages + session.message_accept(RangedSet(msg.id)) + else: + #release all even messages + session.message_release(RangedSet(msg.id)) + + #browse: + session.message_subscribe(queue="q", destination="b", acquire_mode=1) + b = session.incoming("b") + b.start() + for i in [2, 4, 6, 8, 10]: + msg = b.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.body) + + + def test_empty_body(self): + session = self.session + session.queue_declare(queue="xyz", exclusive=True, auto_delete=True) + props = session.delivery_properties(routing_key="xyz") + session.message_transfer(message=Message(props, "")) + + consumer_tag = "tag1" + session.message_subscribe(queue="xyz", destination=consumer_tag) + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = consumer_tag) + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag) + queue = session.incoming(consumer_tag) + msg = queue.get(timeout=1) + self.assertEquals("", msg.body) + session.message_accept(RangedSet(msg.id)) + + def test_incoming_start(self): + q = "test_incoming_start" + session = self.session + + session.queue_declare(queue=q, exclusive=True, auto_delete=True) + session.message_subscribe(queue=q, destination="msgs") + messages = session.incoming("msgs") + assert messages.destination == "msgs" + + dp = session.delivery_properties(routing_key=q) + session.message_transfer(message=Message(dp, "test")) + + messages.start() + msg = messages.get() + assert msg.body == "test" + + def test_ttl(self): + q = "test_ttl" + session = self.session + + session.queue_declare(queue=q, exclusive=True, auto_delete=True) + + dp = session.delivery_properties(routing_key=q, ttl=500)#expire in half a second + session.message_transfer(message=Message(dp, "first")) + + dp = session.delivery_properties(routing_key=q, ttl=300000)#expire in fives minutes + session.message_transfer(message=Message(dp, "second")) + + d = "msgs" + session.message_subscribe(queue=q, destination=d) + messages = session.incoming(d) + sleep(1) + session.message_flow(unit = session.credit_unit.message, value=2, destination=d) + session.message_flow(unit = session.credit_unit.byte, value=0xFFFFFFFFL, destination=d) + assert messages.get(timeout=1).body == "second" + self.assertEmpty(messages) + + + def assertDataEquals(self, session, msg, expected): + self.assertEquals(expected, msg.body) + + def assertEmpty(self, queue): + try: + extra = queue.get(timeout=1) + self.fail("Queue not empty, contains: " + extra.body) + except Empty: None + +class SizelessContent(Content): + + def size(self): + return None diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/persistence.py b/qpid/tests/src/py/qpid_tests/broker_0_10/persistence.py new file mode 100644 index 0000000000..e9cf9b7caa --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/persistence.py @@ -0,0 +1,68 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.datatypes import Message, RangedSet +#from qpid.testlib import testrunner, TestBase010 +from qpid.testlib import TestBase010 + +class PersistenceTests(TestBase010): + def test_delete_queue_after_publish(self): + session = self.session + session.auto_sync = False + + #create queue + session.queue_declare(queue = "q", auto_delete=True, durable=True) + + #send message + for i in range(1, 10): + dp = session.delivery_properties(routing_key="q", delivery_mode=2) + session.message_transfer(message=Message(dp, "my-message")) + + session.auto_sync = True + #explicitly delete queue + session.queue_delete(queue = "q") + + def test_ack_message_from_deleted_queue(self): + session = self.session + session.auto_sync = False + + #create queue + session.queue_declare(queue = "q", auto_delete=True, durable=True) + + #send message + dp = session.delivery_properties(routing_key="q", delivery_mode=2) + session.message_transfer(message=Message(dp, "my-message")) + + #create consumer + session.message_subscribe(queue = "q", destination = "a", accept_mode = 1, acquire_mode=0) + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") + queue = session.incoming("a") + + #consume the message, cancel subscription (triggering auto-delete), then ack it + msg = queue.get(timeout = 5) + session.message_cancel(destination = "a") + session.message_accept(RangedSet(msg.id)) + + def test_queue_deletion(self): + session = self.session + session.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True) + session.exchange_bind(exchange="amq.topic", queue="durable-subscriber-queue", binding_key="xyz") + dp = session.delivery_properties(routing_key="xyz", delivery_mode=2) + session.message_transfer(destination="amq.topic", message=Message(dp, "my-message")) + session.queue_delete(queue = "durable-subscriber-queue") diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/query.py b/qpid/tests/src/py/qpid_tests/broker_0_10/query.py new file mode 100644 index 0000000000..d57e964982 --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/query.py @@ -0,0 +1,247 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import TestBase010 + +class QueryTests(TestBase010): + """Tests for various query methods""" + + def test_queue_query(self): + session = self.session + session.queue_declare(queue="my-queue", exclusive=True) + result = session.queue_query(queue="my-queue") + self.assertEqual("my-queue", result.queue) + + def test_queue_query_unknown(self): + session = self.session + result = session.queue_query(queue="I don't exist") + self.assert_(not result.queue) + + def test_exchange_query(self): + """ + Test that the exchange_query method works as expected + """ + session = self.session + #check returned type for the standard exchanges + self.assertEqual("direct", session.exchange_query(name="amq.direct").type) + self.assertEqual("topic", session.exchange_query(name="amq.topic").type) + self.assertEqual("fanout", session.exchange_query(name="amq.fanout").type) + self.assertEqual("headers", session.exchange_query(name="amq.match").type) + self.assertEqual("direct", session.exchange_query(name="").type) + #declare an exchange + session.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False) + #check that the result of a query is as expected + response = session.exchange_query(name="my-test-exchange") + self.assertEqual("direct", response.type) + self.assert_(not response.durable) + self.assert_(not response.not_found) + #delete the exchange + session.exchange_delete(exchange="my-test-exchange") + #check that the query now reports not-found + self.assert_(session.exchange_query(name="my-test-exchange").not_found) + + def test_exchange_bound_direct(self): + """ + Test that the exchange_bound method works as expected with the direct exchange + """ + self.exchange_bound_with_key("amq.direct") + + def test_exchange_bound_topic(self): + """ + Test that the exchange_bound method works as expected with the direct exchange + """ + self.exchange_bound_with_key("amq.topic") + + def exchange_bound_with_key(self, exchange_name): + session = self.session + #setup: create two queues + session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True) + session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True) + + session.exchange_bind(exchange=exchange_name, queue="used-queue", binding_key="used-key") + + # test detection of any binding to specific queue + response = session.exchange_bound(exchange=exchange_name, queue="used-queue") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assert_(not response.queue_not_matched) + + # test detection of specific binding to any queue + response = session.exchange_bound(exchange=exchange_name, binding_key="used-key") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assert_(not response.key_not_matched) + + # test detection of specific binding to specific queue + response = session.exchange_bound(exchange=exchange_name, queue="used-queue", binding_key="used-key") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assert_(not response.queue_not_matched) + self.assert_(not response.key_not_matched) + + # test unmatched queue, unspecified binding + response = session.exchange_bound(exchange=exchange_name, queue="unused-queue") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + + # test unspecified queue, unmatched binding + response = session.exchange_bound(exchange=exchange_name, binding_key="unused-key") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assertEqual(True, response.key_not_matched) + + # test matched queue, unmatched binding + response = session.exchange_bound(exchange=exchange_name, queue="used-queue", binding_key="unused-key") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assert_(not response.queue_not_matched) + self.assertEqual(True, response.key_not_matched) + + # test unmatched queue, matched binding + response = session.exchange_bound(exchange=exchange_name, queue="unused-queue", binding_key="used-key") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + self.assert_(not response.key_not_matched) + + # test unmatched queue, unmatched binding + response = session.exchange_bound(exchange=exchange_name, queue="unused-queue", binding_key="unused-key") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + self.assertEqual(True, response.key_not_matched) + + #test exchange not found + self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found) + + #test exchange found, queue not found + response = session.exchange_bound(exchange=exchange_name, queue="unknown-queue") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(True, response.queue_not_found) + + #test exchange not found, queue found + response = session.exchange_bound(exchange="unknown-exchange", queue="used-queue") + self.assertEqual(True, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + + #test not exchange found, queue not found + response = session.exchange_bound(exchange="unknown-exchange", queue="unknown-queue") + self.assertEqual(True, response.exchange_not_found) + self.assertEqual(True, response.queue_not_found) + + + def test_exchange_bound_fanout(self): + """ + Test that the exchange_bound method works as expected with fanout exchange + """ + session = self.session + #setup + session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True) + session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True) + session.exchange_bind(exchange="amq.fanout", queue="used-queue") + + # test detection of any binding to specific queue + response = session.exchange_bound(exchange="amq.fanout", queue="used-queue") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assert_(not response.queue_not_matched) + + # test unmatched queue, unspecified binding + response = session.exchange_bound(exchange="amq.fanout", queue="unused-queue") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + + #test exchange not found + self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found) + + #test queue not found + self.assertEqual(True, session.exchange_bound(exchange="amq.fanout", queue="unknown-queue").queue_not_found) + + def test_exchange_bound_header(self): + """ + Test that the exchange_bound method works as expected with headers exchanges + """ + session = self.session + #setup + session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True) + session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True) + session.exchange_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} ) + + # test detection of any binding to specific queue + response = session.exchange_bound(exchange="amq.match", queue="used-queue") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assert_(not response.queue_not_matched) + + # test detection of specific binding to any queue + response = session.exchange_bound(exchange="amq.match", arguments={"x-match":"all", "a":"A"}) + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assert_(not response.args_not_matched) + + # test detection of specific binding to specific queue + response = session.exchange_bound(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"}) + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assert_(not response.queue_not_matched) + self.assert_(not response.args_not_matched) + + # test unmatched queue, unspecified binding + response = session.exchange_bound(exchange="amq.match", queue="unused-queue") + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + + # test unspecified queue, unmatched binding + response = session.exchange_bound(exchange="amq.match", arguments={"x-match":"all", "b":"B"}) + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assertEqual(True, response.args_not_matched) + + # test matched queue, unmatched binding + response = session.exchange_bound(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "b":"B"}) + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assert_(not response.queue_not_matched) + self.assertEqual(True, response.args_not_matched) + + # test unmatched queue, matched binding + response = session.exchange_bound(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "a":"A"}) + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + self.assert_(not response.args_not_matched) + + # test unmatched queue, unmatched binding + response = session.exchange_bound(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "b":"B"}) + self.assert_(not response.exchange_not_found) + self.assert_(not response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + self.assertEqual(True, response.args_not_matched) + + #test exchange not found + self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found) + + #test queue not found + self.assertEqual(True, session.exchange_bound(exchange="amq.match", queue="unknown-queue").queue_not_found) + diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/queue.py b/qpid/tests/src/py/qpid_tests/broker_0_10/queue.py new file mode 100644 index 0000000000..eb38965190 --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/queue.py @@ -0,0 +1,366 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.testlib import TestBase010 +from qpid.datatypes import Message +from qpid.session import SessionException + +class QueueTests(TestBase010): + """Tests for 'methods' on the amqp queue 'class'""" + + def test_purge(self): + """ + Test that the purge method removes messages from the queue + """ + session = self.session + #setup, declare a queue and add some messages to it: + session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "one")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "two")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "three")) + + #check that the queue now reports 3 messages: + session.queue_declare(queue="test-queue") + reply = session.queue_query(queue="test-queue") + self.assertEqual(3, reply.message_count) + + #now do the purge, then test that three messages are purged and the count drops to 0 + session.queue_purge(queue="test-queue"); + reply = session.queue_query(queue="test-queue") + self.assertEqual(0, reply.message_count) + + #send a further message and consume it, ensuring that the other messages are really gone + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "four")) + session.message_subscribe(queue="test-queue", destination="tag") + session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + queue = session.incoming("tag") + msg = queue.get(timeout=1) + self.assertEqual("four", msg.body) + + def test_purge_queue_exists(self): + """ + Test that the correct exception is thrown is no queue exists + for the name specified in purge + """ + session = self.session + try: + #queue specified but doesn't exist: + session.queue_purge(queue="invalid-queue") + self.fail("Expected failure when purging non-existent queue") + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) #not-found + + def test_purge_empty_name(self): + """ + Test that the correct exception is thrown is no queue name + is specified for purge + """ + session = self.session + try: + #queue not specified and none previously declared for channel: + session.queue_purge() + self.fail("Expected failure when purging unspecified queue") + except SessionException, e: + self.assertEquals(531, e.args[0].error_code) #illegal-argument + + def test_declare_exclusive(self): + """ + Test that the exclusive field is honoured in queue.declare + """ + # TestBase.setUp has already opened session(1) + s1 = self.session + # Here we open a second separate connection: + s2 = self.conn.session("other") + + #declare an exclusive queue: + s1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True) + try: + #other connection should not be allowed to declare this: + s2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True) + self.fail("Expected second exclusive queue_declare to raise a channel exception") + except SessionException, e: + self.assertEquals(405, e.args[0].error_code) + + s3 = self.conn.session("subscriber") + try: + #other connection should not be allowed to declare this: + s3.message_subscribe(queue="exclusive-queue") + self.fail("Expected message_subscribe on an exclusive queue to raise a channel exception") + except SessionException, e: + self.assertEquals(405, e.args[0].error_code) + + s4 = self.conn.session("deleter") + try: + #other connection should not be allowed to declare this: + s4.queue_delete(queue="exclusive-queue") + self.fail("Expected queue_delete on an exclusive queue to raise a channel exception") + except SessionException, e: + self.assertEquals(405, e.args[0].error_code) + + + def test_declare_passive(self): + """ + Test that the passive field is honoured in queue.declare + """ + session = self.session + #declare an exclusive queue: + session.queue_declare(queue="passive-queue-1", exclusive=True, auto_delete=True) + session.queue_declare(queue="passive-queue-1", passive=True) + try: + #other connection should not be allowed to declare this: + session.queue_declare(queue="passive-queue-2", passive=True) + self.fail("Expected passive declaration of non-existant queue to raise a channel exception") + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) #not-found + + + def test_bind(self): + """ + Test various permutations of the queue.bind method + """ + session = self.session + session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True) + + #straightforward case, both exchange & queue exist so no errors expected: + session.exchange_bind(queue="queue-1", exchange="amq.direct", binding_key="key1") + + #use the queue name where the routing key is not specified: + session.exchange_bind(queue="queue-1", exchange="amq.direct") + + #try and bind to non-existant exchange + try: + session.exchange_bind(queue="queue-1", exchange="an-invalid-exchange", binding_key="key1") + self.fail("Expected bind to non-existant exchange to fail") + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + + def test_bind_queue_existence(self): + session = self.session + #try and bind non-existant queue: + try: + session.exchange_bind(queue="queue-2", exchange="amq.direct", binding_key="key1") + self.fail("Expected bind of non-existant queue to fail") + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + def test_unbind_direct(self): + self.unbind_test(exchange="amq.direct", routing_key="key") + + def test_unbind_topic(self): + self.unbind_test(exchange="amq.topic", routing_key="key") + + def test_unbind_fanout(self): + self.unbind_test(exchange="amq.fanout") + + def test_unbind_headers(self): + self.unbind_test(exchange="amq.match", args={ "x-match":"all", "a":"b"}, headers={"a":"b"}) + + def unbind_test(self, exchange, routing_key="", args=None, headers=None): + #bind two queues and consume from them + session = self.session + + session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True) + session.queue_declare(queue="queue-2", exclusive=True, auto_delete=True) + + session.message_subscribe(queue="queue-1", destination="queue-1") + session.message_flow(destination="queue-1", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="queue-1", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + session.message_subscribe(queue="queue-2", destination="queue-2") + session.message_flow(destination="queue-2", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="queue-2", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + + queue1 = session.incoming("queue-1") + queue2 = session.incoming("queue-2") + + session.exchange_bind(exchange=exchange, queue="queue-1", binding_key=routing_key, arguments=args) + session.exchange_bind(exchange=exchange, queue="queue-2", binding_key=routing_key, arguments=args) + + dp = session.delivery_properties(routing_key=routing_key) + if (headers): + mp = session.message_properties(application_headers=headers) + msg1 = Message(dp, mp, "one") + msg2 = Message(dp, mp, "two") + else: + msg1 = Message(dp, "one") + msg2 = Message(dp, "two") + + #send a message that will match both bindings + session.message_transfer(destination=exchange, message=msg1) + + #unbind first queue + session.exchange_unbind(exchange=exchange, queue="queue-1", binding_key=routing_key) + + #send another message + session.message_transfer(destination=exchange, message=msg2) + + #check one queue has both messages and the other has only one + self.assertEquals("one", queue1.get(timeout=1).body) + try: + msg = queue1.get(timeout=1) + self.fail("Got extra message: %s" % msg.body) + except Empty: pass + + self.assertEquals("one", queue2.get(timeout=1).body) + self.assertEquals("two", queue2.get(timeout=1).body) + try: + msg = queue2.get(timeout=1) + self.fail("Got extra message: " + msg) + except Empty: pass + + + def test_delete_simple(self): + """ + Test core queue deletion behaviour + """ + session = self.session + + #straight-forward case: + session.queue_declare(queue="delete-me") + session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "a")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "b")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "c")) + session.queue_delete(queue="delete-me") + #check that it has gone by declaring passively + try: + session.queue_declare(queue="delete-me", passive=True) + self.fail("Queue has not been deleted") + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + def test_delete_queue_exists(self): + """ + Test core queue deletion behaviour + """ + #check attempted deletion of non-existant queue is handled correctly: + session = self.session + try: + session.queue_delete(queue="i-dont-exist", if_empty=True) + self.fail("Expected delete of non-existant queue to fail") + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + + + def test_delete_ifempty(self): + """ + Test that if_empty field of queue_delete is honoured + """ + session = self.session + + #create a queue and add a message to it (use default binding): + session.queue_declare(queue="delete-me-2") + session.queue_declare(queue="delete-me-2", passive=True) + session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me-2"), "message")) + + #try to delete, but only if empty: + try: + session.queue_delete(queue="delete-me-2", if_empty=True) + self.fail("Expected delete if_empty to fail for non-empty queue") + except SessionException, e: + self.assertEquals(406, e.args[0].error_code) + + #need new session now: + session = self.conn.session("replacement", 2) + + #empty queue: + session.message_subscribe(destination="consumer_tag", queue="delete-me-2") + session.message_flow(destination="consumer_tag", unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination="consumer_tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL) + queue = session.incoming("consumer_tag") + msg = queue.get(timeout=1) + self.assertEqual("message", msg.body) + session.message_cancel(destination="consumer_tag") + + #retry deletion on empty queue: + session.queue_delete(queue="delete-me-2", if_empty=True) + + #check that it has gone by declaring passively: + try: + session.queue_declare(queue="delete-me-2", passive=True) + self.fail("Queue has not been deleted") + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + def test_delete_ifunused(self): + """ + Test that if_unused field of queue_delete is honoured + """ + session = self.session + + #create a queue and register a consumer: + session.queue_declare(queue="delete-me-3") + session.queue_declare(queue="delete-me-3", passive=True) + session.message_subscribe(destination="consumer_tag", queue="delete-me-3") + + #need new session now: + session2 = self.conn.session("replacement", 2) + + #try to delete, but only if empty: + try: + session2.queue_delete(queue="delete-me-3", if_unused=True) + self.fail("Expected delete if_unused to fail for queue with existing consumer") + except SessionException, e: + self.assertEquals(406, e.args[0].error_code) + + session.message_cancel(destination="consumer_tag") + session.queue_delete(queue="delete-me-3", if_unused=True) + #check that it has gone by declaring passively: + try: + session.queue_declare(queue="delete-me-3", passive=True) + self.fail("Queue has not been deleted") + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + + def test_autodelete_shared(self): + """ + Test auto-deletion (of non-exclusive queues) + """ + session = self.session + session2 =self.conn.session("other", 1) + + session.queue_declare(queue="auto-delete-me", auto_delete=True) + + #consume from both sessions + tag = "my-tag" + session.message_subscribe(queue="auto-delete-me", destination=tag) + session2.message_subscribe(queue="auto-delete-me", destination=tag) + + #implicit cancel + session2.close() + + #check it is still there + session.queue_declare(queue="auto-delete-me", passive=True) + + #explicit cancel => queue is now unused again: + session.message_cancel(destination=tag) + + #NOTE: this assumes there is no timeout in use + + #check that it has gone by declaring it passively + try: + session.queue_declare(queue="auto-delete-me", passive=True) + self.fail("Expected queue to have been deleted") + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/tx.py b/qpid/tests/src/py/qpid_tests/broker_0_10/tx.py new file mode 100644 index 0000000000..8cdc539a08 --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/tx.py @@ -0,0 +1,265 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.datatypes import Message, RangedSet +from qpid.testlib import TestBase010 + +class TxTests(TestBase010): + """ + Tests for 'methods' on the amqp tx 'class' + """ + + def test_commit(self): + """ + Test that commited publishes are delivered and commited acks are not re-delivered + """ + session = self.session + + #declare queues and create subscribers in the checking session + #to ensure that the queues are not auto-deleted too early: + self.declare_queues(["tx-commit-a", "tx-commit-b", "tx-commit-c"]) + session.message_subscribe(queue="tx-commit-a", destination="qa") + session.message_subscribe(queue="tx-commit-b", destination="qb") + session.message_subscribe(queue="tx-commit-c", destination="qc") + + #use a separate session for actual work + session2 = self.conn.session("worker", 2) + self.perform_txn_work(session2, "tx-commit-a", "tx-commit-b", "tx-commit-c") + session2.tx_commit() + session2.close() + + session.tx_select() + + self.enable_flow("qa") + queue_a = session.incoming("qa") + + self.enable_flow("qb") + queue_b = session.incoming("qb") + + self.enable_flow("qc") + queue_c = session.incoming("qc") + + #check results + for i in range(1, 5): + msg = queue_c.get(timeout=1) + self.assertEqual("TxMessage %d" % i, msg.body) + session.message_accept(RangedSet(msg.id)) + + msg = queue_b.get(timeout=1) + self.assertEqual("TxMessage 6", msg.body) + session.message_accept(RangedSet(msg.id)) + + msg = queue_a.get(timeout=1) + self.assertEqual("TxMessage 7", msg.body) + session.message_accept(RangedSet(msg.id)) + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + #cleanup + session.tx_commit() + + def test_auto_rollback(self): + """ + Test that a session closed with an open transaction is effectively rolled back + """ + session = self.session + self.declare_queues(["tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c"]) + session.message_subscribe(queue="tx-autorollback-a", destination="qa") + session.message_subscribe(queue="tx-autorollback-b", destination="qb") + session.message_subscribe(queue="tx-autorollback-c", destination="qc") + + session2 = self.conn.session("worker", 2) + queue_a, queue_b, queue_c, ignore = self.perform_txn_work(session2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c") + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + session2.close() + + session.tx_select() + + self.enable_flow("qa") + queue_a = session.incoming("qa") + + self.enable_flow("qb") + queue_b = session.incoming("qb") + + self.enable_flow("qc") + queue_c = session.incoming("qc") + + #check results + for i in range(1, 5): + msg = queue_a.get(timeout=1) + self.assertEqual("Message %d" % i, msg.body) + session.message_accept(RangedSet(msg.id)) + + msg = queue_b.get(timeout=1) + self.assertEqual("Message 6", msg.body) + session.message_accept(RangedSet(msg.id)) + + msg = queue_c.get(timeout=1) + self.assertEqual("Message 7", msg.body) + session.message_accept(RangedSet(msg.id)) + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + #cleanup + session.tx_commit() + + def test_rollback(self): + """ + Test that rolled back publishes are not delivered and rolled back acks are re-delivered + """ + session = self.session + queue_a, queue_b, queue_c, consumed = self.perform_txn_work(session, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c") + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + session.tx_rollback() + + #need to release messages to get them redelivered now: + session.message_release(consumed) + + #check results + for i in range(1, 5): + msg = queue_a.get(timeout=1) + self.assertEqual("Message %d" % i, msg.body) + session.message_accept(RangedSet(msg.id)) + + msg = queue_b.get(timeout=1) + self.assertEqual("Message 6", msg.body) + session.message_accept(RangedSet(msg.id)) + + msg = queue_c.get(timeout=1) + self.assertEqual("Message 7", msg.body) + session.message_accept(RangedSet(msg.id)) + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + #cleanup + session.tx_commit() + + def perform_txn_work(self, session, name_a, name_b, name_c): + """ + Utility method that does some setup and some work under a transaction. Used for testing both + commit and rollback + """ + #setup: + self.declare_queues([name_a, name_b, name_c]) + + key = "my_key_" + name_b + topic = "my_topic_" + name_c + + session.exchange_bind(queue=name_b, exchange="amq.direct", binding_key=key) + session.exchange_bind(queue=name_c, exchange="amq.topic", binding_key=topic) + + dp = session.delivery_properties(routing_key=name_a) + for i in range(1, 5): + mp = session.message_properties(message_id="msg%d" % i) + session.message_transfer(message=Message(dp, mp, "Message %d" % i)) + + dp = session.delivery_properties(routing_key=key) + mp = session.message_properties(message_id="msg6") + session.message_transfer(destination="amq.direct", message=Message(dp, mp, "Message 6")) + + dp = session.delivery_properties(routing_key=topic) + mp = session.message_properties(message_id="msg7") + session.message_transfer(destination="amq.topic", message=Message(dp, mp, "Message 7")) + + session.tx_select() + + #consume and ack messages + acked = RangedSet() + self.subscribe(session, queue=name_a, destination="sub_a") + queue_a = session.incoming("sub_a") + for i in range(1, 5): + msg = queue_a.get(timeout=1) + acked.add(msg.id) + self.assertEqual("Message %d" % i, msg.body) + + self.subscribe(session, queue=name_b, destination="sub_b") + queue_b = session.incoming("sub_b") + msg = queue_b.get(timeout=1) + self.assertEqual("Message 6", msg.body) + acked.add(msg.id) + + sub_c = self.subscribe(session, queue=name_c, destination="sub_c") + queue_c = session.incoming("sub_c") + msg = queue_c.get(timeout=1) + self.assertEqual("Message 7", msg.body) + acked.add(msg.id) + + session.message_accept(acked) + + dp = session.delivery_properties(routing_key=topic) + #publish messages + for i in range(1, 5): + mp = session.message_properties(message_id="tx-msg%d" % i) + session.message_transfer(destination="amq.topic", message=Message(dp, mp, "TxMessage %d" % i)) + + dp = session.delivery_properties(routing_key=key) + mp = session.message_properties(message_id="tx-msg6") + session.message_transfer(destination="amq.direct", message=Message(dp, mp, "TxMessage 6")) + + dp = session.delivery_properties(routing_key=name_a) + mp = session.message_properties(message_id="tx-msg7") + session.message_transfer(message=Message(dp, mp, "TxMessage 7")) + return queue_a, queue_b, queue_c, acked + + def declare_queues(self, names, session=None): + session = session or self.session + for n in names: + session.queue_declare(queue=n, auto_delete=True) + + def subscribe(self, session=None, **keys): + session = session or self.session + consumer_tag = keys["destination"] + session.message_subscribe(**keys) + session.message_flow(destination=consumer_tag, unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination=consumer_tag, unit=session.credit_unit.byte, value=0xFFFFFFFFL) + + def enable_flow(self, tag, session=None): + session = session or self.session + session.message_flow(destination=tag, unit=session.credit_unit.message, value=0xFFFFFFFFL) + session.message_flow(destination=tag, unit=session.credit_unit.byte, value=0xFFFFFFFFL) + + def complete(self, session, msg): + session.receiver._completed.add(msg.id)#TODO: this may be done automatically + session.channel.session_completed(session.receiver._completed) + diff --git a/qpid/tests/src/py/qpid_tests/broker_0_8/__init__.py b/qpid/tests/src/py/qpid_tests/broker_0_8/__init__.py new file mode 100644 index 0000000000..526f2452f8 --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_8/__init__.py @@ -0,0 +1,22 @@ +# Do not delete - marks this directory as a python package. + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import basic, broker, example, exchange, queue, testlib, tx diff --git a/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py b/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py new file mode 100644 index 0000000000..d5837fc19c --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py @@ -0,0 +1,396 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import TestBase + +class BasicTests(TestBase): + """Tests for 'methods' on the amqp basic 'class'""" + + def test_consume_no_local(self): + """ + Test that the no_local flag is honoured in the consume method + """ + channel = self.channel + #setup, declare two queues: + channel.queue_declare(queue="test-queue-1a", exclusive=True) + channel.queue_declare(queue="test-queue-1b", exclusive=True) + #establish two consumers one of which excludes delivery of locally sent messages + channel.basic_consume(consumer_tag="local_included", queue="test-queue-1a") + channel.basic_consume(consumer_tag="local_excluded", queue="test-queue-1b", no_local=True) + + #send a message + channel.basic_publish(routing_key="test-queue-1a", content=Content("consume_no_local")) + channel.basic_publish(routing_key="test-queue-1b", content=Content("consume_no_local")) + + #check the queues of the two consumers + excluded = self.client.queue("local_excluded") + included = self.client.queue("local_included") + msg = included.get(timeout=1) + self.assertEqual("consume_no_local", msg.content.body) + try: + excluded.get(timeout=1) + self.fail("Received locally published message though no_local=true") + except Empty: None + + + def test_consume_exclusive(self): + """ + Test that the exclusive flag is honoured in the consume method + """ + channel = self.channel + #setup, declare a queue: + channel.queue_declare(queue="test-queue-2", exclusive=True) + + #check that an exclusive consumer prevents other consumer being created: + channel.basic_consume(consumer_tag="first", queue="test-queue-2", exclusive=True) + try: + channel.basic_consume(consumer_tag="second", queue="test-queue-2") + self.fail("Expected consume request to fail due to previous exclusive consumer") + except Closed, e: + self.assertChannelException(403, e.args[0]) + + #open new channel and cleanup last consumer: + channel = self.client.channel(2) + channel.channel_open() + + #check that an exclusive consumer cannot be created if a consumer already exists: + channel.basic_consume(consumer_tag="first", queue="test-queue-2") + try: + channel.basic_consume(consumer_tag="second", queue="test-queue-2", exclusive=True) + self.fail("Expected exclusive consume request to fail due to previous consumer") + except Closed, e: + self.assertChannelException(403, e.args[0]) + + def test_consume_queue_errors(self): + """ + Test error conditions associated with the queue field of the consume method: + """ + channel = self.channel + try: + #queue specified but doesn't exist: + channel.basic_consume(queue="invalid-queue") + self.fail("Expected failure when consuming from non-existent queue") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + channel = self.client.channel(2) + channel.channel_open() + try: + #queue not specified and none previously declared for channel: + channel.basic_consume(queue="") + self.fail("Expected failure when consuming from unspecified queue") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + + def test_consume_unique_consumers(self): + """ + Ensure unique consumer tags are enforced + """ + channel = self.channel + #setup, declare a queue: + channel.queue_declare(queue="test-queue-3", exclusive=True) + + #check that attempts to use duplicate tags are detected and prevented: + channel.basic_consume(consumer_tag="first", queue="test-queue-3") + try: + channel.basic_consume(consumer_tag="first", queue="test-queue-3") + self.fail("Expected consume request to fail due to non-unique tag") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + + def test_cancel(self): + """ + Test compliance of the basic.cancel method + """ + channel = self.channel + #setup, declare a queue: + channel.queue_declare(queue="test-queue-4", exclusive=True) + channel.basic_consume(consumer_tag="my-consumer", queue="test-queue-4") + channel.basic_publish(routing_key="test-queue-4", content=Content("One")) + + myqueue = self.client.queue("my-consumer") + msg = myqueue.get(timeout=1) + self.assertEqual("One", msg.content.body) + + #cancel should stop messages being delivered + channel.basic_cancel(consumer_tag="my-consumer") + channel.basic_publish(routing_key="test-queue-4", content=Content("Two")) + try: + msg = myqueue.get(timeout=1) + self.fail("Got message after cancellation: " + msg) + except Empty: None + + #cancellation of non-existant consumers should be handled without error + channel.basic_cancel(consumer_tag="my-consumer") + channel.basic_cancel(consumer_tag="this-never-existed") + + + def test_ack(self): + """ + Test basic ack/recover behaviour + """ + channel = self.channel + channel.queue_declare(queue="test-ack-queue", exclusive=True) + + reply = channel.basic_consume(queue="test-ack-queue", no_ack=False) + queue = self.client.queue(reply.consumer_tag) + + channel.basic_publish(routing_key="test-ack-queue", content=Content("One")) + channel.basic_publish(routing_key="test-ack-queue", content=Content("Two")) + channel.basic_publish(routing_key="test-ack-queue", content=Content("Three")) + channel.basic_publish(routing_key="test-ack-queue", content=Content("Four")) + channel.basic_publish(routing_key="test-ack-queue", content=Content("Five")) + + msg1 = queue.get(timeout=1) + msg2 = queue.get(timeout=1) + msg3 = queue.get(timeout=1) + msg4 = queue.get(timeout=1) + msg5 = queue.get(timeout=1) + + self.assertEqual("One", msg1.content.body) + self.assertEqual("Two", msg2.content.body) + self.assertEqual("Three", msg3.content.body) + self.assertEqual("Four", msg4.content.body) + self.assertEqual("Five", msg5.content.body) + + channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two + channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four + + channel.basic_recover(requeue=False) + + msg3b = queue.get(timeout=1) + msg5b = queue.get(timeout=1) + + self.assertEqual("Three", msg3b.content.body) + self.assertEqual("Five", msg5b.content.body) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + def test_recover_requeue(self): + """ + Test requeing on recovery + """ + channel = self.channel + channel.queue_declare(queue="test-requeue", exclusive=True) + + subscription = channel.basic_consume(queue="test-requeue", no_ack=False) + queue = self.client.queue(subscription.consumer_tag) + + channel.basic_publish(routing_key="test-requeue", content=Content("One")) + channel.basic_publish(routing_key="test-requeue", content=Content("Two")) + channel.basic_publish(routing_key="test-requeue", content=Content("Three")) + channel.basic_publish(routing_key="test-requeue", content=Content("Four")) + channel.basic_publish(routing_key="test-requeue", content=Content("Five")) + + msg1 = queue.get(timeout=1) + msg2 = queue.get(timeout=1) + msg3 = queue.get(timeout=1) + msg4 = queue.get(timeout=1) + msg5 = queue.get(timeout=1) + + self.assertEqual("One", msg1.content.body) + self.assertEqual("Two", msg2.content.body) + self.assertEqual("Three", msg3.content.body) + self.assertEqual("Four", msg4.content.body) + self.assertEqual("Five", msg5.content.body) + + channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two + channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four + + channel.basic_cancel(consumer_tag=subscription.consumer_tag) + + channel.basic_recover(requeue=True) + + subscription2 = channel.basic_consume(queue="test-requeue") + queue2 = self.client.queue(subscription2.consumer_tag) + + msg3b = queue2.get(timeout=1) + msg5b = queue2.get(timeout=1) + + self.assertEqual("Three", msg3b.content.body) + self.assertEqual("Five", msg5b.content.body) + + self.assertEqual(True, msg3b.redelivered) + self.assertEqual(True, msg5b.redelivered) + + try: + extra = queue2.get(timeout=1) + self.fail("Got unexpected message in second queue: " + extra.content.body) + except Empty: None + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in original queue: " + extra.content.body) + except Empty: None + + + def test_qos_prefetch_count(self): + """ + Test that the prefetch count specified is honoured + """ + #setup: declare queue and subscribe + channel = self.channel + channel.queue_declare(queue="test-prefetch-count", exclusive=True) + subscription = channel.basic_consume(queue="test-prefetch-count", no_ack=False) + queue = self.client.queue(subscription.consumer_tag) + + #set prefetch to 5: + channel.basic_qos(prefetch_count=5) + + #publish 10 messages: + for i in range(1, 11): + channel.basic_publish(routing_key="test-prefetch-count", content=Content("Message %d" % i)) + + #only 5 messages should have been delivered: + for i in range(1, 6): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 6th message in original queue: " + extra.content.body) + except Empty: None + + #ack messages and check that the next set arrive ok: + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + for i in range(6, 11): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 11th message in original queue: " + extra.content.body) + except Empty: None + + + + def test_qos_prefetch_size(self): + """ + Test that the prefetch size specified is honoured + """ + #setup: declare queue and subscribe + channel = self.channel + channel.queue_declare(queue="test-prefetch-size", exclusive=True) + subscription = channel.basic_consume(queue="test-prefetch-size", no_ack=False) + queue = self.client.queue(subscription.consumer_tag) + + #set prefetch to 50 bytes (each message is 9 or 10 bytes): + channel.basic_qos(prefetch_size=50) + + #publish 10 messages: + for i in range(1, 11): + channel.basic_publish(routing_key="test-prefetch-size", content=Content("Message %d" % i)) + + #only 5 messages should have been delivered (i.e. 45 bytes worth): + for i in range(1, 6): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 6th message in original queue: " + extra.content.body) + except Empty: None + + #ack messages and check that the next set arrive ok: + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + for i in range(6, 11): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 11th message in original queue: " + extra.content.body) + except Empty: None + + #make sure that a single oversized message still gets delivered + large = "abcdefghijklmnopqrstuvwxyz" + large = large + "-" + large; + channel.basic_publish(routing_key="test-prefetch-size", content=Content(large)) + msg = queue.get(timeout=1) + self.assertEqual(large, msg.content.body) + + def test_get(self): + """ + Test basic_get method + """ + channel = self.channel + channel.queue_declare(queue="test-get", exclusive=True) + + #publish some messages (no_ack=True) + for i in range(1, 11): + channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) + + #use basic_get to read back the messages, and check that we get an empty at the end + for i in range(1, 11): + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get_ok") + self.assertEqual("Message %d" % i, reply.content.body) + + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get_empty") + + #repeat for no_ack=False + for i in range(11, 21): + channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) + + for i in range(11, 21): + reply = channel.basic_get(no_ack=False) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get_ok") + self.assertEqual("Message %d" % i, reply.content.body) + if(i == 13): + channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True) + if(i in [15, 17, 19]): + channel.basic_ack(delivery_tag=reply.delivery_tag) + + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get_empty") + + #recover(requeue=True) + channel.basic_recover(requeue=True) + + #get the unacked messages again (14, 16, 18, 20) + for i in [14, 16, 18, 20]: + reply = channel.basic_get(no_ack=False) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get_ok") + self.assertEqual("Message %d" % i, reply.content.body) + channel.basic_ack(delivery_tag=reply.delivery_tag) + + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get_empty") + + channel.basic_recover(requeue=True) + + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get_empty") diff --git a/qpid/tests/src/py/qpid_tests/broker_0_8/broker.py b/qpid/tests/src/py/qpid_tests/broker_0_8/broker.py new file mode 100644 index 0000000000..7f3fe7530e --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_8/broker.py @@ -0,0 +1,120 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import TestBase + +class BrokerTests(TestBase): + """Tests for basic Broker functionality""" + + def test_ack_and_no_ack(self): + """ + First, this test tries to receive a message with a no-ack + consumer. Second, this test tries to explicitly receive and + acknowledge a message with an acknowledging consumer. + """ + ch = self.channel + self.queue_declare(ch, queue = "myqueue") + + # No ack consumer + ctag = ch.basic_consume(queue = "myqueue", no_ack = True).consumer_tag + body = "test no-ack" + ch.basic_publish(routing_key = "myqueue", content = Content(body)) + msg = self.client.queue(ctag).get(timeout = 5) + self.assert_(msg.content.body == body) + + # Acknowledging consumer + self.queue_declare(ch, queue = "otherqueue") + ctag = ch.basic_consume(queue = "otherqueue", no_ack = False).consumer_tag + body = "test ack" + ch.basic_publish(routing_key = "otherqueue", content = Content(body)) + msg = self.client.queue(ctag).get(timeout = 5) + ch.basic_ack(delivery_tag = msg.delivery_tag) + self.assert_(msg.content.body == body) + + def test_basic_delivery_immediate(self): + """ + Test basic message delivery where consume is issued before publish + """ + channel = self.channel + self.exchange_declare(channel, exchange="test-exchange", type="direct") + self.queue_declare(channel, queue="test-queue") + channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") + reply = channel.basic_consume(queue="test-queue", no_ack=True) + queue = self.client.queue(reply.consumer_tag) + + body = "Immediate Delivery" + channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body), immediate=True) + msg = queue.get(timeout=5) + self.assert_(msg.content.body == body) + + # TODO: Ensure we fail if immediate=True and there's no consumer. + + + def test_basic_delivery_queued(self): + """ + Test basic message delivery where publish is issued before consume + (i.e. requires queueing of the message) + """ + channel = self.channel + self.exchange_declare(channel, exchange="test-exchange", type="direct") + self.queue_declare(channel, queue="test-queue") + channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") + body = "Queued Delivery" + channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body)) + reply = channel.basic_consume(queue="test-queue", no_ack=True) + queue = self.client.queue(reply.consumer_tag) + msg = queue.get(timeout=5) + self.assert_(msg.content.body == body) + + def test_invalid_channel(self): + channel = self.client.channel(200) + try: + channel.queue_declare(exclusive=True) + self.fail("Expected error on queue_declare for invalid channel") + except Closed, e: + self.assertConnectionException(504, e.args[0]) + + def test_closed_channel(self): + channel = self.client.channel(200) + channel.channel_open() + channel.channel_close() + try: + channel.queue_declare(exclusive=True) + self.fail("Expected error on queue_declare for closed channel") + except Closed, e: + self.assertConnectionException(504, e.args[0]) + + def test_channel_flow(self): + channel = self.channel + channel.queue_declare(queue="flow_test_queue", exclusive=True) + ctag = channel.basic_consume(queue="flow_test_queue", no_ack=True).consumer_tag + incoming = self.client.queue(ctag) + + channel.channel_flow(active=False) + channel.basic_publish(routing_key="flow_test_queue", content=Content("abcdefghijklmnopqrstuvwxyz")) + try: + incoming.get(timeout=1) + self.fail("Received message when flow turned off.") + except Empty: None + + channel.channel_flow(active=True) + msg = incoming.get(timeout=1) + self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.content.body) diff --git a/qpid/tests/src/py/qpid_tests/broker_0_8/example.py b/qpid/tests/src/py/qpid_tests/broker_0_8/example.py new file mode 100644 index 0000000000..d82bad1f61 --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_8/example.py @@ -0,0 +1,94 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.content import Content +from qpid.testlib import TestBase + +class ExampleTest (TestBase): + """ + An example Qpid test, illustrating the unittest frameowkr and the + python Qpid client. The test class must inherit TestCase. The + test code uses the Qpid client to interact with a qpid broker and + verify it behaves as expected. + """ + + def test_example(self): + """ + An example test. Note that test functions must start with 'test_' + to be recognized by the test framework. + """ + + # By inheriting TestBase, self.client is automatically connected + # and self.channel is automatically opened as channel(1) + # Other channel methods mimic the protocol. + channel = self.channel + + # Now we can send regular commands. If you want to see what the method + # arguments mean or what other commands are available, you can use the + # python builtin help() method. For example: + #help(chan) + #help(chan.exchange_declare) + + # If you want browse the available protocol methods without being + # connected to a live server you can use the amqp-doc utility: + # + # Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>] + # + # Options: + # -e, --regexp use regex instead of glob when matching + + # Now that we know what commands are available we can use them to + # interact with the server. + + # Here we use ordinal arguments. + self.exchange_declare(channel, 0, "test", "direct") + + # Here we use keyword arguments. + self.queue_declare(channel, queue="test-queue") + channel.queue_bind(queue="test-queue", exchange="test", routing_key="key") + + # Call Channel.basic_consume to register as a consumer. + # All the protocol methods return a message object. The message object + # has fields corresponding to the reply method fields, plus a content + # field that is filled if the reply includes content. In this case the + # interesting field is the consumer_tag. + reply = channel.basic_consume(queue="test-queue") + + # We can use the Client.queue(...) method to access the queue + # corresponding to our consumer_tag. + queue = self.client.queue(reply.consumer_tag) + + # Now lets publish a message and see if our consumer gets it. To do + # this we need to import the Content class. + body = "Hello World!" + channel.basic_publish(exchange="test", + routing_key="key", + content=Content(body)) + + # Now we'll wait for the message to arrive. We can use the timeout + # argument in case the server hangs. By default queue.get() will wait + # until a message arrives or the connection to the server dies. + msg = queue.get(timeout=10) + + # And check that we got the right response with assertEqual + self.assertEqual(body, msg.content.body) + + # Now acknowledge the message. + channel.basic_ack(msg.delivery_tag, True) + diff --git a/qpid/tests/src/py/qpid_tests/broker_0_8/exchange.py b/qpid/tests/src/py/qpid_tests/broker_0_8/exchange.py new file mode 100644 index 0000000000..56d6fa82e4 --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_8/exchange.py @@ -0,0 +1,327 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +Tests for exchange behaviour. + +Test classes ending in 'RuleTests' are derived from rules in amqp.xml. +""" + +import Queue, logging +from qpid.testlib import TestBase +from qpid.content import Content +from qpid.client import Closed + + +class StandardExchangeVerifier: + """Verifies standard exchange behavior. + + Used as base class for classes that test standard exchanges.""" + + def verifyDirectExchange(self, ex): + """Verify that ex behaves like a direct exchange.""" + self.queue_declare(queue="q") + self.channel.queue_bind(queue="q", exchange=ex, routing_key="k") + self.assertPublishConsume(exchange=ex, queue="q", routing_key="k") + try: + self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk") + self.fail("Expected Empty exception") + except Queue.Empty: None # Expected + + def verifyFanOutExchange(self, ex): + """Verify that ex behaves like a fanout exchange.""" + self.queue_declare(queue="q") + self.channel.queue_bind(queue="q", exchange=ex) + self.queue_declare(queue="p") + self.channel.queue_bind(queue="p", exchange=ex) + for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex) + + def verifyTopicExchange(self, ex): + """Verify that ex behaves like a topic exchange""" + self.queue_declare(queue="a") + self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*") + q = self.consume("a") + self.assertPublishGet(q, ex, "a.b.x") + self.assertPublishGet(q, ex, "a.x.b.x") + self.assertPublishGet(q, ex, "a.x.x.b.x") + # Shouldn't match + self.channel.basic_publish(exchange=ex, routing_key="a.b") + self.channel.basic_publish(exchange=ex, routing_key="a.b.x.y") + self.channel.basic_publish(exchange=ex, routing_key="x.a.b.x") + self.channel.basic_publish(exchange=ex, routing_key="a.b") + self.assert_(q.empty()) + + def verifyHeadersExchange(self, ex): + """Verify that ex is a headers exchange""" + self.queue_declare(queue="q") + self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} ) + q = self.consume("q") + headers = {"name":"fred", "age":3} + self.assertPublishGet(q, exchange=ex, properties={'headers':headers}) + self.channel.basic_publish(exchange=ex) # No headers, won't deliver + self.assertEmpty(q); + + +class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier): + """ + The server SHOULD implement these standard exchange types: topic, headers. + + Client attempts to declare an exchange with each of these standard types. + """ + + def testDirect(self): + """Declare and test a direct exchange""" + self.exchange_declare(0, exchange="d", type="direct") + self.verifyDirectExchange("d") + + def testFanout(self): + """Declare and test a fanout exchange""" + self.exchange_declare(0, exchange="f", type="fanout") + self.verifyFanOutExchange("f") + + def testTopic(self): + """Declare and test a topic exchange""" + self.exchange_declare(0, exchange="t", type="topic") + self.verifyTopicExchange("t") + + def testHeaders(self): + """Declare and test a headers exchange""" + self.exchange_declare(0, exchange="h", type="headers") + self.verifyHeadersExchange("h") + + +class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier): + """ + The server MUST, in each virtual host, pre-declare an exchange instance + for each standard exchange type that it implements, where the name of the + exchange instance is amq. followed by the exchange type name. + + Client creates a temporary queue and attempts to bind to each required + exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if + those types are defined). + """ + def testAmqDirect(self): self.verifyDirectExchange("amq.direct") + + def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout") + + def testAmqTopic(self): self.verifyTopicExchange("amq.topic") + + def testAmqMatch(self): self.verifyHeadersExchange("amq.match") + +class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier): + """ + The server MUST predeclare a direct exchange to act as the default exchange + for content Publish methods and for default queue bindings. + + Client checks that the default exchange is active by specifying a queue + binding with no exchange name, and publishing a message with a suitable + routing key but without specifying the exchange name, then ensuring that + the message arrives in the queue correctly. + """ + def testDefaultExchange(self): + # Test automatic binding by queue name. + self.queue_declare(queue="d") + self.assertPublishConsume(queue="d", routing_key="d") + # Test explicit bind to default queue + self.verifyDirectExchange("") + + +# TODO aconway 2006-09-27: Fill in empty tests: + +class DefaultAccessRuleTests(TestBase): + """ + The server MUST NOT allow clients to access the default exchange except + by specifying an empty exchange name in the Queue.Bind and content Publish + methods. + """ + +class ExtensionsRuleTests(TestBase): + """ + The server MAY implement other exchange types as wanted. + """ + + +class DeclareMethodMinimumRuleTests(TestBase): + """ + The server SHOULD support a minimum of 16 exchanges per virtual host and + ideally, impose no limit except as defined by available resources. + + The client creates as many exchanges as it can until the server reports + an error; the number of exchanges successfuly created must be at least + sixteen. + """ + + +class DeclareMethodTicketFieldValidityRuleTests(TestBase): + """ + The client MUST provide a valid access ticket giving "active" access to + the realm in which the exchange exists or will be created, or "passive" + access if the if-exists flag is set. + + Client creates access ticket with wrong access rights and attempts to use + in this method. + """ + + +class DeclareMethodExchangeFieldReservedRuleTests(TestBase): + """ + Exchange names starting with "amq." are reserved for predeclared and + standardised exchanges. The client MUST NOT attempt to create an exchange + starting with "amq.". + + + """ + + +class DeclareMethodTypeFieldTypedRuleTests(TestBase): + """ + Exchanges cannot be redeclared with different types. The client MUST not + attempt to redeclare an existing exchange with a different type than used + in the original Exchange.Declare method. + + + """ + + +class DeclareMethodTypeFieldSupportRuleTests(TestBase): + """ + The client MUST NOT attempt to create an exchange with a type that the + server does not support. + + + """ + + +class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase): + """ + If set, and the exchange does not already exist, the server MUST raise a + channel exception with reply code 404 (not found). + """ + def test(self): + try: + self.channel.exchange_declare(exchange="humpty_dumpty", passive=True) + self.fail("Expected 404 for passive declaration of unknown exchange.") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + +class DeclareMethodDurableFieldSupportRuleTests(TestBase): + """ + The server MUST support both durable and transient exchanges. + + + """ + + +class DeclareMethodDurableFieldStickyRuleTests(TestBase): + """ + The server MUST ignore the durable field if the exchange already exists. + + + """ + + +class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase): + """ + The server MUST ignore the auto-delete field if the exchange already + exists. + + + """ + + +class DeleteMethodTicketFieldValidityRuleTests(TestBase): + """ + The client MUST provide a valid access ticket giving "active" access + rights to the exchange's access realm. + + Client creates access ticket with wrong access rights and attempts to use + in this method. + """ + + +class DeleteMethodExchangeFieldExistsRuleTests(TestBase): + """ + The client MUST NOT attempt to delete an exchange that does not exist. + """ + + +class HeadersExchangeTests(TestBase): + """ + Tests for headers exchange functionality. + """ + def setUp(self): + TestBase.setUp(self) + self.queue_declare(queue="q") + self.q = self.consume("q") + + def myAssertPublishGet(self, headers): + self.assertPublishGet(self.q, exchange="amq.match", properties={'headers':headers}) + + def myBasicPublish(self, headers): + self.channel.basic_publish(exchange="amq.match", content=Content("foobar", properties={'headers':headers})) + + def testMatchAll(self): + self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3}) + self.myAssertPublishGet({"name":"fred", "age":3}) + self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"}) + + # None of these should match + self.myBasicPublish({}) + self.myBasicPublish({"name":"barney"}) + self.myBasicPublish({"name":10}) + self.myBasicPublish({"name":"fred", "age":2}) + self.assertEmpty(self.q) + + def testMatchAny(self): + self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3}) + self.myAssertPublishGet({"name":"fred"}) + self.myAssertPublishGet({"name":"fred", "ignoreme":10}) + self.myAssertPublishGet({"ignoreme":10, "age":3}) + + # Wont match + self.myBasicPublish({}) + self.myBasicPublish({"irrelevant":0}) + self.assertEmpty(self.q) + + +class MiscellaneousErrorsTests(TestBase): + """ + Test some miscellaneous error conditions + """ + def testTypeNotKnown(self): + try: + self.channel.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type") + self.fail("Expected 503 for declaration of unknown exchange type.") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def testDifferentDeclaredType(self): + self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="direct") + try: + self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic") + self.fail("Expected 530 for redeclaration of exchange with different type.") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + #cleanup + other = self.connect() + c2 = other.channel(1) + c2.channel_open() + c2.exchange_delete(exchange="test_different_declared_type_exchange") + diff --git a/qpid/tests/src/py/qpid_tests/broker_0_8/queue.py b/qpid/tests/src/py/qpid_tests/broker_0_8/queue.py new file mode 100644 index 0000000000..b7a41736ab --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_8/queue.py @@ -0,0 +1,255 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import TestBase + +class QueueTests(TestBase): + """Tests for 'methods' on the amqp queue 'class'""" + + def test_purge(self): + """ + Test that the purge method removes messages from the queue + """ + channel = self.channel + #setup, declare a queue and add some messages to it: + channel.exchange_declare(exchange="test-exchange", type="direct") + channel.queue_declare(queue="test-queue", exclusive=True) + channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") + channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("one")) + channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("two")) + channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("three")) + + #check that the queue now reports 3 messages: + reply = channel.queue_declare(queue="test-queue") + self.assertEqual(3, reply.message_count) + + #now do the purge, then test that three messages are purged and the count drops to 0 + reply = channel.queue_purge(queue="test-queue"); + self.assertEqual(3, reply.message_count) + reply = channel.queue_declare(queue="test-queue") + self.assertEqual(0, reply.message_count) + + #send a further message and consume it, ensuring that the other messages are really gone + channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("four")) + reply = channel.basic_consume(queue="test-queue", no_ack=True) + queue = self.client.queue(reply.consumer_tag) + msg = queue.get(timeout=1) + self.assertEqual("four", msg.content.body) + + #check error conditions (use new channels): + channel = self.client.channel(2) + channel.channel_open() + try: + #queue specified but doesn't exist: + channel.queue_purge(queue="invalid-queue") + self.fail("Expected failure when purging non-existent queue") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + channel = self.client.channel(3) + channel.channel_open() + try: + #queue not specified and none previously declared for channel: + channel.queue_purge() + self.fail("Expected failure when purging unspecified queue") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + + #cleanup + other = self.connect() + channel = other.channel(1) + channel.channel_open() + channel.exchange_delete(exchange="test-exchange") + + def test_declare_exclusive(self): + """ + Test that the exclusive field is honoured in queue.declare + """ + # TestBase.setUp has already opened channel(1) + c1 = self.channel + # Here we open a second separate connection: + other = self.connect() + c2 = other.channel(1) + c2.channel_open() + + #declare an exclusive queue: + c1.queue_declare(queue="exclusive-queue", exclusive="True") + try: + #other connection should not be allowed to declare this: + c2.queue_declare(queue="exclusive-queue", exclusive="True") + self.fail("Expected second exclusive queue_declare to raise a channel exception") + except Closed, e: + self.assertChannelException(405, e.args[0]) + + + def test_declare_passive(self): + """ + Test that the passive field is honoured in queue.declare + """ + channel = self.channel + #declare an exclusive queue: + channel.queue_declare(queue="passive-queue-1", exclusive="True") + channel.queue_declare(queue="passive-queue-1", passive="True") + try: + #other connection should not be allowed to declare this: + channel.queue_declare(queue="passive-queue-2", passive="True") + self.fail("Expected passive declaration of non-existant queue to raise a channel exception") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + + def test_bind(self): + """ + Test various permutations of the queue.bind method + """ + channel = self.channel + channel.queue_declare(queue="queue-1", exclusive="True") + + #straightforward case, both exchange & queue exist so no errors expected: + channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1") + + #bind the default queue for the channel (i.e. last one declared): + channel.queue_bind(exchange="amq.direct", routing_key="key2") + + #use the queue name where neither routing key nor queue are specified: + channel.queue_bind(exchange="amq.direct") + + #try and bind to non-existant exchange + try: + channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1") + self.fail("Expected bind to non-existant exchange to fail") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + #need to reopen a channel: + channel = self.client.channel(2) + channel.channel_open() + + #try and bind non-existant queue: + try: + channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1") + self.fail("Expected bind of non-existant queue to fail") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + + def test_delete_simple(self): + """ + Test basic queue deletion + """ + channel = self.channel + + #straight-forward case: + channel.queue_declare(queue="delete-me") + channel.basic_publish(routing_key="delete-me", content=Content("a")) + channel.basic_publish(routing_key="delete-me", content=Content("b")) + channel.basic_publish(routing_key="delete-me", content=Content("c")) + reply = channel.queue_delete(queue="delete-me") + self.assertEqual(3, reply.message_count) + #check that it has gone be declaring passively + try: + channel.queue_declare(queue="delete-me", passive="True") + self.fail("Queue has not been deleted") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + #check attempted deletion of non-existant queue is handled correctly: + channel = self.client.channel(2) + channel.channel_open() + try: + channel.queue_delete(queue="i-dont-exist", if_empty="True") + self.fail("Expected delete of non-existant queue to fail") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + + + def test_delete_ifempty(self): + """ + Test that if_empty field of queue_delete is honoured + """ + channel = self.channel + + #create a queue and add a message to it (use default binding): + channel.queue_declare(queue="delete-me-2") + channel.queue_declare(queue="delete-me-2", passive="True") + channel.basic_publish(routing_key="delete-me-2", content=Content("message")) + + #try to delete, but only if empty: + try: + channel.queue_delete(queue="delete-me-2", if_empty="True") + self.fail("Expected delete if_empty to fail for non-empty queue") + except Closed, e: + self.assertChannelException(406, e.args[0]) + + #need new channel now: + channel = self.client.channel(2) + channel.channel_open() + + #empty queue: + reply = channel.basic_consume(queue="delete-me-2", no_ack=True) + queue = self.client.queue(reply.consumer_tag) + msg = queue.get(timeout=1) + self.assertEqual("message", msg.content.body) + channel.basic_cancel(consumer_tag=reply.consumer_tag) + + #retry deletion on empty queue: + channel.queue_delete(queue="delete-me-2", if_empty="True") + + #check that it has gone by declaring passively: + try: + channel.queue_declare(queue="delete-me-2", passive="True") + self.fail("Queue has not been deleted") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + def test_delete_ifunused(self): + """ + Test that if_unused field of queue_delete is honoured + """ + channel = self.channel + + #create a queue and register a consumer: + channel.queue_declare(queue="delete-me-3") + channel.queue_declare(queue="delete-me-3", passive="True") + reply = channel.basic_consume(queue="delete-me-3", no_ack=True) + + #need new channel now: + channel2 = self.client.channel(2) + channel2.channel_open() + #try to delete, but only if empty: + try: + channel2.queue_delete(queue="delete-me-3", if_unused="True") + self.fail("Expected delete if_unused to fail for queue with existing consumer") + except Closed, e: + self.assertChannelException(406, e.args[0]) + + + channel.basic_cancel(consumer_tag=reply.consumer_tag) + channel.queue_delete(queue="delete-me-3", if_unused="True") + #check that it has gone by declaring passively: + try: + channel.queue_declare(queue="delete-me-3", passive="True") + self.fail("Queue has not been deleted") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + diff --git a/qpid/tests/src/py/qpid_tests/broker_0_8/testlib.py b/qpid/tests/src/py/qpid_tests/broker_0_8/testlib.py new file mode 100644 index 0000000000..76f7e964a2 --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_8/testlib.py @@ -0,0 +1,66 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# Tests for the testlib itself. +# + +from qpid.content import Content +from qpid.testlib import TestBase +from Queue import Empty + +import sys +from traceback import * + +def mytrace(frame, event, arg): + print_stack(frame); + print "====" + return mytrace + +class TestBaseTest(TestBase): + """Verify TestBase functions work as expected""" + + def testAssertEmptyPass(self): + """Test assert empty works""" + self.queue_declare(queue="empty") + q = self.consume("empty") + self.assertEmpty(q) + try: + q.get(timeout=1) + self.fail("Queue is not empty.") + except Empty: None # Ignore + + def testAssertEmptyFail(self): + self.queue_declare(queue="full") + q = self.consume("full") + self.channel.basic_publish(routing_key="full") + try: + self.assertEmpty(q); + self.fail("assertEmpty did not assert on non-empty queue") + except AssertionError: None # Ignore + + def testMessageProperties(self): + """Verify properties are passed with message""" + props={"headers":{"x":1, "y":2}} + self.queue_declare(queue="q") + q = self.consume("q") + self.assertPublishGet(q, routing_key="q", properties=props) + + + diff --git a/qpid/tests/src/py/qpid_tests/broker_0_8/tx.py b/qpid/tests/src/py/qpid_tests/broker_0_8/tx.py new file mode 100644 index 0000000000..9faddb1110 --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_8/tx.py @@ -0,0 +1,209 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import TestBase + +class TxTests(TestBase): + """ + Tests for 'methods' on the amqp tx 'class' + """ + + def test_commit(self): + """ + Test that commited publishes are delivered and commited acks are not re-delivered + """ + channel = self.channel + queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-commit-a", "tx-commit-b", "tx-commit-c") + channel.tx_commit() + + #check results + for i in range(1, 5): + msg = queue_c.get(timeout=1) + self.assertEqual("TxMessage %d" % i, msg.content.body) + + msg = queue_b.get(timeout=1) + self.assertEqual("TxMessage 6", msg.content.body) + + msg = queue_a.get(timeout=1) + self.assertEqual("TxMessage 7", msg.content.body) + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + #cleanup + channel.basic_ack(delivery_tag=0, multiple=True) + channel.tx_commit() + + def test_auto_rollback(self): + """ + Test that a channel closed with an open transaction is effectively rolled back + """ + channel = self.channel + queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c") + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + channel.tx_rollback() + + #check results + for i in range(1, 5): + msg = queue_a.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + msg = queue_b.get(timeout=1) + self.assertEqual("Message 6", msg.content.body) + + msg = queue_c.get(timeout=1) + self.assertEqual("Message 7", msg.content.body) + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + #cleanup + channel.basic_ack(delivery_tag=0, multiple=True) + channel.tx_commit() + + def test_rollback(self): + """ + Test that rolled back publishes are not delivered and rolled back acks are re-delivered + """ + channel = self.channel + queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c") + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + channel.tx_rollback() + + #check results + for i in range(1, 5): + msg = queue_a.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + msg = queue_b.get(timeout=1) + self.assertEqual("Message 6", msg.content.body) + + msg = queue_c.get(timeout=1) + self.assertEqual("Message 7", msg.content.body) + + for q in [queue_a, queue_b, queue_c]: + try: + extra = q.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + #cleanup + channel.basic_ack(delivery_tag=0, multiple=True) + channel.tx_commit() + + def perform_txn_work(self, channel, name_a, name_b, name_c): + """ + Utility method that does some setup and some work under a transaction. Used for testing both + commit and rollback + """ + #setup: + channel.queue_declare(queue=name_a, exclusive=True) + channel.queue_declare(queue=name_b, exclusive=True) + channel.queue_declare(queue=name_c, exclusive=True) + + key = "my_key_" + name_b + topic = "my_topic_" + name_c + + channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key) + channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic) + + for i in range(1, 5): + channel.basic_publish(routing_key=name_a, content=Content("Message %d" % i)) + + channel.basic_publish(routing_key=key, exchange="amq.direct", content=Content("Message 6")) + channel.basic_publish(routing_key=topic, exchange="amq.topic", content=Content("Message 7")) + + channel.tx_select() + + #consume and ack messages + sub_a = channel.basic_consume(queue=name_a, no_ack=False) + queue_a = self.client.queue(sub_a.consumer_tag) + for i in range(1, 5): + msg = queue_a.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + sub_b = channel.basic_consume(queue=name_b, no_ack=False) + queue_b = self.client.queue(sub_b.consumer_tag) + msg = queue_b.get(timeout=1) + self.assertEqual("Message 6", msg.content.body) + channel.basic_ack(delivery_tag=msg.delivery_tag) + + sub_c = channel.basic_consume(queue=name_c, no_ack=False) + queue_c = self.client.queue(sub_c.consumer_tag) + msg = queue_c.get(timeout=1) + self.assertEqual("Message 7", msg.content.body) + channel.basic_ack(delivery_tag=msg.delivery_tag) + + #publish messages + for i in range(1, 5): + channel.basic_publish(routing_key=topic, exchange="amq.topic", content=Content("TxMessage %d" % i)) + + channel.basic_publish(routing_key=key, exchange="amq.direct", content=Content("TxMessage 6")) + channel.basic_publish(routing_key=name_a, content=Content("TxMessage 7")) + + return queue_a, queue_b, queue_c + + def test_commit_overlapping_acks(self): + """ + Test that logically 'overlapping' acks do not cause errors on commit + """ + channel = self.channel + channel.queue_declare(queue="commit-overlapping", exclusive=True) + for i in range(1, 10): + channel.basic_publish(routing_key="commit-overlapping", content=Content("Message %d" % i)) + + + channel.tx_select() + + sub = channel.basic_consume(queue="commit-overlapping", no_ack=False) + queue = self.client.queue(sub.consumer_tag) + for i in range(1, 10): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + if i in [3, 6, 10]: + channel.basic_ack(delivery_tag=msg.delivery_tag) + + channel.tx_commit() + + #check all have been acked: + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None diff --git a/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py b/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py new file mode 100644 index 0000000000..d9f2ed7dbb --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py @@ -0,0 +1,22 @@ +# Do not delete - marks this directory as a python package. + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import query, queue diff --git a/qpid/tests/src/py/qpid_tests/broker_0_9/query.py b/qpid/tests/src/py/qpid_tests/broker_0_9/query.py new file mode 100644 index 0000000000..cb66d079e5 --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_9/query.py @@ -0,0 +1,224 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import TestBase + +class QueryTests(TestBase): + """Tests for various query methods introduced in 0-10 and available in 0-9 for preview""" + + def test_exchange_query(self): + """ + Test that the exchange_query method works as expected + """ + channel = self.channel + #check returned type for the standard exchanges + self.assertEqual("direct", channel.exchange_query(name="amq.direct").type) + self.assertEqual("topic", channel.exchange_query(name="amq.topic").type) + self.assertEqual("fanout", channel.exchange_query(name="amq.fanout").type) + self.assertEqual("headers", channel.exchange_query(name="amq.match").type) + self.assertEqual("direct", channel.exchange_query(name="").type) + #declare an exchange + channel.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False) + #check that the result of a query is as expected + response = channel.exchange_query(name="my-test-exchange") + self.assertEqual("direct", response.type) + self.assertEqual(False, response.durable) + self.assertEqual(False, response.not_found) + #delete the exchange + channel.exchange_delete(exchange="my-test-exchange") + #check that the query now reports not-found + self.assertEqual(True, channel.exchange_query(name="my-test-exchange").not_found) + + def test_binding_query_direct(self): + """ + Test that the binding_query method works as expected with the direct exchange + """ + self.binding_query_with_key("amq.direct") + + def test_binding_query_topic(self): + """ + Test that the binding_query method works as expected with the direct exchange + """ + self.binding_query_with_key("amq.topic") + + def binding_query_with_key(self, exchange_name): + channel = self.channel + #setup: create two queues + channel.queue_declare(queue="used-queue", exclusive=True) + channel.queue_declare(queue="unused-queue", exclusive=True) + + channel.queue_bind(exchange=exchange_name, queue="used-queue", routing_key="used-key") + + # test detection of any binding to specific queue + response = channel.binding_query(exchange=exchange_name, queue="used-queue") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(False, response.queue_not_matched) + + # test detection of specific binding to any queue + response = channel.binding_query(exchange=exchange_name, routing_key="used-key") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(False, response.key_not_matched) + + # test detection of specific binding to specific queue + response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="used-key") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(False, response.queue_not_matched) + self.assertEqual(False, response.key_not_matched) + + # test unmatched queue, unspecified binding + response = channel.binding_query(exchange=exchange_name, queue="unused-queue") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + + # test unspecified queue, unmatched binding + response = channel.binding_query(exchange=exchange_name, routing_key="unused-key") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(True, response.key_not_matched) + + # test matched queue, unmatched binding + response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="unused-key") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(False, response.queue_not_matched) + self.assertEqual(True, response.key_not_matched) + + # test unmatched queue, matched binding + response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="used-key") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + self.assertEqual(False, response.key_not_matched) + + # test unmatched queue, unmatched binding + response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="unused-key") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + self.assertEqual(True, response.key_not_matched) + + #test exchange not found + self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found) + + #test queue not found + self.assertEqual(True, channel.binding_query(exchange=exchange_name, queue="unknown-queue").queue_not_found) + + + def test_binding_query_fanout(self): + """ + Test that the binding_query method works as expected with fanout exchange + """ + channel = self.channel + #setup + channel.queue_declare(queue="used-queue", exclusive=True) + channel.queue_declare(queue="unused-queue", exclusive=True) + channel.queue_bind(exchange="amq.fanout", queue="used-queue") + + # test detection of any binding to specific queue + response = channel.binding_query(exchange="amq.fanout", queue="used-queue") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(False, response.queue_not_matched) + + # test unmatched queue, unspecified binding + response = channel.binding_query(exchange="amq.fanout", queue="unused-queue") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + + #test exchange not found + self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found) + + #test queue not found + self.assertEqual(True, channel.binding_query(exchange="amq.fanout", queue="unknown-queue").queue_not_found) + + def test_binding_query_header(self): + """ + Test that the binding_query method works as expected with headers exchanges + """ + channel = self.channel + #setup + channel.queue_declare(queue="used-queue", exclusive=True) + channel.queue_declare(queue="unused-queue", exclusive=True) + channel.queue_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} ) + + # test detection of any binding to specific queue + response = channel.binding_query(exchange="amq.match", queue="used-queue") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(False, response.queue_not_matched) + + # test detection of specific binding to any queue + response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "a":"A"}) + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(False, response.args_not_matched) + + # test detection of specific binding to specific queue + response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"}) + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(False, response.queue_not_matched) + self.assertEqual(False, response.args_not_matched) + + # test unmatched queue, unspecified binding + response = channel.binding_query(exchange="amq.match", queue="unused-queue") + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + + # test unspecified queue, unmatched binding + response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "b":"B"}) + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(True, response.args_not_matched) + + # test matched queue, unmatched binding + response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "b":"B"}) + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(False, response.queue_not_matched) + self.assertEqual(True, response.args_not_matched) + + # test unmatched queue, matched binding + response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "a":"A"}) + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + self.assertEqual(False, response.args_not_matched) + + # test unmatched queue, unmatched binding + response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "b":"B"}) + self.assertEqual(False, response.exchange_not_found) + self.assertEqual(False, response.queue_not_found) + self.assertEqual(True, response.queue_not_matched) + self.assertEqual(True, response.args_not_matched) + + #test exchange not found + self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found) + + #test queue not found + self.assertEqual(True, channel.binding_query(exchange="amq.match", queue="unknown-queue").queue_not_found) + diff --git a/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py b/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py new file mode 100644 index 0000000000..de1153307c --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py @@ -0,0 +1,111 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import TestBase + +class QueueTests(TestBase): + """Tests for 'methods' on the amqp queue 'class'""" + + def test_unbind_direct(self): + self.unbind_test(exchange="amq.direct", routing_key="key") + + def test_unbind_topic(self): + self.unbind_test(exchange="amq.topic", routing_key="key") + + def test_unbind_fanout(self): + self.unbind_test(exchange="amq.fanout") + + def test_unbind_headers(self): + self.unbind_test(exchange="amq.match", args={ "x-match":"all", "a":"b"}, headers={"a":"b"}) + + def unbind_test(self, exchange, routing_key="", args=None, headers={}): + #bind two queues and consume from them + channel = self.channel + + channel.queue_declare(queue="queue-1", exclusive="True") + channel.queue_declare(queue="queue-2", exclusive="True") + + channel.basic_consume(queue="queue-1", consumer_tag="queue-1", no_ack=True) + channel.basic_consume(queue="queue-2", consumer_tag="queue-2", no_ack=True) + + queue1 = self.client.queue("queue-1") + queue2 = self.client.queue("queue-2") + + channel.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) + channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args) + + #send a message that will match both bindings + channel.basic_publish(exchange=exchange, routing_key=routing_key, + content=Content("one", properties={"headers": headers})) + + #unbind first queue + channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) + + #send another message + channel.basic_publish(exchange=exchange, routing_key=routing_key, + content=Content("two", properties={"headers": headers})) + + #check one queue has both messages and the other has only one + self.assertEquals("one", queue1.get(timeout=1).content.body) + try: + msg = queue1.get(timeout=1) + self.fail("Got extra message: %s" % msg.body) + except Empty: pass + + self.assertEquals("one", queue2.get(timeout=1).content.body) + self.assertEquals("two", queue2.get(timeout=1).content.body) + try: + msg = queue2.get(timeout=1) + self.fail("Got extra message: " + msg) + except Empty: pass + + def test_autodelete_shared(self): + """ + Test auto-deletion (of non-exclusive queues) + """ + channel = self.channel + other = self.connect() + channel2 = other.channel(1) + channel2.channel_open() + + channel.queue_declare(queue="auto-delete-me", auto_delete=True) + + #consume from both channels + reply = channel.basic_consume(queue="auto-delete-me", no_ack=True) + channel2.basic_consume(queue="auto-delete-me", no_ack=True) + + #implicit cancel + channel2.channel_close() + + #check it is still there + channel.queue_declare(queue="auto-delete-me", passive=True) + + #explicit cancel => queue is now unused again: + channel.basic_cancel(consumer_tag=reply.consumer_tag) + + #NOTE: this assumes there is no timeout in use + + #check that it has gone be declaring passively + try: + channel.queue_declare(queue="auto-delete-me", passive=True) + self.fail("Expected queue to have been deleted") + except Closed, e: + self.assertChannelException(404, e.args[0]) |