diff options
Diffstat (limited to 'python')
25 files changed, 3 insertions, 5851 deletions
diff --git a/python/Makefile b/python/Makefile index 2c7f9b8de0..2f2cb784e5 100644 --- a/python/Makefile +++ b/python/Makefile @@ -31,7 +31,7 @@ else AMQP_SPEC_DIR=$(PWD)/$(DATA_DIR)/amqp endif -DIRS=qmf qpid mllib models examples tests tests_0-8 tests_0-9 tests_0-10 +DIRS=qmf qpid mllib models examples tests SRCS=$(shell find $(DIRS) -name "*.py") qpid_config.py BUILD=build TARGETS=$(SRCS:%.py=$(BUILD)/%.py) @@ -79,18 +79,6 @@ install: build install -pm 0644 $(BUILD)/tests/*.* $(PYTHON_LIB)/tests $(PYCC) $(PYTHON_LIB)/tests - install -d $(PYTHON_LIB)/tests_0-8 - install -pm 0644 $(BUILD)/tests_0-8/*.* $(PYTHON_LIB)/tests_0-8 - $(PYCC) $(PYTHON_LIB)/tests_0-8 - - install -d $(PYTHON_LIB)/tests_0-9 - install -pm 0644 $(BUILD)/tests_0-9/*.* $(PYTHON_LIB)/tests_0-9 - $(PYCC) $(PYTHON_LIB)/tests_0-9 - - install -d $(PYTHON_LIB)/tests_0-10 - install -pm 0644 $(BUILD)/tests_0-10/*.* $(PYTHON_LIB)/tests_0-10 - $(PYCC) $(PYTHON_LIB)/tests_0-10 - install -d $(EXEC_PREFIX) install -pm 0755 qpid-python-test commands/* $(EXEC_PREFIX) diff --git a/python/qpid-python-test b/python/qpid-python-test index b569020368..5aedcb68ea 100755 --- a/python/qpid-python-test +++ b/python/qpid-python-test @@ -107,7 +107,7 @@ if not includes: if opts.modules: includes.append("*") else: - includes.extend(["qpid.tests.*", "tests.*", "tests_0-10.*"]) + includes.extend(["qpid.tests.*", "tests.*"]) def is_ignored(path): for p in excludes: @@ -512,7 +512,7 @@ class Harness: modules = opts.modules if not modules: - modules.extend(["qpid.tests", "tests", "tests_0-8", "tests_0-9", "tests_0-10"]) + modules.extend(["qpid.tests", "tests"]) h = Harness() for name in modules: m = __import__(name, None, None, ["dummy"]) diff --git a/python/tests_0-10/__init__.py b/python/tests_0-10/__init__.py deleted file mode 100644 index f9315a6f90..0000000000 --- a/python/tests_0-10/__init__.py +++ /dev/null @@ -1,31 +0,0 @@ -# Do not delete - marks this directory as a python package. - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from alternate_exchange import * -from broker import * -from dtx import * -from example import * -from exchange import * -from management import * -from message import * -from query import * -from queue import * -from tx import * diff --git a/python/tests_0-10/alternate_exchange.py b/python/tests_0-10/alternate_exchange.py deleted file mode 100644 index 4d8617eb8e..0000000000 --- a/python/tests_0-10/alternate_exchange.py +++ /dev/null @@ -1,204 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import traceback -from qpid.queue import Empty -from qpid.datatypes import Message -from qpid.testlib import TestBase010 -from qpid.session import SessionException - -class AlternateExchangeTests(TestBase010): - """ - Tests for the new mechanism for message returns introduced in 0-10 - and available in 0-9 for preview - """ - - def test_unroutable(self): - """ - Test that unroutable messages are delivered to the alternate-exchange if specified - """ - session = self.session - #create an exchange with an alternate defined - session.exchange_declare(exchange="secondary", type="fanout") - session.exchange_declare(exchange="primary", type="direct", alternate_exchange="secondary") - - #declare, bind (to the alternate exchange) and consume from a queue for 'returned' messages - session.queue_declare(queue="returns", exclusive=True, auto_delete=True) - session.exchange_bind(queue="returns", exchange="secondary") - session.message_subscribe(destination="a", queue="returns") - session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - returned = session.incoming("a") - - #declare, bind (to the primary exchange) and consume from a queue for 'processed' messages - session.queue_declare(queue="processed", exclusive=True, auto_delete=True) - session.exchange_bind(queue="processed", exchange="primary", binding_key="my-key") - session.message_subscribe(destination="b", queue="processed") - session.message_flow(destination="b", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="b", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - processed = session.incoming("b") - - #publish to the primary exchange - #...one message that makes it to the 'processed' queue: - dp=self.session.delivery_properties(routing_key="my-key") - session.message_transfer(destination="primary", message=Message(dp, "Good")) - #...and one that does not: - dp=self.session.delivery_properties(routing_key="unused-key") - session.message_transfer(destination="primary", message=Message(dp, "Bad")) - - #delete the exchanges - session.exchange_delete(exchange="primary") - session.exchange_delete(exchange="secondary") - - #verify behaviour - self.assertEqual("Good", processed.get(timeout=1).body) - self.assertEqual("Bad", returned.get(timeout=1).body) - self.assertEmpty(processed) - self.assertEmpty(returned) - - def test_queue_delete(self): - """ - Test that messages in a queue being deleted are delivered to the alternate-exchange if specified - """ - session = self.session - #set up a 'dead letter queue': - session.exchange_declare(exchange="dlq", type="fanout") - session.queue_declare(queue="deleted", exclusive=True, auto_delete=True) - session.exchange_bind(exchange="dlq", queue="deleted") - session.message_subscribe(destination="dlq", queue="deleted") - session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - dlq = session.incoming("dlq") - - #create a queue using the dlq as its alternate exchange: - session.queue_declare(queue="delete-me", alternate_exchange="dlq") - #send it some messages: - dp=self.session.delivery_properties(routing_key="delete-me") - session.message_transfer(message=Message(dp, "One")) - session.message_transfer(message=Message(dp, "Two")) - session.message_transfer(message=Message(dp, "Three")) - #delete it: - session.queue_delete(queue="delete-me") - #delete the dlq exchange: - session.exchange_delete(exchange="dlq") - - #check the messages were delivered to the dlq: - self.assertEqual("One", dlq.get(timeout=1).body) - self.assertEqual("Two", dlq.get(timeout=1).body) - self.assertEqual("Three", dlq.get(timeout=1).body) - self.assertEmpty(dlq) - - def test_delete_while_used_by_queue(self): - """ - Ensure an exchange still in use as an alternate-exchange for a - queue can't be deleted - """ - session = self.session - session.exchange_declare(exchange="alternate", type="fanout") - - session2 = self.conn.session("alternate", 2) - session2.queue_declare(queue="q", alternate_exchange="alternate") - try: - session2.exchange_delete(exchange="alternate") - self.fail("Expected deletion of in-use alternate-exchange to fail") - except SessionException, e: - session = self.session - session.queue_delete(queue="q") - session.exchange_delete(exchange="alternate") - self.assertEquals(530, e.args[0].error_code) - - - def test_delete_while_used_by_exchange(self): - """ - Ensure an exchange still in use as an alternate-exchange for - another exchange can't be deleted - """ - session = self.session - session.exchange_declare(exchange="alternate", type="fanout") - - session = self.conn.session("alternate", 2) - session.exchange_declare(exchange="e", type="fanout", alternate_exchange="alternate") - try: - session.exchange_delete(exchange="alternate") - self.fail("Expected deletion of in-use alternate-exchange to fail") - except SessionException, e: - session = self.session - session.exchange_delete(exchange="e") - session.exchange_delete(exchange="alternate") - self.assertEquals(530, e.args[0].error_code) - - - def test_modify_existing_exchange_alternate(self): - """ - Ensure that attempting to modify an exhange to change - the alternate throws an exception - """ - session = self.session - session.exchange_declare(exchange="alt1", type="direct") - session.exchange_declare(exchange="alt2", type="direct") - session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt1") - try: - # attempt to change the alternate on an already existing exchange - session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt2") - self.fail("Expected changing an alternate on an existing exchange to fail") - except SessionException, e: - self.assertEquals(530, e.args[0].error_code) - session = self.conn.session("alternate", 2) - session.exchange_delete(exchange="onealternate") - session.exchange_delete(exchange="alt2") - session.exchange_delete(exchange="alt1") - - - def test_add_alternate_to_exchange(self): - """ - Ensure that attempting to modify an exhange by adding - an alternate throws an exception - """ - session = self.session - session.exchange_declare(exchange="alt1", type="direct") - session.exchange_declare(exchange="noalternate", type="fanout") - try: - # attempt to add an alternate on an already existing exchange - session.exchange_declare(exchange="noalternate", type="fanout", alternate_exchange="alt1") - self.fail("Expected adding an alternate on an existing exchange to fail") - except SessionException, e: - self.assertEquals(530, e.args[0].error_code) - session = self.conn.session("alternate", 2) - session.exchange_delete(exchange="noalternate") - session.exchange_delete(exchange="alt1") - - - def test_del_alternate_to_exchange(self): - """ - Ensure that attempting to modify an exhange by declaring - it again without an alternate does nothing - """ - session = self.session - session.exchange_declare(exchange="alt1", type="direct") - session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt1") - # attempt to re-declare without an alternate - silently ignore - session.exchange_declare(exchange="onealternate", type="fanout" ) - session.exchange_delete(exchange="onealternate") - session.exchange_delete(exchange="alt1") - - - def assertEmpty(self, queue): - try: - msg = queue.get(timeout=1) - self.fail("Queue not empty: " + msg) - except Empty: None diff --git a/python/tests_0-10/broker.py b/python/tests_0-10/broker.py deleted file mode 100644 index 81d723e322..0000000000 --- a/python/tests_0-10/broker.py +++ /dev/null @@ -1,93 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Closed -from qpid.queue import Empty -from qpid.testlib import TestBase010 -from qpid.datatypes import Message, RangedSet - -class BrokerTests(TestBase010): - """Tests for basic Broker functionality""" - - def test_ack_and_no_ack(self): - """ - First, this test tries to receive a message with a no-ack - consumer. Second, this test tries to explicitly receive and - acknowledge a message with an acknowledging consumer. - """ - session = self.session - session.queue_declare(queue = "myqueue", exclusive=True, auto_delete=True) - - # No ack consumer - ctag = "tag1" - session.message_subscribe(queue = "myqueue", destination = ctag) - session.message_flow(destination=ctag, unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination=ctag, unit=session.credit_unit.byte, value=0xFFFFFFFFL) - body = "test no-ack" - session.message_transfer(message=Message(session.delivery_properties(routing_key="myqueue"), body)) - msg = session.incoming(ctag).get(timeout = 5) - self.assert_(msg.body == body) - - # Acknowledging consumer - session.queue_declare(queue = "otherqueue", exclusive=True, auto_delete=True) - ctag = "tag2" - session.message_subscribe(queue = "otherqueue", destination = ctag, accept_mode = 1) - session.message_flow(destination=ctag, unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination=ctag, unit=session.credit_unit.byte, value=0xFFFFFFFFL) - body = "test ack" - session.message_transfer(message=Message(session.delivery_properties(routing_key="otherqueue"), body)) - msg = session.incoming(ctag).get(timeout = 5) - session.message_accept(RangedSet(msg.id)) - self.assert_(msg.body == body) - - def test_simple_delivery_immediate(self): - """ - Test simple message delivery where consume is issued before publish - """ - session = self.session - session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) - session.exchange_bind(queue="test-queue", exchange="amq.fanout") - consumer_tag = "tag1" - session.message_subscribe(queue="test-queue", destination=consumer_tag) - session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = consumer_tag) - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag) - queue = session.incoming(consumer_tag) - - body = "Immediate Delivery" - session.message_transfer("amq.fanout", None, None, Message(body)) - msg = queue.get(timeout=5) - self.assert_(msg.body == body) - - def test_simple_delivery_queued(self): - """ - Test basic message delivery where publish is issued before consume - (i.e. requires queueing of the message) - """ - session = self.session - session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) - session.exchange_bind(queue="test-queue", exchange="amq.fanout") - body = "Queued Delivery" - session.message_transfer("amq.fanout", None, None, Message(body)) - - consumer_tag = "tag1" - session.message_subscribe(queue="test-queue", destination=consumer_tag) - session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = consumer_tag) - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag) - queue = session.incoming(consumer_tag) - msg = queue.get(timeout=5) - self.assert_(msg.body == body) diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py deleted file mode 100644 index 2823385a3b..0000000000 --- a/python/tests_0-10/dtx.py +++ /dev/null @@ -1,775 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.datatypes import Message, RangedSet -from qpid.session import SessionException -from qpid.testlib import TestBase010 -from qpid.compat import set -from struct import pack, unpack -from time import sleep - -class DtxTests(TestBase010): - """ - Tests for the amqp dtx related classes. - - Tests of the form test_simple_xxx test the basic transactional - behaviour. The approach here is to 'swap' a message from one queue - to another by consuming and re-publishing in the same - transaction. That transaction is then completed in different ways - and the appropriate result verified. - - The other tests enforce more specific rules and behaviour on a - per-method or per-field basis. - """ - - XA_RBROLLBACK = 1 - XA_RBTIMEOUT = 2 - XA_OK = 0 - tx_counter = 0 - - def reset_channel(self): - self.session.close() - self.session = self.conn.session("dtx-session", 1) - - def test_simple_commit(self): - """ - Test basic one-phase commit behaviour. - """ - guard = self.keepQueuesAlive(["queue-a", "queue-b"]) - session = self.session - tx = self.xid("my-xid") - self.txswap(tx, "commit") - - #neither queue should have any messages accessible - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(0, "queue-b") - - #commit - self.assertEqual(self.XA_OK, session.dtx_commit(xid=tx, one_phase=True).status) - - #should close and reopen session to ensure no unacked messages are held - self.reset_channel() - - #check result - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(1, "queue-b") - self.assertMessageId("commit", "queue-b") - - def test_simple_prepare_commit(self): - """ - Test basic two-phase commit behaviour. - """ - guard = self.keepQueuesAlive(["queue-a", "queue-b"]) - session = self.session - tx = self.xid("my-xid") - self.txswap(tx, "prepare-commit") - - #prepare - self.assertEqual(self.XA_OK, session.dtx_prepare(xid=tx).status) - - #neither queue should have any messages accessible - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(0, "queue-b") - - #commit - self.assertEqual(self.XA_OK, session.dtx_commit(xid=tx, one_phase=False).status) - - self.reset_channel() - - #check result - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(1, "queue-b") - self.assertMessageId("prepare-commit", "queue-b") - - - def test_simple_rollback(self): - """ - Test basic rollback behaviour. - """ - guard = self.keepQueuesAlive(["queue-a", "queue-b"]) - session = self.session - tx = self.xid("my-xid") - self.txswap(tx, "rollback") - - #neither queue should have any messages accessible - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(0, "queue-b") - - #rollback - self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status) - - self.reset_channel() - - #check result - self.assertMessageCount(1, "queue-a") - self.assertMessageCount(0, "queue-b") - self.assertMessageId("rollback", "queue-a") - - def test_simple_prepare_rollback(self): - """ - Test basic rollback behaviour after the transaction has been prepared. - """ - guard = self.keepQueuesAlive(["queue-a", "queue-b"]) - session = self.session - tx = self.xid("my-xid") - self.txswap(tx, "prepare-rollback") - - #prepare - self.assertEqual(self.XA_OK, session.dtx_prepare(xid=tx).status) - - #neither queue should have any messages accessible - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(0, "queue-b") - - #rollback - self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status) - - self.reset_channel() - - #check result - self.assertMessageCount(1, "queue-a") - self.assertMessageCount(0, "queue-b") - self.assertMessageId("prepare-rollback", "queue-a") - - def test_select_required(self): - """ - check that an error is flagged if select is not issued before - start or end - """ - session = self.session - tx = self.xid("dummy") - try: - session.dtx_start(xid=tx) - - #if we get here we have failed, but need to do some cleanup: - session.dtx_end(xid=tx) - session.dtx_rollback(xid=tx) - self.fail("Session not selected for use with dtx, expected exception!") - except SessionException, e: - self.assertEquals(503, e.args[0].error_code) - - def test_start_already_known(self): - """ - Verify that an attempt to start an association with a - transaction that is already known is not allowed (unless the - join flag is set). - """ - #create two sessions on different connection & select them for use with dtx: - session1 = self.session - session1.dtx_select() - - other = self.connect() - session2 = other.session("other", 0) - session2.dtx_select() - - #create a xid - tx = self.xid("dummy") - #start work on one session under that xid: - session1.dtx_start(xid=tx) - #then start on the other without the join set - failed = False - try: - session2.dtx_start(xid=tx) - except SessionException, e: - failed = True - error = e - - #cleanup: - if not failed: - session2.dtx_end(xid=tx) - other.close() - session1.dtx_end(xid=tx) - session1.dtx_rollback(xid=tx) - - #verification: - if failed: self.assertEquals(530, error.args[0].error_code) - else: self.fail("Xid already known, expected exception!") - - def test_forget_xid_on_completion(self): - """ - Verify that a xid is 'forgotten' - and can therefore be used - again - once it is completed. - """ - #do some transactional work & complete the transaction - self.test_simple_commit() - # session has been reset, so reselect for use with dtx - self.session.dtx_select() - - #start association for the same xid as the previously completed txn - tx = self.xid("my-xid") - self.session.dtx_start(xid=tx) - self.session.dtx_end(xid=tx) - self.session.dtx_rollback(xid=tx) - - def test_start_join_and_resume(self): - """ - Ensure the correct error is signalled when both the join and - resume flags are set on starting an association between a - session and a transcation. - """ - session = self.session - session.dtx_select() - tx = self.xid("dummy") - try: - session.dtx_start(xid=tx, join=True, resume=True) - #failed, but need some cleanup: - session.dtx_end(xid=tx) - session.dtx_rollback(xid=tx) - self.fail("Join and resume both set, expected exception!") - except SessionException, e: - self.assertEquals(503, e.args[0].error_code) - - def test_start_join(self): - """ - Verify 'join' behaviour, where a session is associated with a - transaction that is already associated with another session. - """ - guard = self.keepQueuesAlive(["one", "two"]) - #create two sessions & select them for use with dtx: - session1 = self.session - session1.dtx_select() - - session2 = self.conn.session("second", 2) - session2.dtx_select() - - #setup - session1.queue_declare(queue="one", auto_delete=True) - session1.queue_declare(queue="two", auto_delete=True) - session1.message_transfer(self.createMessage(session1, "one", "a", "DtxMessage")) - session1.message_transfer(self.createMessage(session1, "two", "b", "DtxMessage")) - - #create a xid - tx = self.xid("dummy") - #start work on one session under that xid: - session1.dtx_start(xid=tx) - #then start on the other with the join flag set - session2.dtx_start(xid=tx, join=True) - - #do work through each session - self.swap(session1, "one", "two")#swap 'a' from 'one' to 'two' - self.swap(session2, "two", "one")#swap 'b' from 'two' to 'one' - - #mark end on both sessions - session1.dtx_end(xid=tx) - session2.dtx_end(xid=tx) - - #commit and check - session1.dtx_commit(xid=tx, one_phase=True) - self.assertMessageCount(1, "one") - self.assertMessageCount(1, "two") - self.assertMessageId("a", "two") - self.assertMessageId("b", "one") - - - def test_suspend_resume(self): - """ - Test suspension and resumption of an association - """ - session = self.session - session.dtx_select() - - #setup - session.queue_declare(queue="one", exclusive=True, auto_delete=True) - session.queue_declare(queue="two", exclusive=True, auto_delete=True) - session.message_transfer(self.createMessage(session, "one", "a", "DtxMessage")) - session.message_transfer(self.createMessage(session, "two", "b", "DtxMessage")) - - tx = self.xid("dummy") - - session.dtx_start(xid=tx) - self.swap(session, "one", "two")#swap 'a' from 'one' to 'two' - session.dtx_end(xid=tx, suspend=True) - - session.dtx_start(xid=tx, resume=True) - self.swap(session, "two", "one")#swap 'b' from 'two' to 'one' - session.dtx_end(xid=tx) - - #commit and check - session.dtx_commit(xid=tx, one_phase=True) - self.assertMessageCount(1, "one") - self.assertMessageCount(1, "two") - self.assertMessageId("a", "two") - self.assertMessageId("b", "one") - - def test_suspend_start_end_resume(self): - """ - Test suspension and resumption of an association with work - done on another transaction when the first transaction is - suspended - """ - session = self.session - session.dtx_select() - - #setup - session.queue_declare(queue="one", exclusive=True, auto_delete=True) - session.queue_declare(queue="two", exclusive=True, auto_delete=True) - session.message_transfer(self.createMessage(session, "one", "a", "DtxMessage")) - session.message_transfer(self.createMessage(session, "two", "b", "DtxMessage")) - - tx = self.xid("dummy") - - session.dtx_start(xid=tx) - self.swap(session, "one", "two")#swap 'a' from 'one' to 'two' - session.dtx_end(xid=tx, suspend=True) - - session.dtx_start(xid=tx, resume=True) - self.swap(session, "two", "one")#swap 'b' from 'two' to 'one' - session.dtx_end(xid=tx) - - #commit and check - session.dtx_commit(xid=tx, one_phase=True) - self.assertMessageCount(1, "one") - self.assertMessageCount(1, "two") - self.assertMessageId("a", "two") - self.assertMessageId("b", "one") - - def test_end_suspend_and_fail(self): - """ - Verify that the correct error is signalled if the suspend and - fail flag are both set when disassociating a transaction from - the session - """ - session = self.session - session.dtx_select() - tx = self.xid("suspend_and_fail") - session.dtx_start(xid=tx) - try: - session.dtx_end(xid=tx, suspend=True, fail=True) - self.fail("Suspend and fail both set, expected exception!") - except SessionException, e: - self.assertEquals(503, e.args[0].error_code) - - #cleanup - other = self.connect() - session = other.session("cleanup", 1) - session.dtx_rollback(xid=tx) - session.close() - other.close() - - - def test_end_unknown_xid(self): - """ - Verifies that the correct exception is thrown when an attempt - is made to end the association for a xid not previously - associated with the session - """ - session = self.session - session.dtx_select() - tx = self.xid("unknown-xid") - try: - session.dtx_end(xid=tx) - self.fail("Attempted to end association with unknown xid, expected exception!") - except SessionException, e: - self.assertEquals(409, e.args[0].error_code) - - def test_end(self): - """ - Verify that the association is terminated by end and subsequent - operations are non-transactional - """ - guard = self.keepQueuesAlive(["tx-queue"]) - session = self.conn.session("alternate", 1) - session.queue_declare(queue="tx-queue", exclusive=True, auto_delete=True) - - #publish a message under a transaction - session.dtx_select() - tx = self.xid("dummy") - session.dtx_start(xid=tx) - session.message_transfer(self.createMessage(session, "tx-queue", "one", "DtxMessage")) - session.dtx_end(xid=tx) - - #now that association with txn is ended, publish another message - session.message_transfer(self.createMessage(session, "tx-queue", "two", "DtxMessage")) - - #check the second message is available, but not the first - self.assertMessageCount(1, "tx-queue") - self.subscribe(session, queue="tx-queue", destination="results") - msg = session.incoming("results").get(timeout=1) - self.assertEqual("two", self.getMessageProperty(msg, 'correlation_id')) - session.message_cancel(destination="results") - #ack the message then close the session - session.message_accept(RangedSet(msg.id)) - session.close() - - session = self.session - #commit the transaction and check that the first message (and - #only the first message) is then delivered - session.dtx_commit(xid=tx, one_phase=True) - self.assertMessageCount(1, "tx-queue") - self.assertMessageId("one", "tx-queue") - - def test_invalid_commit_one_phase_true(self): - """ - Test that a commit with one_phase = True is rejected if the - transaction in question has already been prepared. - """ - other = self.connect() - tester = other.session("tester", 1) - tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True) - tester.dtx_select() - tx = self.xid("dummy") - tester.dtx_start(xid=tx) - tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever")) - tester.dtx_end(xid=tx) - tester.dtx_prepare(xid=tx) - failed = False - try: - tester.dtx_commit(xid=tx, one_phase=True) - except SessionException, e: - failed = True - error = e - - if failed: - self.session.dtx_rollback(xid=tx) - self.assertEquals(409, error.args[0].error_code) - else: - tester.close() - other.close() - self.fail("Invalid use of one_phase=True, expected exception!") - - def test_invalid_commit_one_phase_false(self): - """ - Test that a commit with one_phase = False is rejected if the - transaction in question has not yet been prepared. - """ - other = self.connect() - tester = other.session("tester", 1) - tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True) - tester.dtx_select() - tx = self.xid("dummy") - tester.dtx_start(xid=tx) - tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever")) - tester.dtx_end(xid=tx) - failed = False - try: - tester.dtx_commit(xid=tx, one_phase=False) - except SessionException, e: - failed = True - error = e - - if failed: - self.session.dtx_rollback(xid=tx) - self.assertEquals(409, error.args[0].error_code) - else: - tester.close() - other.close() - self.fail("Invalid use of one_phase=False, expected exception!") - - def test_invalid_commit_not_ended(self): - """ - Test that a commit fails if the xid is still associated with a session. - """ - other = self.connect() - tester = other.session("tester", 1) - self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True) - self.session.dtx_select() - tx = self.xid("dummy") - self.session.dtx_start(xid=tx) - self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever")) - - failed = False - try: - tester.dtx_commit(xid=tx, one_phase=False) - except SessionException, e: - failed = True - error = e - - if failed: - self.session.dtx_end(xid=tx) - self.session.dtx_rollback(xid=tx) - self.assertEquals(409, error.args[0].error_code) - else: - tester.close() - other.close() - self.fail("Commit should fail as xid is still associated!") - - def test_invalid_rollback_not_ended(self): - """ - Test that a rollback fails if the xid is still associated with a session. - """ - other = self.connect() - tester = other.session("tester", 1) - self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True) - self.session.dtx_select() - tx = self.xid("dummy") - self.session.dtx_start(xid=tx) - self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever")) - - failed = False - try: - tester.dtx_rollback(xid=tx) - except SessionException, e: - failed = True - error = e - - if failed: - self.session.dtx_end(xid=tx) - self.session.dtx_rollback(xid=tx) - self.assertEquals(409, error.args[0].error_code) - else: - tester.close() - other.close() - self.fail("Rollback should fail as xid is still associated!") - - - def test_invalid_prepare_not_ended(self): - """ - Test that a prepare fails if the xid is still associated with a session. - """ - other = self.connect() - tester = other.session("tester", 1) - self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True) - self.session.dtx_select() - tx = self.xid("dummy") - self.session.dtx_start(xid=tx) - self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever")) - - failed = False - try: - tester.dtx_prepare(xid=tx) - except SessionException, e: - failed = True - error = e - - if failed: - self.session.dtx_end(xid=tx) - self.session.dtx_rollback(xid=tx) - self.assertEquals(409, error.args[0].error_code) - else: - tester.close() - other.close() - self.fail("Rollback should fail as xid is still associated!") - - def test_implicit_end(self): - """ - Test that an association is implicitly ended when the session - is closed (whether by exception or explicit client request) - and the transaction in question is marked as rollback only. - """ - session1 = self.session - session2 = self.conn.session("other", 2) - - #setup: - session2.queue_declare(queue="dummy", exclusive=True, auto_delete=True) - session2.message_transfer(self.createMessage(session2, "dummy", "a", "whatever")) - tx = self.xid("dummy") - - session2.dtx_select() - session2.dtx_start(xid=tx) - session2.message_subscribe(queue="dummy", destination="dummy") - session2.message_flow(destination="dummy", unit=session2.credit_unit.message, value=1) - session2.message_flow(destination="dummy", unit=session2.credit_unit.byte, value=0xFFFFFFFFL) - msg = session2.incoming("dummy").get(timeout=1) - session2.message_accept(RangedSet(msg.id)) - session2.message_cancel(destination="dummy") - session2.message_transfer(self.createMessage(session2, "dummy", "b", "whatever")) - session2.close() - - self.assertEqual(self.XA_RBROLLBACK, session1.dtx_prepare(xid=tx).status) - session1.dtx_rollback(xid=tx) - - def test_get_timeout(self): - """ - Check that get-timeout returns the correct value, (and that a - transaction with a timeout can complete normally) - """ - session = self.session - tx = self.xid("dummy") - - session.dtx_select() - session.dtx_start(xid=tx) - self.assertEqual(0, session.dtx_get_timeout(xid=tx).timeout) - session.dtx_set_timeout(xid=tx, timeout=60) - self.assertEqual(60, session.dtx_get_timeout(xid=tx).timeout) - self.assertEqual(self.XA_OK, session.dtx_end(xid=tx).status) - self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status) - - def test_set_timeout(self): - """ - Test the timeout of a transaction results in the expected - behaviour - """ - - guard = self.keepQueuesAlive(["queue-a", "queue-b"]) - #open new session to allow self.session to be used in checking the queue - session = self.conn.session("worker", 1) - #setup: - tx = self.xid("dummy") - session.queue_declare(queue="queue-a", auto_delete=True) - session.queue_declare(queue="queue-b", auto_delete=True) - session.message_transfer(self.createMessage(session, "queue-a", "timeout", "DtxMessage")) - - session.dtx_select() - session.dtx_start(xid=tx) - self.swap(session, "queue-a", "queue-b") - session.dtx_set_timeout(xid=tx, timeout=2) - sleep(3) - #check that the work has been rolled back already - self.assertMessageCount(1, "queue-a") - self.assertMessageCount(0, "queue-b") - self.assertMessageId("timeout", "queue-a") - #check the correct codes are returned when we try to complete the txn - self.assertEqual(self.XA_RBTIMEOUT, session.dtx_end(xid=tx).status) - self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status) - - - - def test_recover(self): - """ - Test basic recover behaviour - """ - session = self.session - - session.dtx_select() - session.queue_declare(queue="dummy", exclusive=True, auto_delete=True) - - prepared = [] - for i in range(1, 10): - tx = self.xid("tx%s" % (i)) - session.dtx_start(xid=tx) - session.message_transfer(self.createMessage(session, "dummy", "message%s" % (i), "message%s" % (i))) - session.dtx_end(xid=tx) - if i in [2, 5, 6, 8]: - session.dtx_prepare(xid=tx) - prepared.append(tx) - else: - session.dtx_rollback(xid=tx) - - xids = session.dtx_recover().in_doubt - - #rollback the prepared transactions returned by recover - for x in xids: - session.dtx_rollback(xid=x) - - #validate against the expected list of prepared transactions - actual = set([x.global_id for x in xids]) #TODO: come up with nicer way to test these - expected = set([x.global_id for x in prepared]) - intersection = actual.intersection(expected) - - if intersection != expected: - missing = expected.difference(actual) - extra = actual.difference(expected) - self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra)) - - def test_bad_resume(self): - """ - Test that a resume on a session not selected for use with dtx fails - """ - session = self.session - try: - session.dtx_start(resume=True) - except SessionException, e: - self.assertEquals(503, e.args[0].error_code) - - def test_prepare_unknown(self): - session = self.session - try: - session.dtx_prepare(xid=self.xid("unknown")) - except SessionException, e: - self.assertEquals(404, e.args[0].error_code) - - def test_commit_unknown(self): - session = self.session - try: - session.dtx_commit(xid=self.xid("unknown")) - except SessionException, e: - self.assertEquals(404, e.args[0].error_code) - - def test_rollback_unknown(self): - session = self.session - try: - session.dtx_rollback(xid=self.xid("unknown")) - except SessionException, e: - self.assertEquals(404, e.args[0].error_code) - - def test_get_timeout_unknown(self): - session = self.session - try: - session.dtx_get_timeout(xid=self.xid("unknown")) - except SessionException, e: - self.assertEquals(404, e.args[0].error_code) - - def xid(self, txid): - DtxTests.tx_counter += 1 - branchqual = "v%s" % DtxTests.tx_counter - return self.session.xid(format=0, global_id=txid, branch_id=branchqual) - - def txswap(self, tx, id): - session = self.session - #declare two queues: - session.queue_declare(queue="queue-a", auto_delete=True) - session.queue_declare(queue="queue-b", auto_delete=True) - - #put message with specified id on one queue: - dp=session.delivery_properties(routing_key="queue-a") - mp=session.message_properties(correlation_id=id) - session.message_transfer(message=Message(dp, mp, "DtxMessage")) - - #start the transaction: - session.dtx_select() - self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status) - - #'swap' the message from one queue to the other, under that transaction: - self.swap(self.session, "queue-a", "queue-b") - - #mark the end of the transactional work: - self.assertEqual(self.XA_OK, self.session.dtx_end(xid=tx).status) - - def swap(self, session, src, dest): - #consume from src: - session.message_subscribe(destination="temp-swap", queue=src) - session.message_flow(destination="temp-swap", unit=session.credit_unit.message, value=1) - session.message_flow(destination="temp-swap", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - msg = session.incoming("temp-swap").get(timeout=1) - session.message_cancel(destination="temp-swap") - session.message_accept(RangedSet(msg.id)) - #todo: also complete at this point? - - #re-publish to dest: - dp=session.delivery_properties(routing_key=dest) - mp=session.message_properties(correlation_id=self.getMessageProperty(msg, 'correlation_id')) - session.message_transfer(message=Message(dp, mp, msg.body)) - - def assertMessageCount(self, expected, queue): - self.assertEqual(expected, self.session.queue_query(queue=queue).message_count) - - def assertMessageId(self, expected, queue): - self.session.message_subscribe(queue=queue, destination="results") - self.session.message_flow(destination="results", unit=self.session.credit_unit.message, value=1) - self.session.message_flow(destination="results", unit=self.session.credit_unit.byte, value=0xFFFFFFFFL) - self.assertEqual(expected, self.getMessageProperty(self.session.incoming("results").get(timeout=1), 'correlation_id')) - self.session.message_cancel(destination="results") - - def getMessageProperty(self, msg, prop): - for h in msg.headers: - if hasattr(h, prop): return getattr(h, prop) - return None - - def keepQueuesAlive(self, names): - session = self.conn.session("nasty", 99) - for n in names: - session.queue_declare(queue=n, auto_delete=True) - session.message_subscribe(destination=n, queue=n) - return session - - def createMessage(self, session, key, id, body): - dp=session.delivery_properties(routing_key=key) - mp=session.message_properties(correlation_id=id) - session.message_transfer(message=Message(dp, mp, body)) diff --git a/python/tests_0-10/example.py b/python/tests_0-10/example.py deleted file mode 100644 index e36907d501..0000000000 --- a/python/tests_0-10/example.py +++ /dev/null @@ -1,95 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from qpid.datatypes import Message, RangedSet -from qpid.testlib import TestBase010 - -class ExampleTest (TestBase010): - """ - An example Qpid test, illustrating the unittest framework and the - python Qpid client. The test class must inherit TestBase. The - test code uses the Qpid client to interact with a qpid broker and - verify it behaves as expected. - """ - - def test_example(self): - """ - An example test. Note that test functions must start with 'test_' - to be recognized by the test framework. - """ - - # By inheriting TestBase, self.client is automatically connected - # and self.session is automatically opened as session(1) - # Other session methods mimic the protocol. - session = self.session - - # Now we can send regular commands. If you want to see what the method - # arguments mean or what other commands are available, you can use the - # python builtin help() method. For example: - #help(chan) - #help(chan.exchange_declare) - - # If you want browse the available protocol methods without being - # connected to a live server you can use the amqp-doc utility: - # - # Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>] - # - # Options: - # -e, --regexp use regex instead of glob when matching - - # Now that we know what commands are available we can use them to - # interact with the server. - - # Here we use ordinal arguments. - session.exchange_declare("test", "direct") - - # Here we use keyword arguments. - session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) - session.exchange_bind(queue="test-queue", exchange="test", binding_key="key") - - # Call Session.subscribe to register as a consumer. - # All the protocol methods return a message object. The message object - # has fields corresponding to the reply method fields, plus a content - # field that is filled if the reply includes content. In this case the - # interesting field is the consumer_tag. - session.message_subscribe(queue="test-queue", destination="consumer_tag") - session.message_flow(destination="consumer_tag", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="consumer_tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - - # We can use the session.incoming(...) method to access the messages - # delivered for our consumer_tag. - queue = session.incoming("consumer_tag") - - # Now lets publish a message and see if our consumer gets it. To do - # this we need to import the Message class. - delivery_properties = session.delivery_properties(routing_key="key") - sent = Message(delivery_properties, "Hello World!") - session.message_transfer(destination="test", message=sent) - - # Now we'll wait for the message to arrive. We can use the timeout - # argument in case the server hangs. By default queue.get() will wait - # until a message arrives or the connection to the server dies. - msg = queue.get(timeout=10) - - # And check that we got the right response with assertEqual - self.assertEqual(sent.body, msg.body) - - # Now acknowledge the message. - session.message_accept(RangedSet(msg.id)) - diff --git a/python/tests_0-10/exchange.py b/python/tests_0-10/exchange.py deleted file mode 100644 index 0ac78a4799..0000000000 --- a/python/tests_0-10/exchange.py +++ /dev/null @@ -1,461 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -Tests for exchange behaviour. - -Test classes ending in 'RuleTests' are derived from rules in amqp.xml. -""" - -import Queue, logging, traceback -from qpid.testlib import TestBase010 -from qpid.datatypes import Message -from qpid.client import Closed -from qpid.session import SessionException - - -class TestHelper(TestBase010): - def setUp(self): - TestBase010.setUp(self) - self.queues = [] - self.exchanges = [] - self.subscriptions = [] - - def tearDown(self): - try: - for s in self.subscriptions: - self.session.message_cancel(destination=s) - for ssn, q in self.queues: - ssn.queue_delete(queue=q) - for ssn, ex in self.exchanges: - ssn.exchange_delete(exchange=ex) - except: - print "Error on tearDown:" - print traceback.print_exc() - TestBase010.tearDown(self) - - def createMessage(self, key="", body=""): - return Message(self.session.delivery_properties(routing_key=key), body) - - def getApplicationHeaders(self, msg): - for h in msg.headers: - if hasattr(h, 'application_headers'): return getattr(h, 'application_headers') - return None - - def assertPublishGet(self, queue, exchange="", routing_key="", properties=None): - """ - Publish to exchange and assert queue.get() returns the same message. - """ - body = self.uniqueString() - dp=self.session.delivery_properties(routing_key=routing_key) - mp=self.session.message_properties(application_headers=properties) - self.session.message_transfer(destination=exchange, message=Message(dp, mp, body)) - msg = queue.get(timeout=1) - self.assertEqual(body, msg.body) - if (properties): - self.assertEqual(properties, self.getApplicationHeaders(msg)) - - def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None): - """ - Publish a message and consume it, assert it comes back intact. - Return the Queue object used to consume. - """ - self.assertPublishGet(self.consume(queue), exchange, routing_key, properties) - - def assertEmpty(self, queue): - """Assert that the queue is empty""" - try: - queue.get(timeout=1) - self.fail("Queue is not empty.") - except Queue.Empty: None # Ignore - - def queue_declare(self, session=None, *args, **keys): - session = session or self.session - reply = session.queue_declare(*args, **keys) - self.queues.append((session, keys["queue"])) - return reply - - def exchange_declare(self, session=None, ticket=0, exchange='', - type='', passive=False, durable=False, - auto_delete=False, - arguments={}): - session = session or self.session - reply = session.exchange_declare(exchange=exchange, type=type, passive=passive,durable=durable, auto_delete=auto_delete, arguments=arguments) - self.exchanges.append((session,exchange)) - return reply - - def uniqueString(self): - """Generate a unique string, unique for this TestBase instance""" - if not "uniqueCounter" in dir(self): self.uniqueCounter = 1; - return "Test Message " + str(self.uniqueCounter) - - def consume(self, queueName): - """Consume from named queue returns the Queue object.""" - if not "uniqueTag" in dir(self): self.uniqueTag = 1 - else: self.uniqueTag += 1 - consumer_tag = "tag" + str(self.uniqueTag) - self.session.message_subscribe(queue=queueName, destination=consumer_tag) - self.session.message_flow(destination=consumer_tag, unit=self.session.credit_unit.message, value=0xFFFFFFFFL) - self.session.message_flow(destination=consumer_tag, unit=self.session.credit_unit.byte, value=0xFFFFFFFFL) - self.subscriptions.append(consumer_tag) - return self.session.incoming(consumer_tag) - - -class StandardExchangeVerifier: - """Verifies standard exchange behavior. - - Used as base class for classes that test standard exchanges.""" - - def verifyDirectExchange(self, ex): - """Verify that ex behaves like a direct exchange.""" - self.queue_declare(queue="q") - self.session.exchange_bind(queue="q", exchange=ex, binding_key="k") - self.assertPublishConsume(exchange=ex, queue="q", routing_key="k") - try: - self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk") - self.fail("Expected Empty exception") - except Queue.Empty: None # Expected - - def verifyFanOutExchange(self, ex): - """Verify that ex behaves like a fanout exchange.""" - self.queue_declare(queue="q") - self.session.exchange_bind(queue="q", exchange=ex) - self.queue_declare(queue="p") - self.session.exchange_bind(queue="p", exchange=ex) - for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex) - - def verifyTopicExchange(self, ex): - """Verify that ex behaves like a topic exchange""" - self.queue_declare(queue="a") - self.session.exchange_bind(queue="a", exchange=ex, binding_key="a.#.b.*") - q = self.consume("a") - self.assertPublishGet(q, ex, "a.b.x") - self.assertPublishGet(q, ex, "a.x.b.x") - self.assertPublishGet(q, ex, "a.x.x.b.x") - # Shouldn't match - self.session.message_transfer(destination=ex, message=self.createMessage("a.b")) - self.session.message_transfer(destination=ex, message=self.createMessage("a.b.x.y")) - self.session.message_transfer(destination=ex, message=self.createMessage("x.a.b.x")) - self.session.message_transfer(destination=ex, message=self.createMessage("a.b")) - self.assert_(q.empty()) - - def verifyHeadersExchange(self, ex): - """Verify that ex is a headers exchange""" - self.queue_declare(queue="q") - self.session.exchange_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} ) - q = self.consume("q") - headers = {"name":"fred", "age":3} - self.assertPublishGet(q, exchange=ex, properties=headers) - self.session.message_transfer(destination=ex) # No headers, won't deliver - self.assertEmpty(q); - - -class RecommendedTypesRuleTests(TestHelper, StandardExchangeVerifier): - """ - The server SHOULD implement these standard exchange types: topic, headers. - - Client attempts to declare an exchange with each of these standard types. - """ - - def testDirect(self): - """Declare and test a direct exchange""" - self.exchange_declare(0, exchange="d", type="direct") - self.verifyDirectExchange("d") - - def testFanout(self): - """Declare and test a fanout exchange""" - self.exchange_declare(0, exchange="f", type="fanout") - self.verifyFanOutExchange("f") - - def testTopic(self): - """Declare and test a topic exchange""" - self.exchange_declare(0, exchange="t", type="topic") - self.verifyTopicExchange("t") - - def testHeaders(self): - """Declare and test a headers exchange""" - self.exchange_declare(0, exchange="h", type="headers") - self.verifyHeadersExchange("h") - - -class RequiredInstancesRuleTests(TestHelper, StandardExchangeVerifier): - """ - The server MUST, in each virtual host, pre-declare an exchange instance - for each standard exchange type that it implements, where the name of the - exchange instance is amq. followed by the exchange type name. - - Client creates a temporary queue and attempts to bind to each required - exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if - those types are defined). - """ - def testAmqDirect(self): self.verifyDirectExchange("amq.direct") - - def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout") - - def testAmqTopic(self): self.verifyTopicExchange("amq.topic") - - def testAmqMatch(self): self.verifyHeadersExchange("amq.match") - -class DefaultExchangeRuleTests(TestHelper, StandardExchangeVerifier): - """ - The server MUST predeclare a direct exchange to act as the default exchange - for content Publish methods and for default queue bindings. - - Client checks that the default exchange is active by specifying a queue - binding with no exchange name, and publishing a message with a suitable - routing key but without specifying the exchange name, then ensuring that - the message arrives in the queue correctly. - """ - def testDefaultExchange(self): - # Test automatic binding by queue name. - self.queue_declare(queue="d") - self.assertPublishConsume(queue="d", routing_key="d") - # Test explicit bind to default queue - self.verifyDirectExchange("") - - -# TODO aconway 2006-09-27: Fill in empty tests: - -class DefaultAccessRuleTests(TestHelper): - """ - The server MUST NOT allow clients to access the default exchange except - by specifying an empty exchange name in the Queue.Bind and content Publish - methods. - """ - -class ExtensionsRuleTests(TestHelper): - """ - The server MAY implement other exchange types as wanted. - """ - - -class DeclareMethodMinimumRuleTests(TestHelper): - """ - The server SHOULD support a minimum of 16 exchanges per virtual host and - ideally, impose no limit except as defined by available resources. - - The client creates as many exchanges as it can until the server reports - an error; the number of exchanges successfuly created must be at least - sixteen. - """ - - -class DeclareMethodTicketFieldValidityRuleTests(TestHelper): - """ - The client MUST provide a valid access ticket giving "active" access to - the realm in which the exchange exists or will be created, or "passive" - access if the if-exists flag is set. - - Client creates access ticket with wrong access rights and attempts to use - in this method. - """ - - -class DeclareMethodExchangeFieldReservedRuleTests(TestHelper): - """ - Exchange names starting with "amq." are reserved for predeclared and - standardised exchanges. The client MUST NOT attempt to create an exchange - starting with "amq.". - - Similarly, exchanges starting with "qpid." are reserved for Qpid - implementation-specific system exchanges (such as the management exchange). - The client must not attempt to create an exchange starting with the string - "qpid.". - """ - def template(self, reservedString, exchangeType): - try: - self.session.exchange_declare(exchange=reservedString, type=exchangeType) - self.fail("Expected not allowed error (530) for exchanges starting with \"" + reservedString + "\".") - except SessionException, e: - self.assertEquals(e.args[0].error_code, 530) - # connection closed, reopen it - self.tearDown() - self.setUp() - try: - self.session.exchange_declare(exchange=reservedString + "abc123", type=exchangeType) - self.fail("Expected not allowed error (530) for exchanges starting with \"" + reservedString + "\".") - except SessionException, e: - self.assertEquals(e.args[0].error_code, 530) - # connection closed, reopen it - self.tearDown() - self.setUp() - # The following should be legal: - self.session.exchange_declare(exchange=reservedString[:-1], type=exchangeType) - self.session.exchange_delete(exchange=reservedString[:-1]) - self.session.exchange_declare(exchange=reservedString[1:], type=exchangeType) - self.session.exchange_delete(exchange=reservedString[1:]) - self.session.exchange_declare(exchange="." + reservedString, type=exchangeType) - self.session.exchange_delete(exchange="." + reservedString) - self.session.exchange_declare(exchange="abc." + reservedString, type=exchangeType) - self.session.exchange_delete(exchange="abc." + reservedString) - self.session.exchange_declare(exchange="abc." + reservedString + "def", type=exchangeType) - self.session.exchange_delete(exchange="abc." + reservedString + "def") - - def test_amq(self): - self.template("amq.", "direct") - self.template("amq.", "topic") - self.template("amq.", "fanout") - - def test_qpid(self): - self.template("qpid.", "direct") - self.template("qpid.", "topic") - self.template("qpid.", "fanout") - - -class DeclareMethodTypeFieldTypedRuleTests(TestHelper): - """ - Exchanges cannot be redeclared with different types. The client MUST not - attempt to redeclare an existing exchange with a different type than used - in the original Exchange.Declare method. - - - """ - - -class DeclareMethodTypeFieldSupportRuleTests(TestHelper): - """ - The client MUST NOT attempt to create an exchange with a type that the - server does not support. - - - """ - - -class DeclareMethodPassiveFieldNotFoundRuleTests(TestHelper): - """ - If set, and the exchange does not already exist, the server MUST raise a - channel exception with reply code 404 (not found). - """ - def test(self): - try: - self.session.exchange_declare(exchange="humpty_dumpty", passive=True) - self.fail("Expected 404 for passive declaration of unknown exchange.") - except SessionException, e: - self.assertEquals(404, e.args[0].error_code) - - -class DeclareMethodDurableFieldSupportRuleTests(TestHelper): - """ - The server MUST support both durable and transient exchanges. - - - """ - - -class DeclareMethodDurableFieldStickyRuleTests(TestHelper): - """ - The server MUST ignore the durable field if the exchange already exists. - - - """ - - -class DeclareMethodAutoDeleteFieldStickyRuleTests(TestHelper): - """ - The server MUST ignore the auto-delete field if the exchange already - exists. - - - """ - - -class DeleteMethodTicketFieldValidityRuleTests(TestHelper): - """ - The client MUST provide a valid access ticket giving "active" access - rights to the exchange's access realm. - - Client creates access ticket with wrong access rights and attempts to use - in this method. - """ - - -class DeleteMethodExchangeFieldExistsRuleTests(TestHelper): - """ - The client MUST NOT attempt to delete an exchange that does not exist. - """ - - -class HeadersExchangeTests(TestHelper): - """ - Tests for headers exchange functionality. - """ - def setUp(self): - TestHelper.setUp(self) - self.queue_declare(queue="q") - self.q = self.consume("q") - - def myAssertPublishGet(self, headers): - self.assertPublishGet(self.q, exchange="amq.match", properties=headers) - - def myBasicPublish(self, headers): - mp=self.session.message_properties(application_headers=headers) - self.session.message_transfer(destination="amq.match", message=Message(mp, "foobar")) - - def testMatchAll(self): - self.session.exchange_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3}) - self.myAssertPublishGet({"name":"fred", "age":3}) - self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"}) - - # None of these should match - self.myBasicPublish({}) - self.myBasicPublish({"name":"barney"}) - self.myBasicPublish({"name":10}) - self.myBasicPublish({"name":"fred", "age":2}) - self.assertEmpty(self.q) - - def testMatchAny(self): - self.session.exchange_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3}) - self.myAssertPublishGet({"name":"fred"}) - self.myAssertPublishGet({"name":"fred", "ignoreme":10}) - self.myAssertPublishGet({"ignoreme":10, "age":3}) - - # Wont match - self.myBasicPublish({}) - self.myBasicPublish({"irrelevant":0}) - self.assertEmpty(self.q) - - -class MiscellaneousErrorsTests(TestHelper): - """ - Test some miscellaneous error conditions - """ - def testTypeNotKnown(self): - try: - self.session.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type") - self.fail("Expected 503 for declaration of unknown exchange type.") - except SessionException, e: - self.assertEquals(503, e.args[0].error_code) - - def testDifferentDeclaredType(self): - self.exchange_declare(exchange="test_different_declared_type_exchange", type="direct") - try: - session = self.conn.session("alternate", 2) - session.exchange_declare(exchange="test_different_declared_type_exchange", type="topic") - self.fail("Expected 530 for redeclaration of exchange with different type.") - except SessionException, e: - self.assertEquals(530, e.args[0].error_code) - -class ExchangeTests(TestHelper): - def testHeadersBindNoMatchArg(self): - self.session.queue_declare(queue="q", exclusive=True, auto_delete=True) - try: - self.session.exchange_bind(queue="q", exchange="amq.match", arguments={"name":"fred" , "age":3} ) - self.fail("Expected failure for missing x-match arg.") - except SessionException, e: - self.assertEquals(541, e.args[0].error_code) diff --git a/python/tests_0-10/management.py b/python/tests_0-10/management.py deleted file mode 100644 index 677645fa2c..0000000000 --- a/python/tests_0-10/management.py +++ /dev/null @@ -1,467 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from qpid.datatypes import Message, RangedSet -from qpid.testlib import TestBase010 -from qpid.management import managementChannel, managementClient -from threading import Condition -from time import sleep -import qmf.console - -class ManagementTest (TestBase010): - """ - Tests for the management hooks - """ - - def test_broker_connectivity_oldAPI (self): - """ - Call the "echo" method on the broker to verify it is alive and talking. - """ - session = self.session - - mc = managementClient () - mch = mc.addChannel (session) - - mc.syncWaitForStable (mch) - brokers = mc.syncGetObjects (mch, "broker") - self.assertEqual (len (brokers), 1) - broker = brokers[0] - args = {} - body = "Echo Message Body" - args["body"] = body - - for seq in range (1, 5): - args["sequence"] = seq - res = mc.syncCallMethod (mch, broker.id, broker.classKey, "echo", args) - self.assertEqual (res.status, 0) - self.assertEqual (res.statusText, "OK") - self.assertEqual (res.sequence, seq) - self.assertEqual (res.body, body) - mc.removeChannel (mch) - - def test_methods_sync (self): - """ - Call the "echo" method on the broker to verify it is alive and talking. - """ - session = self.session - self.startQmf() - - brokers = self.qmf.getObjects(_class="broker") - self.assertEqual(len(brokers), 1) - broker = brokers[0] - - body = "Echo Message Body" - for seq in range(1, 20): - res = broker.echo(seq, body) - self.assertEqual(res.status, 0) - self.assertEqual(res.text, "OK") - self.assertEqual(res.sequence, seq) - self.assertEqual(res.body, body) - - def test_get_objects(self): - self.startQmf() - - # get the package list, verify that the qpid broker package is there - packages = self.qmf.getPackages() - assert 'org.apache.qpid.broker' in packages - - # get the schema class keys for the broker, verify the broker table and link-down event - keys = self.qmf.getClasses('org.apache.qpid.broker') - broker = None - linkDown = None - for key in keys: - if key.getClassName() == "broker": broker = key - if key.getClassName() == "brokerLinkDown" : linkDown = key - assert broker - assert linkDown - - brokerObjs = self.qmf.getObjects(_class="broker") - assert len(brokerObjs) == 1 - brokerObjs = self.qmf.getObjects(_key=broker) - assert len(brokerObjs) == 1 - - def test_self_session_id (self): - self.startQmf() - sessionId = self.qmf_broker.getSessionId() - brokerSessions = self.qmf.getObjects(_class="session") - - found = False - for bs in brokerSessions: - if bs.name == sessionId: - found = True - self.assertEqual (found, True) - - def test_standard_exchanges (self): - self.startQmf() - - exchanges = self.qmf.getObjects(_class="exchange") - exchange = self.findExchange (exchanges, "") - self.assertEqual (exchange.type, "direct") - exchange = self.findExchange (exchanges, "amq.direct") - self.assertEqual (exchange.type, "direct") - exchange = self.findExchange (exchanges, "amq.topic") - self.assertEqual (exchange.type, "topic") - exchange = self.findExchange (exchanges, "amq.fanout") - self.assertEqual (exchange.type, "fanout") - exchange = self.findExchange (exchanges, "amq.match") - self.assertEqual (exchange.type, "headers") - exchange = self.findExchange (exchanges, "qpid.management") - self.assertEqual (exchange.type, "topic") - - def findExchange (self, exchanges, name): - for exchange in exchanges: - if exchange.name == name: - return exchange - return None - - def test_move_queued_messages(self): - """ - Test ability to move messages from the head of one queue to another. - Need to test moveing all and N messages. - """ - self.startQmf() - session = self.session - "Set up source queue" - session.queue_declare(queue="src-queue", exclusive=True, auto_delete=True) - session.exchange_bind(queue="src-queue", exchange="amq.direct", binding_key="routing_key") - - twenty = range(1,21) - props = session.delivery_properties(routing_key="routing_key") - for count in twenty: - body = "Move Message %d" % count - src_msg = Message(props, body) - session.message_transfer(destination="amq.direct", message=src_msg) - - "Set up destination queue" - session.queue_declare(queue="dest-queue", exclusive=True, auto_delete=True) - session.exchange_bind(queue="dest-queue", exchange="amq.direct") - - queues = self.qmf.getObjects(_class="queue") - - "Move 10 messages from src-queue to dest-queue" - result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10) - self.assertEqual (result.status, 0) - - sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] - dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0] - - self.assertEqual (sq.msgDepth,10) - self.assertEqual (dq.msgDepth,10) - - "Move all remaining messages to destination" - result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0) - self.assertEqual (result.status,0) - - sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] - dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0] - - self.assertEqual (sq.msgDepth,0) - self.assertEqual (dq.msgDepth,20) - - "Use a bad source queue name" - result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0) - self.assertEqual (result.status,4) - - "Use a bad destination queue name" - result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0) - self.assertEqual (result.status,4) - - " Use a large qty (40) to move from dest-queue back to " - " src-queue- should move all " - result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40) - self.assertEqual (result.status,0) - - sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] - dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0] - - self.assertEqual (sq.msgDepth,20) - self.assertEqual (dq.msgDepth,0) - - "Consume the messages of the queue and check they are all there in order" - session.message_subscribe(queue="src-queue", destination="tag") - session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - queue = session.incoming("tag") - for count in twenty: - consumed_msg = queue.get(timeout=1) - body = "Move Message %d" % count - self.assertEqual(body, consumed_msg.body) - - def test_purge_queue(self): - """ - Test ability to purge messages from the head of a queue. - Need to test moveing all, 1 (top message) and N messages. - """ - self.startQmf() - session = self.session - "Set up purge queue" - session.queue_declare(queue="purge-queue", exclusive=True, auto_delete=True) - session.exchange_bind(queue="purge-queue", exchange="amq.direct", binding_key="routing_key") - - twenty = range(1,21) - props = session.delivery_properties(routing_key="routing_key") - for count in twenty: - body = "Purge Message %d" % count - msg = Message(props, body) - session.message_transfer(destination="amq.direct", message=msg) - - pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] - - "Purge top message from purge-queue" - result = pq.purge(1) - self.assertEqual (result.status, 0) - pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] - self.assertEqual (pq.msgDepth,19) - - "Purge top 9 messages from purge-queue" - result = pq.purge(9) - self.assertEqual (result.status, 0) - pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] - self.assertEqual (pq.msgDepth,10) - - "Purge all messages from purge-queue" - result = pq.purge(0) - self.assertEqual (result.status, 0) - pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] - self.assertEqual (pq.msgDepth,0) - - def test_reroute_queue(self): - """ - Test ability to reroute messages from the head of a queue. - Need to test moving all, 1 (top message) and N messages. - """ - self.startQmf() - session = self.session - "Set up test queue" - session.exchange_declare(exchange="alt.direct1", type="direct") - session.queue_declare(queue="alt-queue1", exclusive=True, auto_delete=True) - session.exchange_bind(queue="alt-queue1", exchange="alt.direct1", binding_key="routing_key") - session.exchange_declare(exchange="alt.direct2", type="direct") - session.queue_declare(queue="alt-queue2", exclusive=True, auto_delete=True) - session.exchange_bind(queue="alt-queue2", exchange="alt.direct2", binding_key="routing_key") - session.queue_declare(queue="reroute-queue", exclusive=True, auto_delete=True, alternate_exchange="alt.direct1") - session.exchange_bind(queue="reroute-queue", exchange="amq.direct", binding_key="routing_key") - - twenty = range(1,21) - props = session.delivery_properties(routing_key="routing_key") - for count in twenty: - body = "Reroute Message %d" % count - msg = Message(props, body) - session.message_transfer(destination="amq.direct", message=msg) - - pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0] - - "Reroute top message from reroute-queue to alternate exchange" - result = pq.reroute(1, True, "") - self.assertEqual(result.status, 0) - pq.update() - aq = self.qmf.getObjects(_class="queue", name="alt-queue1")[0] - self.assertEqual(pq.msgDepth,19) - self.assertEqual(aq.msgDepth,1) - - "Reroute top 9 messages from reroute-queue to alt.direct2" - result = pq.reroute(9, False, "alt.direct2") - self.assertEqual(result.status, 0) - pq.update() - aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0] - self.assertEqual(pq.msgDepth,10) - self.assertEqual(aq.msgDepth,9) - - "Reroute using a non-existent exchange" - result = pq.reroute(0, False, "amq.nosuchexchange") - self.assertEqual(result.status, 4) - - "Reroute all messages from reroute-queue" - result = pq.reroute(0, False, "alt.direct2") - self.assertEqual(result.status, 0) - pq.update() - aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0] - self.assertEqual(pq.msgDepth,0) - self.assertEqual(aq.msgDepth,19) - - "Make more messages" - twenty = range(1,21) - props = session.delivery_properties(routing_key="routing_key") - for count in twenty: - body = "Reroute Message %d" % count - msg = Message(props, body) - session.message_transfer(destination="amq.direct", message=msg) - - "Reroute onto the same queue" - result = pq.reroute(0, False, "amq.direct") - self.assertEqual(result.status, 0) - pq.update() - self.assertEqual(pq.msgDepth,20) - - - def test_methods_async (self): - """ - """ - class Handler (qmf.console.Console): - def __init__(self): - self.cv = Condition() - self.xmtList = {} - self.rcvList = {} - - def methodResponse(self, broker, seq, response): - self.cv.acquire() - try: - self.rcvList[seq] = response - finally: - self.cv.release() - - def request(self, broker, count): - self.count = count - for idx in range(count): - self.cv.acquire() - try: - seq = broker.echo(idx, "Echo Message", _async = True) - self.xmtList[seq] = idx - finally: - self.cv.release() - - def check(self): - if self.count != len(self.xmtList): - return "fail (attempted send=%d, actual sent=%d)" % (self.count, len(self.xmtList)) - lost = 0 - mismatched = 0 - for seq in self.xmtList: - value = self.xmtList[seq] - if seq in self.rcvList: - result = self.rcvList.pop(seq) - if result.sequence != value: - mismatched += 1 - else: - lost += 1 - spurious = len(self.rcvList) - if lost == 0 and mismatched == 0 and spurious == 0: - return "pass" - else: - return "fail (lost=%d, mismatch=%d, spurious=%d)" % (lost, mismatched, spurious) - - handler = Handler() - self.startQmf(handler) - brokers = self.qmf.getObjects(_class="broker") - self.assertEqual(len(brokers), 1) - broker = brokers[0] - handler.request(broker, 20) - sleep(1) - self.assertEqual(handler.check(), "pass") - - def test_connection_close(self): - """ - Test management method for closing connection - """ - self.startQmf() - conn = self.connect() - session = conn.session("my-named-session") - - #using qmf find named session and close the corresponding connection: - qmf_ssn_object = self.qmf.getObjects(_class="session", name="my-named-session")[0] - qmf_ssn_object._connectionRef_.close() - - #check that connection is closed - try: - conn.session("another-session") - self.fail("Expected failure from closed connection") - except: None - - #make sure that the named session has been closed and the name can be re-used - conn = self.connect() - session = conn.session("my-named-session") - session.queue_declare(queue="whatever", exclusive=True, auto_delete=True) - - def test_binding_count_on_queue(self): - self.startQmf() - conn = self.connect() - session = self.session - - QUEUE = "binding_test_queue" - EX_DIR = "binding_test_exchange_direct" - EX_FAN = "binding_test_exchange_fanout" - EX_TOPIC = "binding_test_exchange_topic" - EX_HDR = "binding_test_exchange_headers" - - # - # Create a test queue - # - session.queue_declare(queue=QUEUE, exclusive=True, auto_delete=True) - queue = self.qmf.getObjects(_class="queue", name=QUEUE)[0] - if not queue: - self.fail("Queue not found") - self.assertEqual(queue.bindingCount, 1, "wrong initial binding count") - - # - # Create an exchange of each supported type - # - session.exchange_declare(exchange=EX_DIR, type="direct") - session.exchange_declare(exchange=EX_FAN, type="fanout") - session.exchange_declare(exchange=EX_TOPIC, type="topic") - session.exchange_declare(exchange=EX_HDR, type="headers") - - # - # Bind each exchange to the test queue - # - match = {} - match['x-match'] = "all" - match['key'] = "value" - session.exchange_bind(exchange=EX_DIR, queue=QUEUE, binding_key="key1") - session.exchange_bind(exchange=EX_DIR, queue=QUEUE, binding_key="key2") - session.exchange_bind(exchange=EX_FAN, queue=QUEUE) - session.exchange_bind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key1.#") - session.exchange_bind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key2.#") - session.exchange_bind(exchange=EX_HDR, queue=QUEUE, binding_key="key1", arguments=match) - match['key2'] = "value2" - session.exchange_bind(exchange=EX_HDR, queue=QUEUE, binding_key="key2", arguments=match) - - # - # Verify that the queue's binding count accounts for the new bindings - # - queue.update() - self.assertEqual(queue.bindingCount, 8, - "added bindings not accounted for (expected 8, got %d)" % queue.bindingCount) - - # - # Remove some of the bindings - # - session.exchange_unbind(exchange=EX_DIR, queue=QUEUE, binding_key="key2") - session.exchange_unbind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key2.#") - session.exchange_unbind(exchange=EX_HDR, queue=QUEUE, binding_key="key2") - - # - # Verify that the queue's binding count accounts for the deleted bindings - # - queue.update() - self.assertEqual(queue.bindingCount, 5, - "deleted bindings not accounted for (expected 5, got %d)" % queue.bindingCount) - # - # Delete the exchanges - # - session.exchange_delete(exchange=EX_DIR) - session.exchange_delete(exchange=EX_FAN) - session.exchange_delete(exchange=EX_TOPIC) - session.exchange_delete(exchange=EX_HDR) - - # - # Verify that the queue's binding count accounts for the lost bindings - # - queue.update() - self.assertEqual(queue.bindingCount, 1, - "deleted bindings not accounted for (expected 1, got %d)" % queue.bindingCount) - diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py deleted file mode 100644 index e80333a1e6..0000000000 --- a/python/tests_0-10/message.py +++ /dev/null @@ -1,918 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.testlib import TestBase010 -from qpid.datatypes import Message, RangedSet -from qpid.session import SessionException - -from qpid.content import Content -from time import sleep - -class MessageTests(TestBase010): - """Tests for 'methods' on the amqp message 'class'""" - - def test_no_local(self): - """ - NOTE: this is a test of a QPID specific feature - - Test that the qpid specific no_local arg is honoured. - """ - session = self.session - #setup, declare two queues one of which excludes delivery of locally sent messages - session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True) - session.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True, arguments={'no-local':'true'}) - #establish two consumers - self.subscribe(destination="local_included", queue="test-queue-1a") - self.subscribe(destination="local_excluded", queue="test-queue-1b") - - #send a message - session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"), "deliver-me")) - session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me")) - - #send a message from another session on the same connection to each queue - session2 = self.conn.session("my-local-session") - session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1a"), "deliver-me-as-well")) - session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me-either")) - - #send a message from a session on another connection to each queue - for q in ["test-queue-1a", "test-queue-1b"]: - session.exchange_bind(queue=q, exchange="amq.fanout", binding_key="my-key") - other = self.connect() - session3 = other.session("my-other-session") - session3.message_transfer(destination="amq.fanout", message=Message("i-am-not-local")) - other.close() - - #check the queues of the two consumers - excluded = session.incoming("local_excluded") - included = session.incoming("local_included") - for b in ["deliver-me", "deliver-me-as-well", "i-am-not-local"]: - msg = included.get(timeout=1) - self.assertEqual(b, msg.body) - msg = excluded.get(timeout=1) - self.assertEqual("i-am-not-local", msg.body) - try: - excluded.get(timeout=1) - self.fail("Received locally published message though no_local=true") - except Empty: None - - def test_no_local_awkward(self): - - """ - NOTE: this is a test of a QPID specific feature - - Check that messages which will be excluded through no-local - processing will not block subsequent deliveries - """ - - session = self.session - #setup: - session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True, arguments={'no-local':'true'}) - #establish consumer which excludes delivery of locally sent messages - self.subscribe(destination="local_excluded", queue="test-queue") - - #send a 'local' message - session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "local")) - - #send a non local message - other = self.connect() - session2 = other.session("my-session", 1) - session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue"), "foreign")) - session2.close() - other.close() - - #check that the second message only is delivered - excluded = session.incoming("local_excluded") - msg = excluded.get(timeout=1) - self.assertEqual("foreign", msg.body) - try: - excluded.get(timeout=1) - self.fail("Received extra message") - except Empty: None - #check queue is empty - self.assertEqual(0, session.queue_query(queue="test-queue").message_count) - - def test_no_local_exclusive_subscribe(self): - """ - NOTE: this is a test of a QPID specific feature - - Test that the no_local processing works on queues not declared - as exclusive, but with an exclusive subscription - """ - session = self.session - - #setup, declare two queues one of which excludes delivery of - #locally sent messages but is not declared as exclusive - session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True) - session.queue_declare(queue="test-queue-1b", auto_delete=True, arguments={'no-local':'true'}) - #establish two consumers - self.subscribe(destination="local_included", queue="test-queue-1a") - self.subscribe(destination="local_excluded", queue="test-queue-1b", exclusive=True) - - #send a message from the same session to each queue - session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"), "deliver-me")) - session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me")) - - #send a message from another session on the same connection to each queue - session2 = self.conn.session("my-session") - session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1a"), "deliver-me-as-well")) - session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me-either")) - - #send a message from a session on another connection to each queue - for q in ["test-queue-1a", "test-queue-1b"]: - session.exchange_bind(queue=q, exchange="amq.fanout", binding_key="my-key") - other = self.connect() - session3 = other.session("my-other-session") - session3.message_transfer(destination="amq.fanout", message=Message("i-am-not-local")) - other.close() - - #check the queues of the two consumers - excluded = session.incoming("local_excluded") - included = session.incoming("local_included") - for b in ["deliver-me", "deliver-me-as-well", "i-am-not-local"]: - msg = included.get(timeout=1) - self.assertEqual(b, msg.body) - msg = excluded.get(timeout=1) - self.assertEqual("i-am-not-local", msg.body) - try: - excluded.get(timeout=1) - self.fail("Received locally published message though no_local=true") - except Empty: None - - - def test_consume_exclusive(self): - """ - Test an exclusive consumer prevents other consumer being created - """ - session = self.session - session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True) - session.message_subscribe(destination="first", queue="test-queue-2", exclusive=True) - try: - session.message_subscribe(destination="second", queue="test-queue-2") - self.fail("Expected consume request to fail due to previous exclusive consumer") - except SessionException, e: - self.assertEquals(405, e.args[0].error_code) - - def test_consume_exclusive2(self): - """ - Check that an exclusive consumer cannot be created if a consumer already exists: - """ - session = self.session - session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True) - session.message_subscribe(destination="first", queue="test-queue-2") - try: - session.message_subscribe(destination="second", queue="test-queue-2", exclusive=True) - self.fail("Expected exclusive consume request to fail due to previous consumer") - except SessionException, e: - self.assertEquals(405, e.args[0].error_code) - - def test_consume_queue_not_found(self): - """ - Test error conditions associated with the queue field of the consume method: - """ - session = self.session - try: - #queue specified but doesn't exist: - session.message_subscribe(queue="invalid-queue", destination="a") - self.fail("Expected failure when consuming from non-existent queue") - except SessionException, e: - self.assertEquals(404, e.args[0].error_code) - - def test_consume_queue_not_specified(self): - session = self.session - try: - #queue not specified and none previously declared for channel: - session.message_subscribe(destination="a") - self.fail("Expected failure when consuming from unspecified queue") - except SessionException, e: - self.assertEquals(531, e.args[0].error_code) - - def test_consume_unique_consumers(self): - """ - Ensure unique consumer tags are enforced - """ - session = self.session - #setup, declare a queue: - session.queue_declare(queue="test-queue-3", exclusive=True, auto_delete=True) - - #check that attempts to use duplicate tags are detected and prevented: - session.message_subscribe(destination="first", queue="test-queue-3") - try: - session.message_subscribe(destination="first", queue="test-queue-3") - self.fail("Expected consume request to fail due to non-unique tag") - except SessionException, e: - self.assertEquals(530, e.args[0].error_code) - - def test_cancel(self): - """ - Test compliance of the basic.cancel method - """ - session = self.session - #setup, declare a queue: - session.queue_declare(queue="test-queue-4", exclusive=True, auto_delete=True) - session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "One")) - - session.message_subscribe(destination="my-consumer", queue="test-queue-4") - myqueue = session.incoming("my-consumer") - session.message_flow(destination="my-consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="my-consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - - #should flush here - - #cancel should stop messages being delivered - session.message_cancel(destination="my-consumer") - session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "Two")) - msg = myqueue.get(timeout=1) - self.assertEqual("One", msg.body) - try: - msg = myqueue.get(timeout=1) - self.fail("Got message after cancellation: " + msg) - except Empty: None - - #cancellation of non-existant consumers should be handled without error - session.message_cancel(destination="my-consumer") - session.message_cancel(destination="this-never-existed") - - - def test_ack(self): - """ - Test basic ack/recover behaviour - """ - session = self.conn.session("alternate-session", timeout=10) - session.queue_declare(queue="test-ack-queue", auto_delete=True) - - session.message_subscribe(queue = "test-ack-queue", destination = "consumer") - session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - queue = session.incoming("consumer") - - delivery_properties = session.delivery_properties(routing_key="test-ack-queue") - for i in ["One", "Two", "Three", "Four", "Five"]: - session.message_transfer(message=Message(delivery_properties, i)) - - msg1 = queue.get(timeout=1) - msg2 = queue.get(timeout=1) - msg3 = queue.get(timeout=1) - msg4 = queue.get(timeout=1) - msg5 = queue.get(timeout=1) - - self.assertEqual("One", msg1.body) - self.assertEqual("Two", msg2.body) - self.assertEqual("Three", msg3.body) - self.assertEqual("Four", msg4.body) - self.assertEqual("Five", msg5.body) - - session.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four - - #subscribe from second session here to ensure queue is not - #auto-deleted when alternate session closes (no need to ack on these): - self.session.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1) - - #now close the session, and see that the unacked messages are - #then redelivered to another subscriber: - session.close(timeout=10) - - session = self.session - session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - queue = session.incoming("checker") - - msg3b = queue.get(timeout=1) - msg5b = queue.get(timeout=1) - - self.assertEqual("Three", msg3b.body) - self.assertEqual("Five", msg5b.body) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) - except Empty: None - - def test_reject(self): - session = self.session - session.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout") - session.queue_declare(queue = "r", exclusive=True, auto_delete=True) - session.exchange_bind(queue = "r", exchange = "amq.fanout") - - session.message_subscribe(queue = "q", destination = "consumer") - session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "blah, blah")) - msg = session.incoming("consumer").get(timeout = 1) - self.assertEquals(msg.body, "blah, blah") - session.message_reject(RangedSet(msg.id)) - - session.message_subscribe(queue = "r", destination = "checker") - session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - msg = session.incoming("checker").get(timeout = 1) - self.assertEquals(msg.body, "blah, blah") - - def test_credit_flow_messages(self): - """ - Test basic credit based flow control with unit = message - """ - #declare an exclusive queue - session = self.session - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - #create consumer (for now that defaults to infinite credit) - session.message_subscribe(queue = "q", destination = "c") - session.message_set_flow_mode(flow_mode = 0, destination = "c") - #send batch of messages to queue - for i in range(1, 11): - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i)) - - #set message credit to finite amount (less than enough for all messages) - session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c") - #set infinite byte credit - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c") - #check that expected number were received - q = session.incoming("c") - for i in range(1, 6): - self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) - self.assertEmpty(q) - - #increase credit again and check more are received - for i in range(6, 11): - session.message_flow(unit = session.credit_unit.message, value = 1, destination = "c") - self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) - self.assertEmpty(q) - - def test_credit_flow_bytes(self): - """ - Test basic credit based flow control with unit = bytes - """ - #declare an exclusive queue - session = self.session - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - #create consumer (for now that defaults to infinite credit) - session.message_subscribe(queue = "q", destination = "c") - session.message_set_flow_mode(flow_mode = 0, destination = "c") - #send batch of messages to queue - for i in range(10): - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh")) - - #each message is currently interpreted as requiring msg_size bytes of credit - msg_size = 19 - - #set byte credit to finite amount (less than enough for all messages) - session.message_flow(unit = session.credit_unit.byte, value = msg_size*5, destination = "c") - #set infinite message credit - session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "c") - #check that expected number were received - q = session.incoming("c") - for i in range(5): - self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") - self.assertEmpty(q) - - #increase credit again and check more are received - for i in range(5): - session.message_flow(unit = session.credit_unit.byte, value = msg_size, destination = "c") - self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") - self.assertEmpty(q) - - - def test_window_flow_messages(self): - """ - Test basic window based flow control with unit = message - """ - #declare an exclusive queue - session = self.session - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - #create consumer (for now that defaults to infinite credit) - session.message_subscribe(queue = "q", destination = "c") - session.message_set_flow_mode(flow_mode = 1, destination = "c") - #send batch of messages to queue - for i in range(1, 11): - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i)) - - #set message credit to finite amount (less than enough for all messages) - session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c") - #set infinite byte credit - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c") - #check that expected number were received - q = session.incoming("c") - for i in range(1, 6): - msg = q.get(timeout = 1) - session.receiver._completed.add(msg.id)#TODO: this may be done automatically - self.assertDataEquals(session, msg, "Message %d" % i) - self.assertEmpty(q) - - #acknowledge messages and check more are received - #TODO: there may be a nicer way of doing this - session.channel.session_completed(session.receiver._completed) - - for i in range(6, 11): - self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) - self.assertEmpty(q) - - - def test_window_flow_bytes(self): - """ - Test basic window based flow control with unit = bytes - """ - #declare an exclusive queue - session = self.session - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - #create consumer (for now that defaults to infinite credit) - session.message_subscribe(queue = "q", destination = "c") - session.message_set_flow_mode(flow_mode = 1, destination = "c") - #send batch of messages to queue - for i in range(10): - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh")) - - #each message is currently interpreted as requiring msg_size bytes of credit - msg_size = 19 - - #set byte credit to finite amount (less than enough for all messages) - session.message_flow(unit = session.credit_unit.byte, value = msg_size*5, destination = "c") - #set infinite message credit - session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "c") - #check that expected number were received - q = session.incoming("c") - msgs = [] - for i in range(5): - msg = q.get(timeout = 1) - msgs.append(msg) - self.assertDataEquals(session, msg, "abcdefgh") - self.assertEmpty(q) - - #ack each message individually and check more are received - for i in range(5): - msg = msgs.pop() - #TODO: there may be a nicer way of doing this - session.receiver._completed.add(msg.id) - session.channel.session_completed(session.receiver._completed) - self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") - self.assertEmpty(q) - - def test_window_flush_ack_flow(self): - """ - Test basic window based flow control with unit = bytes - """ - #declare an exclusive queue - ssn = self.session - ssn.queue_declare(queue = "q", exclusive=True, auto_delete=True) - #create consumer - ssn.message_subscribe(queue = "q", destination = "c", - accept_mode=ssn.accept_mode.explicit) - ssn.message_set_flow_mode(flow_mode = ssn.flow_mode.window, destination = "c") - - #send message A - ssn.message_transfer(message=Message(ssn.delivery_properties(routing_key="q"), "A")) - - for unit in ssn.credit_unit.VALUES: - ssn.message_flow("c", unit, 0xFFFFFFFFL) - - q = ssn.incoming("c") - msgA = q.get(timeout=10) - - ssn.message_flush(destination="c") - - # XXX - ssn.receiver._completed.add(msgA.id) - ssn.channel.session_completed(ssn.receiver._completed) - ssn.message_accept(RangedSet(msgA.id)) - - for unit in ssn.credit_unit.VALUES: - ssn.message_flow("c", unit, 0xFFFFFFFFL) - - #send message B - ssn.message_transfer(message=Message(ssn.delivery_properties(routing_key="q"), "B")) - - msgB = q.get(timeout=10) - - def test_subscribe_not_acquired(self): - """ - Test the not-acquired modes works as expected for a simple case - """ - session = self.session - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - for i in range(1, 6): - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i)) - - session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) - session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "a") - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") - session.message_subscribe(queue = "q", destination = "b", acquire_mode = 1) - session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "b") - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b") - - for i in range(6, 11): - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i)) - - #both subscribers should see all messages - qA = session.incoming("a") - qB = session.incoming("b") - for i in range(1, 11): - for q in [qA, qB]: - msg = q.get(timeout = 1) - self.assertEquals("Message %s" % i, msg.body) - #TODO: tidy up completion - session.receiver._completed.add(msg.id) - - #TODO: tidy up completion - session.channel.session_completed(session.receiver._completed) - #messages should still be on the queue: - self.assertEquals(10, session.queue_query(queue = "q").message_count) - - def test_acquire_with_no_accept_and_credit_flow(self): - """ - Test that messages recieved unacquired, with accept not - required in windowing mode can be acquired. - """ - session = self.session - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me")) - - session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1, accept_mode = 1) - session.message_set_flow_mode(flow_mode = session.flow_mode.credit, destination = "a") - session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "a") - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") - msg = session.incoming("a").get(timeout = 1) - self.assertEquals("acquire me", msg.body) - #message should still be on the queue: - self.assertEquals(1, session.queue_query(queue = "q").message_count) - - transfers = RangedSet(msg.id) - response = session.message_acquire(transfers) - #check that we get notification (i.e. message_acquired) - self.assert_(msg.id in response.transfers) - #message should have been removed from the queue: - self.assertEquals(0, session.queue_query(queue = "q").message_count) - - def test_acquire(self): - """ - Test explicit acquire function - """ - session = self.session - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me")) - - session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) - session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - msg = session.incoming("a").get(timeout = 1) - self.assertEquals("acquire me", msg.body) - #message should still be on the queue: - self.assertEquals(1, session.queue_query(queue = "q").message_count) - - transfers = RangedSet(msg.id) - response = session.message_acquire(transfers) - #check that we get notification (i.e. message_acquired) - self.assert_(msg.id in response.transfers) - #message should have been removed from the queue: - self.assertEquals(0, session.queue_query(queue = "q").message_count) - session.message_accept(transfers) - - - def test_release(self): - """ - Test explicit release function - """ - session = self.session - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "release me")) - - session.message_subscribe(queue = "q", destination = "a") - session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - msg = session.incoming("a").get(timeout = 1) - self.assertEquals("release me", msg.body) - session.message_cancel(destination = "a") - session.message_release(RangedSet(msg.id)) - - #message should not have been removed from the queue: - self.assertEquals(1, session.queue_query(queue = "q").message_count) - - def test_release_ordering(self): - """ - Test order of released messages is as expected - """ - session = self.session - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - for i in range (1, 11): - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "released message %s" % (i))) - - session.message_subscribe(queue = "q", destination = "a") - session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") - queue = session.incoming("a") - first = queue.get(timeout = 1) - for i in range(2, 10): - msg = queue.get(timeout = 1) - self.assertEquals("released message %s" % (i), msg.body) - - last = queue.get(timeout = 1) - self.assertEmpty(queue) - released = RangedSet() - released.add(first.id, last.id) - session.message_release(released) - - #TODO: may want to clean this up... - session.receiver._completed.add(first.id, last.id) - session.channel.session_completed(session.receiver._completed) - - for i in range(1, 11): - self.assertEquals("released message %s" % (i), queue.get(timeout = 1).body) - - def test_ranged_ack(self): - """ - Test acking of messages ranges - """ - session = self.conn.session("alternate-session", timeout=10) - - session.queue_declare(queue = "q", auto_delete=True) - delivery_properties = session.delivery_properties(routing_key="q") - for i in range (1, 11): - session.message_transfer(message=Message(delivery_properties, "message %s" % (i))) - - session.message_subscribe(queue = "q", destination = "a") - session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") - queue = session.incoming("a") - ids = [] - for i in range (1, 11): - msg = queue.get(timeout = 1) - self.assertEquals("message %s" % (i), msg.body) - ids.append(msg.id) - - self.assertEmpty(queue) - - #ack all but the fourth message (command id 2) - accepted = RangedSet() - accepted.add(ids[0], ids[2]) - accepted.add(ids[4], ids[9]) - session.message_accept(accepted) - - #subscribe from second session here to ensure queue is not - #auto-deleted when alternate session closes (no need to ack on these): - self.session.message_subscribe(queue = "q", destination = "checker") - - #now close the session, and see that the unacked messages are - #then redelivered to another subscriber: - session.close(timeout=10) - - session = self.session - session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - queue = session.incoming("checker") - - self.assertEquals("message 4", queue.get(timeout = 1).body) - self.assertEmpty(queue) - - def test_subscribe_not_acquired_2(self): - session = self.session - - #publish some messages - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - for i in range(1, 11): - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) - - #consume some of them - session.message_subscribe(queue = "q", destination = "a") - session.message_set_flow_mode(flow_mode = 0, destination = "a") - session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a") - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") - - queue = session.incoming("a") - for i in range(1, 6): - msg = queue.get(timeout = 1) - self.assertEquals("message-%d" % (i), msg.body) - #complete and accept - session.message_accept(RangedSet(msg.id)) - #TODO: tidy up completion - session.receiver._completed.add(msg.id) - session.channel.session_completed(session.receiver._completed) - self.assertEmpty(queue) - - #now create a not-acquired subscriber - session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b") - - #check it gets those not consumed - queue = session.incoming("b") - session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") - for i in range(6, 11): - msg = queue.get(timeout = 1) - self.assertEquals("message-%d" % (i), msg.body) - session.message_release(RangedSet(msg.id)) - #TODO: tidy up completion - session.receiver._completed.add(msg.id) - session.channel.session_completed(session.receiver._completed) - session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") - self.assertEmpty(queue) - - #check all 'browsed' messages are still on the queue - self.assertEqual(5, session.queue_query(queue="q").message_count) - - def test_subscribe_not_acquired_3(self): - session = self.session - - #publish some messages - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - for i in range(1, 11): - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) - - #create a not-acquired subscriber - session.message_subscribe(queue = "q", destination = "a", acquire_mode=1) - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") - session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") - - #browse through messages - queue = session.incoming("a") - for i in range(1, 11): - msg = queue.get(timeout = 1) - self.assertEquals("message-%d" % (i), msg.body) - if (i % 2): - #try to acquire every second message - response = session.message_acquire(RangedSet(msg.id)) - #check that acquire succeeds - self.assert_(msg.id in response.transfers) - session.message_accept(RangedSet(msg.id)) - else: - session.message_release(RangedSet(msg.id)) - session.receiver._completed.add(msg.id) - session.channel.session_completed(session.receiver._completed) - self.assertEmpty(queue) - - #create a second not-acquired subscriber - session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b") - session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") - #check it gets those not consumed - queue = session.incoming("b") - for i in [2,4,6,8,10]: - msg = queue.get(timeout = 1) - self.assertEquals("message-%d" % (i), msg.body) - session.message_release(RangedSet(msg.id)) - session.receiver._completed.add(msg.id) - session.channel.session_completed(session.receiver._completed) - session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b") - self.assertEmpty(queue) - - #check all 'browsed' messages are still on the queue - self.assertEqual(5, session.queue_query(queue="q").message_count) - - def test_release_unacquired(self): - session = self.session - - #create queue - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - - #send message - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "my-message")) - - #create two 'browsers' - session.message_subscribe(queue = "q", destination = "a", acquire_mode=1) - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") - session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") - queueA = session.incoming("a") - - session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b") - session.message_flow(unit = session.credit_unit.message, value = 10, destination = "b") - queueB = session.incoming("b") - - #have each browser release the message - msgA = queueA.get(timeout = 1) - session.message_release(RangedSet(msgA.id)) - - msgB = queueB.get(timeout = 1) - session.message_release(RangedSet(msgB.id)) - - #cancel browsers - session.message_cancel(destination = "a") - session.message_cancel(destination = "b") - - #create consumer - session.message_subscribe(queue = "q", destination = "c") - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c") - session.message_flow(unit = session.credit_unit.message, value = 10, destination = "c") - queueC = session.incoming("c") - #consume the message then ack it - msgC = queueC.get(timeout = 1) - session.message_accept(RangedSet(msgC.id)) - #ensure there are no other messages - self.assertEmpty(queueC) - - def test_release_order(self): - session = self.session - - #create queue - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - - #send messages - for i in range(1, 11): - session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) - - #subscribe: - session.message_subscribe(queue="q", destination="a") - a = session.incoming("a") - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") - session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") - - for i in range(1, 11): - msg = a.get(timeout = 1) - self.assertEquals("message-%d" % (i), msg.body) - if (i % 2): - #accept all odd messages - session.message_accept(RangedSet(msg.id)) - else: - #release all even messages - session.message_release(RangedSet(msg.id)) - - #browse: - session.message_subscribe(queue="q", destination="b", acquire_mode=1) - b = session.incoming("b") - b.start() - for i in [2, 4, 6, 8, 10]: - msg = b.get(timeout = 1) - self.assertEquals("message-%d" % (i), msg.body) - - - def test_empty_body(self): - session = self.session - session.queue_declare(queue="xyz", exclusive=True, auto_delete=True) - props = session.delivery_properties(routing_key="xyz") - session.message_transfer(message=Message(props, "")) - - consumer_tag = "tag1" - session.message_subscribe(queue="xyz", destination=consumer_tag) - session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = consumer_tag) - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag) - queue = session.incoming(consumer_tag) - msg = queue.get(timeout=1) - self.assertEquals("", msg.body) - session.message_accept(RangedSet(msg.id)) - - def test_incoming_start(self): - q = "test_incoming_start" - session = self.session - - session.queue_declare(queue=q, exclusive=True, auto_delete=True) - session.message_subscribe(queue=q, destination="msgs") - messages = session.incoming("msgs") - assert messages.destination == "msgs" - - dp = session.delivery_properties(routing_key=q) - session.message_transfer(message=Message(dp, "test")) - - messages.start() - msg = messages.get() - assert msg.body == "test" - - def test_ttl(self): - q = "test_ttl" - session = self.session - - session.queue_declare(queue=q, exclusive=True, auto_delete=True) - - dp = session.delivery_properties(routing_key=q, ttl=500)#expire in half a second - session.message_transfer(message=Message(dp, "first")) - - dp = session.delivery_properties(routing_key=q, ttl=300000)#expire in fives minutes - session.message_transfer(message=Message(dp, "second")) - - d = "msgs" - session.message_subscribe(queue=q, destination=d) - messages = session.incoming(d) - sleep(1) - session.message_flow(unit = session.credit_unit.message, value=2, destination=d) - session.message_flow(unit = session.credit_unit.byte, value=0xFFFFFFFFL, destination=d) - assert messages.get(timeout=1).body == "second" - self.assertEmpty(messages) - - - def assertDataEquals(self, session, msg, expected): - self.assertEquals(expected, msg.body) - - def assertEmpty(self, queue): - try: - extra = queue.get(timeout=1) - self.fail("Queue not empty, contains: " + extra.body) - except Empty: None - -class SizelessContent(Content): - - def size(self): - return None diff --git a/python/tests_0-10/persistence.py b/python/tests_0-10/persistence.py deleted file mode 100644 index e9cf9b7caa..0000000000 --- a/python/tests_0-10/persistence.py +++ /dev/null @@ -1,68 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.datatypes import Message, RangedSet -#from qpid.testlib import testrunner, TestBase010 -from qpid.testlib import TestBase010 - -class PersistenceTests(TestBase010): - def test_delete_queue_after_publish(self): - session = self.session - session.auto_sync = False - - #create queue - session.queue_declare(queue = "q", auto_delete=True, durable=True) - - #send message - for i in range(1, 10): - dp = session.delivery_properties(routing_key="q", delivery_mode=2) - session.message_transfer(message=Message(dp, "my-message")) - - session.auto_sync = True - #explicitly delete queue - session.queue_delete(queue = "q") - - def test_ack_message_from_deleted_queue(self): - session = self.session - session.auto_sync = False - - #create queue - session.queue_declare(queue = "q", auto_delete=True, durable=True) - - #send message - dp = session.delivery_properties(routing_key="q", delivery_mode=2) - session.message_transfer(message=Message(dp, "my-message")) - - #create consumer - session.message_subscribe(queue = "q", destination = "a", accept_mode = 1, acquire_mode=0) - session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") - session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") - queue = session.incoming("a") - - #consume the message, cancel subscription (triggering auto-delete), then ack it - msg = queue.get(timeout = 5) - session.message_cancel(destination = "a") - session.message_accept(RangedSet(msg.id)) - - def test_queue_deletion(self): - session = self.session - session.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True) - session.exchange_bind(exchange="amq.topic", queue="durable-subscriber-queue", binding_key="xyz") - dp = session.delivery_properties(routing_key="xyz", delivery_mode=2) - session.message_transfer(destination="amq.topic", message=Message(dp, "my-message")) - session.queue_delete(queue = "durable-subscriber-queue") diff --git a/python/tests_0-10/query.py b/python/tests_0-10/query.py deleted file mode 100644 index d57e964982..0000000000 --- a/python/tests_0-10/query.py +++ /dev/null @@ -1,247 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import TestBase010 - -class QueryTests(TestBase010): - """Tests for various query methods""" - - def test_queue_query(self): - session = self.session - session.queue_declare(queue="my-queue", exclusive=True) - result = session.queue_query(queue="my-queue") - self.assertEqual("my-queue", result.queue) - - def test_queue_query_unknown(self): - session = self.session - result = session.queue_query(queue="I don't exist") - self.assert_(not result.queue) - - def test_exchange_query(self): - """ - Test that the exchange_query method works as expected - """ - session = self.session - #check returned type for the standard exchanges - self.assertEqual("direct", session.exchange_query(name="amq.direct").type) - self.assertEqual("topic", session.exchange_query(name="amq.topic").type) - self.assertEqual("fanout", session.exchange_query(name="amq.fanout").type) - self.assertEqual("headers", session.exchange_query(name="amq.match").type) - self.assertEqual("direct", session.exchange_query(name="").type) - #declare an exchange - session.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False) - #check that the result of a query is as expected - response = session.exchange_query(name="my-test-exchange") - self.assertEqual("direct", response.type) - self.assert_(not response.durable) - self.assert_(not response.not_found) - #delete the exchange - session.exchange_delete(exchange="my-test-exchange") - #check that the query now reports not-found - self.assert_(session.exchange_query(name="my-test-exchange").not_found) - - def test_exchange_bound_direct(self): - """ - Test that the exchange_bound method works as expected with the direct exchange - """ - self.exchange_bound_with_key("amq.direct") - - def test_exchange_bound_topic(self): - """ - Test that the exchange_bound method works as expected with the direct exchange - """ - self.exchange_bound_with_key("amq.topic") - - def exchange_bound_with_key(self, exchange_name): - session = self.session - #setup: create two queues - session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True) - session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True) - - session.exchange_bind(exchange=exchange_name, queue="used-queue", binding_key="used-key") - - # test detection of any binding to specific queue - response = session.exchange_bound(exchange=exchange_name, queue="used-queue") - self.assert_(not response.exchange_not_found) - self.assert_(not response.queue_not_found) - self.assert_(not response.queue_not_matched) - - # test detection of specific binding to any queue - response = session.exchange_bound(exchange=exchange_name, binding_key="used-key") - self.assert_(not response.exchange_not_found) - self.assert_(not response.queue_not_found) - self.assert_(not response.key_not_matched) - - # test detection of specific binding to specific queue - response = session.exchange_bound(exchange=exchange_name, queue="used-queue", binding_key="used-key") - self.assert_(not response.exchange_not_found) - self.assert_(not response.queue_not_found) - self.assert_(not response.queue_not_matched) - self.assert_(not response.key_not_matched) - - # test unmatched queue, unspecified binding - response = session.exchange_bound(exchange=exchange_name, queue="unused-queue") - self.assert_(not response.exchange_not_found) - self.assert_(not response.queue_not_found) - self.assertEqual(True, response.queue_not_matched) - - # test unspecified queue, unmatched binding - response = session.exchange_bound(exchange=exchange_name, binding_key="unused-key") - self.assert_(not response.exchange_not_found) - self.assert_(not response.queue_not_found) - self.assertEqual(True, response.key_not_matched) - - # test matched queue, unmatched binding - response = session.exchange_bound(exchange=exchange_name, queue="used-queue", binding_key="unused-key") - self.assert_(not response.exchange_not_found) - self.assert_(not response.queue_not_found) - self.assert_(not response.queue_not_matched) - self.assertEqual(True, response.key_not_matched) - - # test unmatched queue, matched binding - response = session.exchange_bound(exchange=exchange_name, queue="unused-queue", binding_key="used-key") - self.assert_(not response.exchange_not_found) - self.assert_(not response.queue_not_found) - self.assertEqual(True, response.queue_not_matched) - self.assert_(not response.key_not_matched) - - # test unmatched queue, unmatched binding - response = session.exchange_bound(exchange=exchange_name, queue="unused-queue", binding_key="unused-key") - self.assert_(not response.exchange_not_found) - self.assert_(not response.queue_not_found) - self.assertEqual(True, response.queue_not_matched) - self.assertEqual(True, response.key_not_matched) - - #test exchange not found - self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found) - - #test exchange found, queue not found - response = session.exchange_bound(exchange=exchange_name, queue="unknown-queue") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(True, response.queue_not_found) - - #test exchange not found, queue found - response = session.exchange_bound(exchange="unknown-exchange", queue="used-queue") - self.assertEqual(True, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - - #test not exchange found, queue not found - response = session.exchange_bound(exchange="unknown-exchange", queue="unknown-queue") - self.assertEqual(True, response.exchange_not_found) - self.assertEqual(True, response.queue_not_found) - - - def test_exchange_bound_fanout(self): - """ - Test that the exchange_bound method works as expected with fanout exchange - """ - session = self.session - #setup - session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True) - session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True) - session.exchange_bind(exchange="amq.fanout", queue="used-queue") - - # test detection of any binding to specific queue - response = session.exchange_bound(exchange="amq.fanout", queue="used-queue") - self.assert_(not response.exchange_not_found) - self.assert_(not response.queue_not_found) - self.assert_(not response.queue_not_matched) - - # test unmatched queue, unspecified binding - response = session.exchange_bound(exchange="amq.fanout", queue="unused-queue") - self.assert_(not response.exchange_not_found) - self.assert_(not response.queue_not_found) - self.assertEqual(True, response.queue_not_matched) - - #test exchange not found - self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found) - - #test queue not found - self.assertEqual(True, session.exchange_bound(exchange="amq.fanout", queue="unknown-queue").queue_not_found) - - def test_exchange_bound_header(self): - """ - Test that the exchange_bound method works as expected with headers exchanges - """ - session = self.session - #setup - session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True) - session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True) - session.exchange_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} ) - - # test detection of any binding to specific queue - response = session.exchange_bound(exchange="amq.match", queue="used-queue") - self.assert_(not response.exchange_not_found) - self.assert_(not response.queue_not_found) - self.assert_(not response.queue_not_matched) - - # test detection of specific binding to any queue - response = session.exchange_bound(exchange="amq.match", arguments={"x-match":"all", "a":"A"}) - self.assert_(not response.exchange_not_found) - self.assert_(not response.queue_not_found) - self.assert_(not response.args_not_matched) - - # test detection of specific binding to specific queue - response = session.exchange_bound(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"}) - self.assert_(not response.exchange_not_found) - self.assert_(not response.queue_not_found) - self.assert_(not response.queue_not_matched) - self.assert_(not response.args_not_matched) - - # test unmatched queue, unspecified binding - response = session.exchange_bound(exchange="amq.match", queue="unused-queue") - self.assert_(not response.exchange_not_found) - self.assert_(not response.queue_not_found) - self.assertEqual(True, response.queue_not_matched) - - # test unspecified queue, unmatched binding - response = session.exchange_bound(exchange="amq.match", arguments={"x-match":"all", "b":"B"}) - self.assert_(not response.exchange_not_found) - self.assert_(not response.queue_not_found) - self.assertEqual(True, response.args_not_matched) - - # test matched queue, unmatched binding - response = session.exchange_bound(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "b":"B"}) - self.assert_(not response.exchange_not_found) - self.assert_(not response.queue_not_found) - self.assert_(not response.queue_not_matched) - self.assertEqual(True, response.args_not_matched) - - # test unmatched queue, matched binding - response = session.exchange_bound(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "a":"A"}) - self.assert_(not response.exchange_not_found) - self.assert_(not response.queue_not_found) - self.assertEqual(True, response.queue_not_matched) - self.assert_(not response.args_not_matched) - - # test unmatched queue, unmatched binding - response = session.exchange_bound(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "b":"B"}) - self.assert_(not response.exchange_not_found) - self.assert_(not response.queue_not_found) - self.assertEqual(True, response.queue_not_matched) - self.assertEqual(True, response.args_not_matched) - - #test exchange not found - self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found) - - #test queue not found - self.assertEqual(True, session.exchange_bound(exchange="amq.match", queue="unknown-queue").queue_not_found) - diff --git a/python/tests_0-10/queue.py b/python/tests_0-10/queue.py deleted file mode 100644 index eb38965190..0000000000 --- a/python/tests_0-10/queue.py +++ /dev/null @@ -1,366 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.testlib import TestBase010 -from qpid.datatypes import Message -from qpid.session import SessionException - -class QueueTests(TestBase010): - """Tests for 'methods' on the amqp queue 'class'""" - - def test_purge(self): - """ - Test that the purge method removes messages from the queue - """ - session = self.session - #setup, declare a queue and add some messages to it: - session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) - session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "one")) - session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "two")) - session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "three")) - - #check that the queue now reports 3 messages: - session.queue_declare(queue="test-queue") - reply = session.queue_query(queue="test-queue") - self.assertEqual(3, reply.message_count) - - #now do the purge, then test that three messages are purged and the count drops to 0 - session.queue_purge(queue="test-queue"); - reply = session.queue_query(queue="test-queue") - self.assertEqual(0, reply.message_count) - - #send a further message and consume it, ensuring that the other messages are really gone - session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "four")) - session.message_subscribe(queue="test-queue", destination="tag") - session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - queue = session.incoming("tag") - msg = queue.get(timeout=1) - self.assertEqual("four", msg.body) - - def test_purge_queue_exists(self): - """ - Test that the correct exception is thrown is no queue exists - for the name specified in purge - """ - session = self.session - try: - #queue specified but doesn't exist: - session.queue_purge(queue="invalid-queue") - self.fail("Expected failure when purging non-existent queue") - except SessionException, e: - self.assertEquals(404, e.args[0].error_code) #not-found - - def test_purge_empty_name(self): - """ - Test that the correct exception is thrown is no queue name - is specified for purge - """ - session = self.session - try: - #queue not specified and none previously declared for channel: - session.queue_purge() - self.fail("Expected failure when purging unspecified queue") - except SessionException, e: - self.assertEquals(531, e.args[0].error_code) #illegal-argument - - def test_declare_exclusive(self): - """ - Test that the exclusive field is honoured in queue.declare - """ - # TestBase.setUp has already opened session(1) - s1 = self.session - # Here we open a second separate connection: - s2 = self.conn.session("other") - - #declare an exclusive queue: - s1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True) - try: - #other connection should not be allowed to declare this: - s2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True) - self.fail("Expected second exclusive queue_declare to raise a channel exception") - except SessionException, e: - self.assertEquals(405, e.args[0].error_code) - - s3 = self.conn.session("subscriber") - try: - #other connection should not be allowed to declare this: - s3.message_subscribe(queue="exclusive-queue") - self.fail("Expected message_subscribe on an exclusive queue to raise a channel exception") - except SessionException, e: - self.assertEquals(405, e.args[0].error_code) - - s4 = self.conn.session("deleter") - try: - #other connection should not be allowed to declare this: - s4.queue_delete(queue="exclusive-queue") - self.fail("Expected queue_delete on an exclusive queue to raise a channel exception") - except SessionException, e: - self.assertEquals(405, e.args[0].error_code) - - - def test_declare_passive(self): - """ - Test that the passive field is honoured in queue.declare - """ - session = self.session - #declare an exclusive queue: - session.queue_declare(queue="passive-queue-1", exclusive=True, auto_delete=True) - session.queue_declare(queue="passive-queue-1", passive=True) - try: - #other connection should not be allowed to declare this: - session.queue_declare(queue="passive-queue-2", passive=True) - self.fail("Expected passive declaration of non-existant queue to raise a channel exception") - except SessionException, e: - self.assertEquals(404, e.args[0].error_code) #not-found - - - def test_bind(self): - """ - Test various permutations of the queue.bind method - """ - session = self.session - session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True) - - #straightforward case, both exchange & queue exist so no errors expected: - session.exchange_bind(queue="queue-1", exchange="amq.direct", binding_key="key1") - - #use the queue name where the routing key is not specified: - session.exchange_bind(queue="queue-1", exchange="amq.direct") - - #try and bind to non-existant exchange - try: - session.exchange_bind(queue="queue-1", exchange="an-invalid-exchange", binding_key="key1") - self.fail("Expected bind to non-existant exchange to fail") - except SessionException, e: - self.assertEquals(404, e.args[0].error_code) - - - def test_bind_queue_existence(self): - session = self.session - #try and bind non-existant queue: - try: - session.exchange_bind(queue="queue-2", exchange="amq.direct", binding_key="key1") - self.fail("Expected bind of non-existant queue to fail") - except SessionException, e: - self.assertEquals(404, e.args[0].error_code) - - def test_unbind_direct(self): - self.unbind_test(exchange="amq.direct", routing_key="key") - - def test_unbind_topic(self): - self.unbind_test(exchange="amq.topic", routing_key="key") - - def test_unbind_fanout(self): - self.unbind_test(exchange="amq.fanout") - - def test_unbind_headers(self): - self.unbind_test(exchange="amq.match", args={ "x-match":"all", "a":"b"}, headers={"a":"b"}) - - def unbind_test(self, exchange, routing_key="", args=None, headers=None): - #bind two queues and consume from them - session = self.session - - session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True) - session.queue_declare(queue="queue-2", exclusive=True, auto_delete=True) - - session.message_subscribe(queue="queue-1", destination="queue-1") - session.message_flow(destination="queue-1", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="queue-1", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - session.message_subscribe(queue="queue-2", destination="queue-2") - session.message_flow(destination="queue-2", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="queue-2", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - - queue1 = session.incoming("queue-1") - queue2 = session.incoming("queue-2") - - session.exchange_bind(exchange=exchange, queue="queue-1", binding_key=routing_key, arguments=args) - session.exchange_bind(exchange=exchange, queue="queue-2", binding_key=routing_key, arguments=args) - - dp = session.delivery_properties(routing_key=routing_key) - if (headers): - mp = session.message_properties(application_headers=headers) - msg1 = Message(dp, mp, "one") - msg2 = Message(dp, mp, "two") - else: - msg1 = Message(dp, "one") - msg2 = Message(dp, "two") - - #send a message that will match both bindings - session.message_transfer(destination=exchange, message=msg1) - - #unbind first queue - session.exchange_unbind(exchange=exchange, queue="queue-1", binding_key=routing_key) - - #send another message - session.message_transfer(destination=exchange, message=msg2) - - #check one queue has both messages and the other has only one - self.assertEquals("one", queue1.get(timeout=1).body) - try: - msg = queue1.get(timeout=1) - self.fail("Got extra message: %s" % msg.body) - except Empty: pass - - self.assertEquals("one", queue2.get(timeout=1).body) - self.assertEquals("two", queue2.get(timeout=1).body) - try: - msg = queue2.get(timeout=1) - self.fail("Got extra message: " + msg) - except Empty: pass - - - def test_delete_simple(self): - """ - Test core queue deletion behaviour - """ - session = self.session - - #straight-forward case: - session.queue_declare(queue="delete-me") - session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "a")) - session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "b")) - session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "c")) - session.queue_delete(queue="delete-me") - #check that it has gone by declaring passively - try: - session.queue_declare(queue="delete-me", passive=True) - self.fail("Queue has not been deleted") - except SessionException, e: - self.assertEquals(404, e.args[0].error_code) - - def test_delete_queue_exists(self): - """ - Test core queue deletion behaviour - """ - #check attempted deletion of non-existant queue is handled correctly: - session = self.session - try: - session.queue_delete(queue="i-dont-exist", if_empty=True) - self.fail("Expected delete of non-existant queue to fail") - except SessionException, e: - self.assertEquals(404, e.args[0].error_code) - - - - def test_delete_ifempty(self): - """ - Test that if_empty field of queue_delete is honoured - """ - session = self.session - - #create a queue and add a message to it (use default binding): - session.queue_declare(queue="delete-me-2") - session.queue_declare(queue="delete-me-2", passive=True) - session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me-2"), "message")) - - #try to delete, but only if empty: - try: - session.queue_delete(queue="delete-me-2", if_empty=True) - self.fail("Expected delete if_empty to fail for non-empty queue") - except SessionException, e: - self.assertEquals(406, e.args[0].error_code) - - #need new session now: - session = self.conn.session("replacement", 2) - - #empty queue: - session.message_subscribe(destination="consumer_tag", queue="delete-me-2") - session.message_flow(destination="consumer_tag", unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination="consumer_tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL) - queue = session.incoming("consumer_tag") - msg = queue.get(timeout=1) - self.assertEqual("message", msg.body) - session.message_cancel(destination="consumer_tag") - - #retry deletion on empty queue: - session.queue_delete(queue="delete-me-2", if_empty=True) - - #check that it has gone by declaring passively: - try: - session.queue_declare(queue="delete-me-2", passive=True) - self.fail("Queue has not been deleted") - except SessionException, e: - self.assertEquals(404, e.args[0].error_code) - - def test_delete_ifunused(self): - """ - Test that if_unused field of queue_delete is honoured - """ - session = self.session - - #create a queue and register a consumer: - session.queue_declare(queue="delete-me-3") - session.queue_declare(queue="delete-me-3", passive=True) - session.message_subscribe(destination="consumer_tag", queue="delete-me-3") - - #need new session now: - session2 = self.conn.session("replacement", 2) - - #try to delete, but only if empty: - try: - session2.queue_delete(queue="delete-me-3", if_unused=True) - self.fail("Expected delete if_unused to fail for queue with existing consumer") - except SessionException, e: - self.assertEquals(406, e.args[0].error_code) - - session.message_cancel(destination="consumer_tag") - session.queue_delete(queue="delete-me-3", if_unused=True) - #check that it has gone by declaring passively: - try: - session.queue_declare(queue="delete-me-3", passive=True) - self.fail("Queue has not been deleted") - except SessionException, e: - self.assertEquals(404, e.args[0].error_code) - - - def test_autodelete_shared(self): - """ - Test auto-deletion (of non-exclusive queues) - """ - session = self.session - session2 =self.conn.session("other", 1) - - session.queue_declare(queue="auto-delete-me", auto_delete=True) - - #consume from both sessions - tag = "my-tag" - session.message_subscribe(queue="auto-delete-me", destination=tag) - session2.message_subscribe(queue="auto-delete-me", destination=tag) - - #implicit cancel - session2.close() - - #check it is still there - session.queue_declare(queue="auto-delete-me", passive=True) - - #explicit cancel => queue is now unused again: - session.message_cancel(destination=tag) - - #NOTE: this assumes there is no timeout in use - - #check that it has gone by declaring it passively - try: - session.queue_declare(queue="auto-delete-me", passive=True) - self.fail("Expected queue to have been deleted") - except SessionException, e: - self.assertEquals(404, e.args[0].error_code) - - diff --git a/python/tests_0-10/tx.py b/python/tests_0-10/tx.py deleted file mode 100644 index 8cdc539a08..0000000000 --- a/python/tests_0-10/tx.py +++ /dev/null @@ -1,265 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.datatypes import Message, RangedSet -from qpid.testlib import TestBase010 - -class TxTests(TestBase010): - """ - Tests for 'methods' on the amqp tx 'class' - """ - - def test_commit(self): - """ - Test that commited publishes are delivered and commited acks are not re-delivered - """ - session = self.session - - #declare queues and create subscribers in the checking session - #to ensure that the queues are not auto-deleted too early: - self.declare_queues(["tx-commit-a", "tx-commit-b", "tx-commit-c"]) - session.message_subscribe(queue="tx-commit-a", destination="qa") - session.message_subscribe(queue="tx-commit-b", destination="qb") - session.message_subscribe(queue="tx-commit-c", destination="qc") - - #use a separate session for actual work - session2 = self.conn.session("worker", 2) - self.perform_txn_work(session2, "tx-commit-a", "tx-commit-b", "tx-commit-c") - session2.tx_commit() - session2.close() - - session.tx_select() - - self.enable_flow("qa") - queue_a = session.incoming("qa") - - self.enable_flow("qb") - queue_b = session.incoming("qb") - - self.enable_flow("qc") - queue_c = session.incoming("qc") - - #check results - for i in range(1, 5): - msg = queue_c.get(timeout=1) - self.assertEqual("TxMessage %d" % i, msg.body) - session.message_accept(RangedSet(msg.id)) - - msg = queue_b.get(timeout=1) - self.assertEqual("TxMessage 6", msg.body) - session.message_accept(RangedSet(msg.id)) - - msg = queue_a.get(timeout=1) - self.assertEqual("TxMessage 7", msg.body) - session.message_accept(RangedSet(msg.id)) - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) - except Empty: None - - #cleanup - session.tx_commit() - - def test_auto_rollback(self): - """ - Test that a session closed with an open transaction is effectively rolled back - """ - session = self.session - self.declare_queues(["tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c"]) - session.message_subscribe(queue="tx-autorollback-a", destination="qa") - session.message_subscribe(queue="tx-autorollback-b", destination="qb") - session.message_subscribe(queue="tx-autorollback-c", destination="qc") - - session2 = self.conn.session("worker", 2) - queue_a, queue_b, queue_c, ignore = self.perform_txn_work(session2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c") - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) - except Empty: None - - session2.close() - - session.tx_select() - - self.enable_flow("qa") - queue_a = session.incoming("qa") - - self.enable_flow("qb") - queue_b = session.incoming("qb") - - self.enable_flow("qc") - queue_c = session.incoming("qc") - - #check results - for i in range(1, 5): - msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - session.message_accept(RangedSet(msg.id)) - - msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.body) - session.message_accept(RangedSet(msg.id)) - - msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.body) - session.message_accept(RangedSet(msg.id)) - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) - except Empty: None - - #cleanup - session.tx_commit() - - def test_rollback(self): - """ - Test that rolled back publishes are not delivered and rolled back acks are re-delivered - """ - session = self.session - queue_a, queue_b, queue_c, consumed = self.perform_txn_work(session, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c") - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) - except Empty: None - - session.tx_rollback() - - #need to release messages to get them redelivered now: - session.message_release(consumed) - - #check results - for i in range(1, 5): - msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - session.message_accept(RangedSet(msg.id)) - - msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.body) - session.message_accept(RangedSet(msg.id)) - - msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.body) - session.message_accept(RangedSet(msg.id)) - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) - except Empty: None - - #cleanup - session.tx_commit() - - def perform_txn_work(self, session, name_a, name_b, name_c): - """ - Utility method that does some setup and some work under a transaction. Used for testing both - commit and rollback - """ - #setup: - self.declare_queues([name_a, name_b, name_c]) - - key = "my_key_" + name_b - topic = "my_topic_" + name_c - - session.exchange_bind(queue=name_b, exchange="amq.direct", binding_key=key) - session.exchange_bind(queue=name_c, exchange="amq.topic", binding_key=topic) - - dp = session.delivery_properties(routing_key=name_a) - for i in range(1, 5): - mp = session.message_properties(message_id="msg%d" % i) - session.message_transfer(message=Message(dp, mp, "Message %d" % i)) - - dp = session.delivery_properties(routing_key=key) - mp = session.message_properties(message_id="msg6") - session.message_transfer(destination="amq.direct", message=Message(dp, mp, "Message 6")) - - dp = session.delivery_properties(routing_key=topic) - mp = session.message_properties(message_id="msg7") - session.message_transfer(destination="amq.topic", message=Message(dp, mp, "Message 7")) - - session.tx_select() - - #consume and ack messages - acked = RangedSet() - self.subscribe(session, queue=name_a, destination="sub_a") - queue_a = session.incoming("sub_a") - for i in range(1, 5): - msg = queue_a.get(timeout=1) - acked.add(msg.id) - self.assertEqual("Message %d" % i, msg.body) - - self.subscribe(session, queue=name_b, destination="sub_b") - queue_b = session.incoming("sub_b") - msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.body) - acked.add(msg.id) - - sub_c = self.subscribe(session, queue=name_c, destination="sub_c") - queue_c = session.incoming("sub_c") - msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.body) - acked.add(msg.id) - - session.message_accept(acked) - - dp = session.delivery_properties(routing_key=topic) - #publish messages - for i in range(1, 5): - mp = session.message_properties(message_id="tx-msg%d" % i) - session.message_transfer(destination="amq.topic", message=Message(dp, mp, "TxMessage %d" % i)) - - dp = session.delivery_properties(routing_key=key) - mp = session.message_properties(message_id="tx-msg6") - session.message_transfer(destination="amq.direct", message=Message(dp, mp, "TxMessage 6")) - - dp = session.delivery_properties(routing_key=name_a) - mp = session.message_properties(message_id="tx-msg7") - session.message_transfer(message=Message(dp, mp, "TxMessage 7")) - return queue_a, queue_b, queue_c, acked - - def declare_queues(self, names, session=None): - session = session or self.session - for n in names: - session.queue_declare(queue=n, auto_delete=True) - - def subscribe(self, session=None, **keys): - session = session or self.session - consumer_tag = keys["destination"] - session.message_subscribe(**keys) - session.message_flow(destination=consumer_tag, unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination=consumer_tag, unit=session.credit_unit.byte, value=0xFFFFFFFFL) - - def enable_flow(self, tag, session=None): - session = session or self.session - session.message_flow(destination=tag, unit=session.credit_unit.message, value=0xFFFFFFFFL) - session.message_flow(destination=tag, unit=session.credit_unit.byte, value=0xFFFFFFFFL) - - def complete(self, session, msg): - session.receiver._completed.add(msg.id)#TODO: this may be done automatically - session.channel.session_completed(session.receiver._completed) - diff --git a/python/tests_0-8/__init__.py b/python/tests_0-8/__init__.py deleted file mode 100644 index 526f2452f8..0000000000 --- a/python/tests_0-8/__init__.py +++ /dev/null @@ -1,22 +0,0 @@ -# Do not delete - marks this directory as a python package. - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import basic, broker, example, exchange, queue, testlib, tx diff --git a/python/tests_0-8/basic.py b/python/tests_0-8/basic.py deleted file mode 100644 index d5837fc19c..0000000000 --- a/python/tests_0-8/basic.py +++ /dev/null @@ -1,396 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import TestBase - -class BasicTests(TestBase): - """Tests for 'methods' on the amqp basic 'class'""" - - def test_consume_no_local(self): - """ - Test that the no_local flag is honoured in the consume method - """ - channel = self.channel - #setup, declare two queues: - channel.queue_declare(queue="test-queue-1a", exclusive=True) - channel.queue_declare(queue="test-queue-1b", exclusive=True) - #establish two consumers one of which excludes delivery of locally sent messages - channel.basic_consume(consumer_tag="local_included", queue="test-queue-1a") - channel.basic_consume(consumer_tag="local_excluded", queue="test-queue-1b", no_local=True) - - #send a message - channel.basic_publish(routing_key="test-queue-1a", content=Content("consume_no_local")) - channel.basic_publish(routing_key="test-queue-1b", content=Content("consume_no_local")) - - #check the queues of the two consumers - excluded = self.client.queue("local_excluded") - included = self.client.queue("local_included") - msg = included.get(timeout=1) - self.assertEqual("consume_no_local", msg.content.body) - try: - excluded.get(timeout=1) - self.fail("Received locally published message though no_local=true") - except Empty: None - - - def test_consume_exclusive(self): - """ - Test that the exclusive flag is honoured in the consume method - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-2", exclusive=True) - - #check that an exclusive consumer prevents other consumer being created: - channel.basic_consume(consumer_tag="first", queue="test-queue-2", exclusive=True) - try: - channel.basic_consume(consumer_tag="second", queue="test-queue-2") - self.fail("Expected consume request to fail due to previous exclusive consumer") - except Closed, e: - self.assertChannelException(403, e.args[0]) - - #open new channel and cleanup last consumer: - channel = self.client.channel(2) - channel.channel_open() - - #check that an exclusive consumer cannot be created if a consumer already exists: - channel.basic_consume(consumer_tag="first", queue="test-queue-2") - try: - channel.basic_consume(consumer_tag="second", queue="test-queue-2", exclusive=True) - self.fail("Expected exclusive consume request to fail due to previous consumer") - except Closed, e: - self.assertChannelException(403, e.args[0]) - - def test_consume_queue_errors(self): - """ - Test error conditions associated with the queue field of the consume method: - """ - channel = self.channel - try: - #queue specified but doesn't exist: - channel.basic_consume(queue="invalid-queue") - self.fail("Expected failure when consuming from non-existent queue") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - channel = self.client.channel(2) - channel.channel_open() - try: - #queue not specified and none previously declared for channel: - channel.basic_consume(queue="") - self.fail("Expected failure when consuming from unspecified queue") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - - def test_consume_unique_consumers(self): - """ - Ensure unique consumer tags are enforced - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-3", exclusive=True) - - #check that attempts to use duplicate tags are detected and prevented: - channel.basic_consume(consumer_tag="first", queue="test-queue-3") - try: - channel.basic_consume(consumer_tag="first", queue="test-queue-3") - self.fail("Expected consume request to fail due to non-unique tag") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - - def test_cancel(self): - """ - Test compliance of the basic.cancel method - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-4", exclusive=True) - channel.basic_consume(consumer_tag="my-consumer", queue="test-queue-4") - channel.basic_publish(routing_key="test-queue-4", content=Content("One")) - - myqueue = self.client.queue("my-consumer") - msg = myqueue.get(timeout=1) - self.assertEqual("One", msg.content.body) - - #cancel should stop messages being delivered - channel.basic_cancel(consumer_tag="my-consumer") - channel.basic_publish(routing_key="test-queue-4", content=Content("Two")) - try: - msg = myqueue.get(timeout=1) - self.fail("Got message after cancellation: " + msg) - except Empty: None - - #cancellation of non-existant consumers should be handled without error - channel.basic_cancel(consumer_tag="my-consumer") - channel.basic_cancel(consumer_tag="this-never-existed") - - - def test_ack(self): - """ - Test basic ack/recover behaviour - """ - channel = self.channel - channel.queue_declare(queue="test-ack-queue", exclusive=True) - - reply = channel.basic_consume(queue="test-ack-queue", no_ack=False) - queue = self.client.queue(reply.consumer_tag) - - channel.basic_publish(routing_key="test-ack-queue", content=Content("One")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Two")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Three")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Four")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Five")) - - msg1 = queue.get(timeout=1) - msg2 = queue.get(timeout=1) - msg3 = queue.get(timeout=1) - msg4 = queue.get(timeout=1) - msg5 = queue.get(timeout=1) - - self.assertEqual("One", msg1.content.body) - self.assertEqual("Two", msg2.content.body) - self.assertEqual("Three", msg3.content.body) - self.assertEqual("Four", msg4.content.body) - self.assertEqual("Five", msg5.content.body) - - channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two - channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four - - channel.basic_recover(requeue=False) - - msg3b = queue.get(timeout=1) - msg5b = queue.get(timeout=1) - - self.assertEqual("Three", msg3b.content.body) - self.assertEqual("Five", msg5b.content.body) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) - except Empty: None - - def test_recover_requeue(self): - """ - Test requeing on recovery - """ - channel = self.channel - channel.queue_declare(queue="test-requeue", exclusive=True) - - subscription = channel.basic_consume(queue="test-requeue", no_ack=False) - queue = self.client.queue(subscription.consumer_tag) - - channel.basic_publish(routing_key="test-requeue", content=Content("One")) - channel.basic_publish(routing_key="test-requeue", content=Content("Two")) - channel.basic_publish(routing_key="test-requeue", content=Content("Three")) - channel.basic_publish(routing_key="test-requeue", content=Content("Four")) - channel.basic_publish(routing_key="test-requeue", content=Content("Five")) - - msg1 = queue.get(timeout=1) - msg2 = queue.get(timeout=1) - msg3 = queue.get(timeout=1) - msg4 = queue.get(timeout=1) - msg5 = queue.get(timeout=1) - - self.assertEqual("One", msg1.content.body) - self.assertEqual("Two", msg2.content.body) - self.assertEqual("Three", msg3.content.body) - self.assertEqual("Four", msg4.content.body) - self.assertEqual("Five", msg5.content.body) - - channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two - channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four - - channel.basic_cancel(consumer_tag=subscription.consumer_tag) - - channel.basic_recover(requeue=True) - - subscription2 = channel.basic_consume(queue="test-requeue") - queue2 = self.client.queue(subscription2.consumer_tag) - - msg3b = queue2.get(timeout=1) - msg5b = queue2.get(timeout=1) - - self.assertEqual("Three", msg3b.content.body) - self.assertEqual("Five", msg5b.content.body) - - self.assertEqual(True, msg3b.redelivered) - self.assertEqual(True, msg5b.redelivered) - - try: - extra = queue2.get(timeout=1) - self.fail("Got unexpected message in second queue: " + extra.content.body) - except Empty: None - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message in original queue: " + extra.content.body) - except Empty: None - - - def test_qos_prefetch_count(self): - """ - Test that the prefetch count specified is honoured - """ - #setup: declare queue and subscribe - channel = self.channel - channel.queue_declare(queue="test-prefetch-count", exclusive=True) - subscription = channel.basic_consume(queue="test-prefetch-count", no_ack=False) - queue = self.client.queue(subscription.consumer_tag) - - #set prefetch to 5: - channel.basic_qos(prefetch_count=5) - - #publish 10 messages: - for i in range(1, 11): - channel.basic_publish(routing_key="test-prefetch-count", content=Content("Message %d" % i)) - - #only 5 messages should have been delivered: - for i in range(1, 6): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 6th message in original queue: " + extra.content.body) - except Empty: None - - #ack messages and check that the next set arrive ok: - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - for i in range(6, 11): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 11th message in original queue: " + extra.content.body) - except Empty: None - - - - def test_qos_prefetch_size(self): - """ - Test that the prefetch size specified is honoured - """ - #setup: declare queue and subscribe - channel = self.channel - channel.queue_declare(queue="test-prefetch-size", exclusive=True) - subscription = channel.basic_consume(queue="test-prefetch-size", no_ack=False) - queue = self.client.queue(subscription.consumer_tag) - - #set prefetch to 50 bytes (each message is 9 or 10 bytes): - channel.basic_qos(prefetch_size=50) - - #publish 10 messages: - for i in range(1, 11): - channel.basic_publish(routing_key="test-prefetch-size", content=Content("Message %d" % i)) - - #only 5 messages should have been delivered (i.e. 45 bytes worth): - for i in range(1, 6): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 6th message in original queue: " + extra.content.body) - except Empty: None - - #ack messages and check that the next set arrive ok: - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - for i in range(6, 11): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 11th message in original queue: " + extra.content.body) - except Empty: None - - #make sure that a single oversized message still gets delivered - large = "abcdefghijklmnopqrstuvwxyz" - large = large + "-" + large; - channel.basic_publish(routing_key="test-prefetch-size", content=Content(large)) - msg = queue.get(timeout=1) - self.assertEqual(large, msg.content.body) - - def test_get(self): - """ - Test basic_get method - """ - channel = self.channel - channel.queue_declare(queue="test-get", exclusive=True) - - #publish some messages (no_ack=True) - for i in range(1, 11): - channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) - - #use basic_get to read back the messages, and check that we get an empty at the end - for i in range(1, 11): - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_ok") - self.assertEqual("Message %d" % i, reply.content.body) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_empty") - - #repeat for no_ack=False - for i in range(11, 21): - channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) - - for i in range(11, 21): - reply = channel.basic_get(no_ack=False) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_ok") - self.assertEqual("Message %d" % i, reply.content.body) - if(i == 13): - channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True) - if(i in [15, 17, 19]): - channel.basic_ack(delivery_tag=reply.delivery_tag) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_empty") - - #recover(requeue=True) - channel.basic_recover(requeue=True) - - #get the unacked messages again (14, 16, 18, 20) - for i in [14, 16, 18, 20]: - reply = channel.basic_get(no_ack=False) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_ok") - self.assertEqual("Message %d" % i, reply.content.body) - channel.basic_ack(delivery_tag=reply.delivery_tag) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_empty") - - channel.basic_recover(requeue=True) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_empty") diff --git a/python/tests_0-8/broker.py b/python/tests_0-8/broker.py deleted file mode 100644 index 7f3fe7530e..0000000000 --- a/python/tests_0-8/broker.py +++ /dev/null @@ -1,120 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import TestBase - -class BrokerTests(TestBase): - """Tests for basic Broker functionality""" - - def test_ack_and_no_ack(self): - """ - First, this test tries to receive a message with a no-ack - consumer. Second, this test tries to explicitly receive and - acknowledge a message with an acknowledging consumer. - """ - ch = self.channel - self.queue_declare(ch, queue = "myqueue") - - # No ack consumer - ctag = ch.basic_consume(queue = "myqueue", no_ack = True).consumer_tag - body = "test no-ack" - ch.basic_publish(routing_key = "myqueue", content = Content(body)) - msg = self.client.queue(ctag).get(timeout = 5) - self.assert_(msg.content.body == body) - - # Acknowledging consumer - self.queue_declare(ch, queue = "otherqueue") - ctag = ch.basic_consume(queue = "otherqueue", no_ack = False).consumer_tag - body = "test ack" - ch.basic_publish(routing_key = "otherqueue", content = Content(body)) - msg = self.client.queue(ctag).get(timeout = 5) - ch.basic_ack(delivery_tag = msg.delivery_tag) - self.assert_(msg.content.body == body) - - def test_basic_delivery_immediate(self): - """ - Test basic message delivery where consume is issued before publish - """ - channel = self.channel - self.exchange_declare(channel, exchange="test-exchange", type="direct") - self.queue_declare(channel, queue="test-queue") - channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") - reply = channel.basic_consume(queue="test-queue", no_ack=True) - queue = self.client.queue(reply.consumer_tag) - - body = "Immediate Delivery" - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body), immediate=True) - msg = queue.get(timeout=5) - self.assert_(msg.content.body == body) - - # TODO: Ensure we fail if immediate=True and there's no consumer. - - - def test_basic_delivery_queued(self): - """ - Test basic message delivery where publish is issued before consume - (i.e. requires queueing of the message) - """ - channel = self.channel - self.exchange_declare(channel, exchange="test-exchange", type="direct") - self.queue_declare(channel, queue="test-queue") - channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") - body = "Queued Delivery" - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body)) - reply = channel.basic_consume(queue="test-queue", no_ack=True) - queue = self.client.queue(reply.consumer_tag) - msg = queue.get(timeout=5) - self.assert_(msg.content.body == body) - - def test_invalid_channel(self): - channel = self.client.channel(200) - try: - channel.queue_declare(exclusive=True) - self.fail("Expected error on queue_declare for invalid channel") - except Closed, e: - self.assertConnectionException(504, e.args[0]) - - def test_closed_channel(self): - channel = self.client.channel(200) - channel.channel_open() - channel.channel_close() - try: - channel.queue_declare(exclusive=True) - self.fail("Expected error on queue_declare for closed channel") - except Closed, e: - self.assertConnectionException(504, e.args[0]) - - def test_channel_flow(self): - channel = self.channel - channel.queue_declare(queue="flow_test_queue", exclusive=True) - ctag = channel.basic_consume(queue="flow_test_queue", no_ack=True).consumer_tag - incoming = self.client.queue(ctag) - - channel.channel_flow(active=False) - channel.basic_publish(routing_key="flow_test_queue", content=Content("abcdefghijklmnopqrstuvwxyz")) - try: - incoming.get(timeout=1) - self.fail("Received message when flow turned off.") - except Empty: None - - channel.channel_flow(active=True) - msg = incoming.get(timeout=1) - self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.content.body) diff --git a/python/tests_0-8/example.py b/python/tests_0-8/example.py deleted file mode 100644 index d82bad1f61..0000000000 --- a/python/tests_0-8/example.py +++ /dev/null @@ -1,94 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from qpid.content import Content -from qpid.testlib import TestBase - -class ExampleTest (TestBase): - """ - An example Qpid test, illustrating the unittest frameowkr and the - python Qpid client. The test class must inherit TestCase. The - test code uses the Qpid client to interact with a qpid broker and - verify it behaves as expected. - """ - - def test_example(self): - """ - An example test. Note that test functions must start with 'test_' - to be recognized by the test framework. - """ - - # By inheriting TestBase, self.client is automatically connected - # and self.channel is automatically opened as channel(1) - # Other channel methods mimic the protocol. - channel = self.channel - - # Now we can send regular commands. If you want to see what the method - # arguments mean or what other commands are available, you can use the - # python builtin help() method. For example: - #help(chan) - #help(chan.exchange_declare) - - # If you want browse the available protocol methods without being - # connected to a live server you can use the amqp-doc utility: - # - # Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>] - # - # Options: - # -e, --regexp use regex instead of glob when matching - - # Now that we know what commands are available we can use them to - # interact with the server. - - # Here we use ordinal arguments. - self.exchange_declare(channel, 0, "test", "direct") - - # Here we use keyword arguments. - self.queue_declare(channel, queue="test-queue") - channel.queue_bind(queue="test-queue", exchange="test", routing_key="key") - - # Call Channel.basic_consume to register as a consumer. - # All the protocol methods return a message object. The message object - # has fields corresponding to the reply method fields, plus a content - # field that is filled if the reply includes content. In this case the - # interesting field is the consumer_tag. - reply = channel.basic_consume(queue="test-queue") - - # We can use the Client.queue(...) method to access the queue - # corresponding to our consumer_tag. - queue = self.client.queue(reply.consumer_tag) - - # Now lets publish a message and see if our consumer gets it. To do - # this we need to import the Content class. - body = "Hello World!" - channel.basic_publish(exchange="test", - routing_key="key", - content=Content(body)) - - # Now we'll wait for the message to arrive. We can use the timeout - # argument in case the server hangs. By default queue.get() will wait - # until a message arrives or the connection to the server dies. - msg = queue.get(timeout=10) - - # And check that we got the right response with assertEqual - self.assertEqual(body, msg.content.body) - - # Now acknowledge the message. - channel.basic_ack(msg.delivery_tag, True) - diff --git a/python/tests_0-8/exchange.py b/python/tests_0-8/exchange.py deleted file mode 100644 index 56d6fa82e4..0000000000 --- a/python/tests_0-8/exchange.py +++ /dev/null @@ -1,327 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -Tests for exchange behaviour. - -Test classes ending in 'RuleTests' are derived from rules in amqp.xml. -""" - -import Queue, logging -from qpid.testlib import TestBase -from qpid.content import Content -from qpid.client import Closed - - -class StandardExchangeVerifier: - """Verifies standard exchange behavior. - - Used as base class for classes that test standard exchanges.""" - - def verifyDirectExchange(self, ex): - """Verify that ex behaves like a direct exchange.""" - self.queue_declare(queue="q") - self.channel.queue_bind(queue="q", exchange=ex, routing_key="k") - self.assertPublishConsume(exchange=ex, queue="q", routing_key="k") - try: - self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk") - self.fail("Expected Empty exception") - except Queue.Empty: None # Expected - - def verifyFanOutExchange(self, ex): - """Verify that ex behaves like a fanout exchange.""" - self.queue_declare(queue="q") - self.channel.queue_bind(queue="q", exchange=ex) - self.queue_declare(queue="p") - self.channel.queue_bind(queue="p", exchange=ex) - for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex) - - def verifyTopicExchange(self, ex): - """Verify that ex behaves like a topic exchange""" - self.queue_declare(queue="a") - self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*") - q = self.consume("a") - self.assertPublishGet(q, ex, "a.b.x") - self.assertPublishGet(q, ex, "a.x.b.x") - self.assertPublishGet(q, ex, "a.x.x.b.x") - # Shouldn't match - self.channel.basic_publish(exchange=ex, routing_key="a.b") - self.channel.basic_publish(exchange=ex, routing_key="a.b.x.y") - self.channel.basic_publish(exchange=ex, routing_key="x.a.b.x") - self.channel.basic_publish(exchange=ex, routing_key="a.b") - self.assert_(q.empty()) - - def verifyHeadersExchange(self, ex): - """Verify that ex is a headers exchange""" - self.queue_declare(queue="q") - self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} ) - q = self.consume("q") - headers = {"name":"fred", "age":3} - self.assertPublishGet(q, exchange=ex, properties={'headers':headers}) - self.channel.basic_publish(exchange=ex) # No headers, won't deliver - self.assertEmpty(q); - - -class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier): - """ - The server SHOULD implement these standard exchange types: topic, headers. - - Client attempts to declare an exchange with each of these standard types. - """ - - def testDirect(self): - """Declare and test a direct exchange""" - self.exchange_declare(0, exchange="d", type="direct") - self.verifyDirectExchange("d") - - def testFanout(self): - """Declare and test a fanout exchange""" - self.exchange_declare(0, exchange="f", type="fanout") - self.verifyFanOutExchange("f") - - def testTopic(self): - """Declare and test a topic exchange""" - self.exchange_declare(0, exchange="t", type="topic") - self.verifyTopicExchange("t") - - def testHeaders(self): - """Declare and test a headers exchange""" - self.exchange_declare(0, exchange="h", type="headers") - self.verifyHeadersExchange("h") - - -class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier): - """ - The server MUST, in each virtual host, pre-declare an exchange instance - for each standard exchange type that it implements, where the name of the - exchange instance is amq. followed by the exchange type name. - - Client creates a temporary queue and attempts to bind to each required - exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if - those types are defined). - """ - def testAmqDirect(self): self.verifyDirectExchange("amq.direct") - - def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout") - - def testAmqTopic(self): self.verifyTopicExchange("amq.topic") - - def testAmqMatch(self): self.verifyHeadersExchange("amq.match") - -class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier): - """ - The server MUST predeclare a direct exchange to act as the default exchange - for content Publish methods and for default queue bindings. - - Client checks that the default exchange is active by specifying a queue - binding with no exchange name, and publishing a message with a suitable - routing key but without specifying the exchange name, then ensuring that - the message arrives in the queue correctly. - """ - def testDefaultExchange(self): - # Test automatic binding by queue name. - self.queue_declare(queue="d") - self.assertPublishConsume(queue="d", routing_key="d") - # Test explicit bind to default queue - self.verifyDirectExchange("") - - -# TODO aconway 2006-09-27: Fill in empty tests: - -class DefaultAccessRuleTests(TestBase): - """ - The server MUST NOT allow clients to access the default exchange except - by specifying an empty exchange name in the Queue.Bind and content Publish - methods. - """ - -class ExtensionsRuleTests(TestBase): - """ - The server MAY implement other exchange types as wanted. - """ - - -class DeclareMethodMinimumRuleTests(TestBase): - """ - The server SHOULD support a minimum of 16 exchanges per virtual host and - ideally, impose no limit except as defined by available resources. - - The client creates as many exchanges as it can until the server reports - an error; the number of exchanges successfuly created must be at least - sixteen. - """ - - -class DeclareMethodTicketFieldValidityRuleTests(TestBase): - """ - The client MUST provide a valid access ticket giving "active" access to - the realm in which the exchange exists or will be created, or "passive" - access if the if-exists flag is set. - - Client creates access ticket with wrong access rights and attempts to use - in this method. - """ - - -class DeclareMethodExchangeFieldReservedRuleTests(TestBase): - """ - Exchange names starting with "amq." are reserved for predeclared and - standardised exchanges. The client MUST NOT attempt to create an exchange - starting with "amq.". - - - """ - - -class DeclareMethodTypeFieldTypedRuleTests(TestBase): - """ - Exchanges cannot be redeclared with different types. The client MUST not - attempt to redeclare an existing exchange with a different type than used - in the original Exchange.Declare method. - - - """ - - -class DeclareMethodTypeFieldSupportRuleTests(TestBase): - """ - The client MUST NOT attempt to create an exchange with a type that the - server does not support. - - - """ - - -class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase): - """ - If set, and the exchange does not already exist, the server MUST raise a - channel exception with reply code 404 (not found). - """ - def test(self): - try: - self.channel.exchange_declare(exchange="humpty_dumpty", passive=True) - self.fail("Expected 404 for passive declaration of unknown exchange.") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - -class DeclareMethodDurableFieldSupportRuleTests(TestBase): - """ - The server MUST support both durable and transient exchanges. - - - """ - - -class DeclareMethodDurableFieldStickyRuleTests(TestBase): - """ - The server MUST ignore the durable field if the exchange already exists. - - - """ - - -class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase): - """ - The server MUST ignore the auto-delete field if the exchange already - exists. - - - """ - - -class DeleteMethodTicketFieldValidityRuleTests(TestBase): - """ - The client MUST provide a valid access ticket giving "active" access - rights to the exchange's access realm. - - Client creates access ticket with wrong access rights and attempts to use - in this method. - """ - - -class DeleteMethodExchangeFieldExistsRuleTests(TestBase): - """ - The client MUST NOT attempt to delete an exchange that does not exist. - """ - - -class HeadersExchangeTests(TestBase): - """ - Tests for headers exchange functionality. - """ - def setUp(self): - TestBase.setUp(self) - self.queue_declare(queue="q") - self.q = self.consume("q") - - def myAssertPublishGet(self, headers): - self.assertPublishGet(self.q, exchange="amq.match", properties={'headers':headers}) - - def myBasicPublish(self, headers): - self.channel.basic_publish(exchange="amq.match", content=Content("foobar", properties={'headers':headers})) - - def testMatchAll(self): - self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3}) - self.myAssertPublishGet({"name":"fred", "age":3}) - self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"}) - - # None of these should match - self.myBasicPublish({}) - self.myBasicPublish({"name":"barney"}) - self.myBasicPublish({"name":10}) - self.myBasicPublish({"name":"fred", "age":2}) - self.assertEmpty(self.q) - - def testMatchAny(self): - self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3}) - self.myAssertPublishGet({"name":"fred"}) - self.myAssertPublishGet({"name":"fred", "ignoreme":10}) - self.myAssertPublishGet({"ignoreme":10, "age":3}) - - # Wont match - self.myBasicPublish({}) - self.myBasicPublish({"irrelevant":0}) - self.assertEmpty(self.q) - - -class MiscellaneousErrorsTests(TestBase): - """ - Test some miscellaneous error conditions - """ - def testTypeNotKnown(self): - try: - self.channel.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type") - self.fail("Expected 503 for declaration of unknown exchange type.") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def testDifferentDeclaredType(self): - self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="direct") - try: - self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic") - self.fail("Expected 530 for redeclaration of exchange with different type.") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - #cleanup - other = self.connect() - c2 = other.channel(1) - c2.channel_open() - c2.exchange_delete(exchange="test_different_declared_type_exchange") - diff --git a/python/tests_0-8/queue.py b/python/tests_0-8/queue.py deleted file mode 100644 index b7a41736ab..0000000000 --- a/python/tests_0-8/queue.py +++ /dev/null @@ -1,255 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import TestBase - -class QueueTests(TestBase): - """Tests for 'methods' on the amqp queue 'class'""" - - def test_purge(self): - """ - Test that the purge method removes messages from the queue - """ - channel = self.channel - #setup, declare a queue and add some messages to it: - channel.exchange_declare(exchange="test-exchange", type="direct") - channel.queue_declare(queue="test-queue", exclusive=True) - channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("one")) - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("two")) - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("three")) - - #check that the queue now reports 3 messages: - reply = channel.queue_declare(queue="test-queue") - self.assertEqual(3, reply.message_count) - - #now do the purge, then test that three messages are purged and the count drops to 0 - reply = channel.queue_purge(queue="test-queue"); - self.assertEqual(3, reply.message_count) - reply = channel.queue_declare(queue="test-queue") - self.assertEqual(0, reply.message_count) - - #send a further message and consume it, ensuring that the other messages are really gone - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("four")) - reply = channel.basic_consume(queue="test-queue", no_ack=True) - queue = self.client.queue(reply.consumer_tag) - msg = queue.get(timeout=1) - self.assertEqual("four", msg.content.body) - - #check error conditions (use new channels): - channel = self.client.channel(2) - channel.channel_open() - try: - #queue specified but doesn't exist: - channel.queue_purge(queue="invalid-queue") - self.fail("Expected failure when purging non-existent queue") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - channel = self.client.channel(3) - channel.channel_open() - try: - #queue not specified and none previously declared for channel: - channel.queue_purge() - self.fail("Expected failure when purging unspecified queue") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - - #cleanup - other = self.connect() - channel = other.channel(1) - channel.channel_open() - channel.exchange_delete(exchange="test-exchange") - - def test_declare_exclusive(self): - """ - Test that the exclusive field is honoured in queue.declare - """ - # TestBase.setUp has already opened channel(1) - c1 = self.channel - # Here we open a second separate connection: - other = self.connect() - c2 = other.channel(1) - c2.channel_open() - - #declare an exclusive queue: - c1.queue_declare(queue="exclusive-queue", exclusive="True") - try: - #other connection should not be allowed to declare this: - c2.queue_declare(queue="exclusive-queue", exclusive="True") - self.fail("Expected second exclusive queue_declare to raise a channel exception") - except Closed, e: - self.assertChannelException(405, e.args[0]) - - - def test_declare_passive(self): - """ - Test that the passive field is honoured in queue.declare - """ - channel = self.channel - #declare an exclusive queue: - channel.queue_declare(queue="passive-queue-1", exclusive="True") - channel.queue_declare(queue="passive-queue-1", passive="True") - try: - #other connection should not be allowed to declare this: - channel.queue_declare(queue="passive-queue-2", passive="True") - self.fail("Expected passive declaration of non-existant queue to raise a channel exception") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - - def test_bind(self): - """ - Test various permutations of the queue.bind method - """ - channel = self.channel - channel.queue_declare(queue="queue-1", exclusive="True") - - #straightforward case, both exchange & queue exist so no errors expected: - channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1") - - #bind the default queue for the channel (i.e. last one declared): - channel.queue_bind(exchange="amq.direct", routing_key="key2") - - #use the queue name where neither routing key nor queue are specified: - channel.queue_bind(exchange="amq.direct") - - #try and bind to non-existant exchange - try: - channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1") - self.fail("Expected bind to non-existant exchange to fail") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - #need to reopen a channel: - channel = self.client.channel(2) - channel.channel_open() - - #try and bind non-existant queue: - try: - channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1") - self.fail("Expected bind of non-existant queue to fail") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - - def test_delete_simple(self): - """ - Test basic queue deletion - """ - channel = self.channel - - #straight-forward case: - channel.queue_declare(queue="delete-me") - channel.basic_publish(routing_key="delete-me", content=Content("a")) - channel.basic_publish(routing_key="delete-me", content=Content("b")) - channel.basic_publish(routing_key="delete-me", content=Content("c")) - reply = channel.queue_delete(queue="delete-me") - self.assertEqual(3, reply.message_count) - #check that it has gone be declaring passively - try: - channel.queue_declare(queue="delete-me", passive="True") - self.fail("Queue has not been deleted") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - #check attempted deletion of non-existant queue is handled correctly: - channel = self.client.channel(2) - channel.channel_open() - try: - channel.queue_delete(queue="i-dont-exist", if_empty="True") - self.fail("Expected delete of non-existant queue to fail") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - - - def test_delete_ifempty(self): - """ - Test that if_empty field of queue_delete is honoured - """ - channel = self.channel - - #create a queue and add a message to it (use default binding): - channel.queue_declare(queue="delete-me-2") - channel.queue_declare(queue="delete-me-2", passive="True") - channel.basic_publish(routing_key="delete-me-2", content=Content("message")) - - #try to delete, but only if empty: - try: - channel.queue_delete(queue="delete-me-2", if_empty="True") - self.fail("Expected delete if_empty to fail for non-empty queue") - except Closed, e: - self.assertChannelException(406, e.args[0]) - - #need new channel now: - channel = self.client.channel(2) - channel.channel_open() - - #empty queue: - reply = channel.basic_consume(queue="delete-me-2", no_ack=True) - queue = self.client.queue(reply.consumer_tag) - msg = queue.get(timeout=1) - self.assertEqual("message", msg.content.body) - channel.basic_cancel(consumer_tag=reply.consumer_tag) - - #retry deletion on empty queue: - channel.queue_delete(queue="delete-me-2", if_empty="True") - - #check that it has gone by declaring passively: - try: - channel.queue_declare(queue="delete-me-2", passive="True") - self.fail("Queue has not been deleted") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - def test_delete_ifunused(self): - """ - Test that if_unused field of queue_delete is honoured - """ - channel = self.channel - - #create a queue and register a consumer: - channel.queue_declare(queue="delete-me-3") - channel.queue_declare(queue="delete-me-3", passive="True") - reply = channel.basic_consume(queue="delete-me-3", no_ack=True) - - #need new channel now: - channel2 = self.client.channel(2) - channel2.channel_open() - #try to delete, but only if empty: - try: - channel2.queue_delete(queue="delete-me-3", if_unused="True") - self.fail("Expected delete if_unused to fail for queue with existing consumer") - except Closed, e: - self.assertChannelException(406, e.args[0]) - - - channel.basic_cancel(consumer_tag=reply.consumer_tag) - channel.queue_delete(queue="delete-me-3", if_unused="True") - #check that it has gone by declaring passively: - try: - channel.queue_declare(queue="delete-me-3", passive="True") - self.fail("Queue has not been deleted") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - diff --git a/python/tests_0-8/testlib.py b/python/tests_0-8/testlib.py deleted file mode 100644 index 76f7e964a2..0000000000 --- a/python/tests_0-8/testlib.py +++ /dev/null @@ -1,66 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# -# Tests for the testlib itself. -# - -from qpid.content import Content -from qpid.testlib import TestBase -from Queue import Empty - -import sys -from traceback import * - -def mytrace(frame, event, arg): - print_stack(frame); - print "====" - return mytrace - -class TestBaseTest(TestBase): - """Verify TestBase functions work as expected""" - - def testAssertEmptyPass(self): - """Test assert empty works""" - self.queue_declare(queue="empty") - q = self.consume("empty") - self.assertEmpty(q) - try: - q.get(timeout=1) - self.fail("Queue is not empty.") - except Empty: None # Ignore - - def testAssertEmptyFail(self): - self.queue_declare(queue="full") - q = self.consume("full") - self.channel.basic_publish(routing_key="full") - try: - self.assertEmpty(q); - self.fail("assertEmpty did not assert on non-empty queue") - except AssertionError: None # Ignore - - def testMessageProperties(self): - """Verify properties are passed with message""" - props={"headers":{"x":1, "y":2}} - self.queue_declare(queue="q") - q = self.consume("q") - self.assertPublishGet(q, routing_key="q", properties=props) - - - diff --git a/python/tests_0-8/tx.py b/python/tests_0-8/tx.py deleted file mode 100644 index 9faddb1110..0000000000 --- a/python/tests_0-8/tx.py +++ /dev/null @@ -1,209 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import TestBase - -class TxTests(TestBase): - """ - Tests for 'methods' on the amqp tx 'class' - """ - - def test_commit(self): - """ - Test that commited publishes are delivered and commited acks are not re-delivered - """ - channel = self.channel - queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-commit-a", "tx-commit-b", "tx-commit-c") - channel.tx_commit() - - #check results - for i in range(1, 5): - msg = queue_c.get(timeout=1) - self.assertEqual("TxMessage %d" % i, msg.content.body) - - msg = queue_b.get(timeout=1) - self.assertEqual("TxMessage 6", msg.content.body) - - msg = queue_a.get(timeout=1) - self.assertEqual("TxMessage 7", msg.content.body) - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) - except Empty: None - - #cleanup - channel.basic_ack(delivery_tag=0, multiple=True) - channel.tx_commit() - - def test_auto_rollback(self): - """ - Test that a channel closed with an open transaction is effectively rolled back - """ - channel = self.channel - queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c") - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) - except Empty: None - - channel.tx_rollback() - - #check results - for i in range(1, 5): - msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - - msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.content.body) - - msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.content.body) - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) - except Empty: None - - #cleanup - channel.basic_ack(delivery_tag=0, multiple=True) - channel.tx_commit() - - def test_rollback(self): - """ - Test that rolled back publishes are not delivered and rolled back acks are re-delivered - """ - channel = self.channel - queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c") - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) - except Empty: None - - channel.tx_rollback() - - #check results - for i in range(1, 5): - msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - - msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.content.body) - - msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.content.body) - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) - except Empty: None - - #cleanup - channel.basic_ack(delivery_tag=0, multiple=True) - channel.tx_commit() - - def perform_txn_work(self, channel, name_a, name_b, name_c): - """ - Utility method that does some setup and some work under a transaction. Used for testing both - commit and rollback - """ - #setup: - channel.queue_declare(queue=name_a, exclusive=True) - channel.queue_declare(queue=name_b, exclusive=True) - channel.queue_declare(queue=name_c, exclusive=True) - - key = "my_key_" + name_b - topic = "my_topic_" + name_c - - channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key) - channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic) - - for i in range(1, 5): - channel.basic_publish(routing_key=name_a, content=Content("Message %d" % i)) - - channel.basic_publish(routing_key=key, exchange="amq.direct", content=Content("Message 6")) - channel.basic_publish(routing_key=topic, exchange="amq.topic", content=Content("Message 7")) - - channel.tx_select() - - #consume and ack messages - sub_a = channel.basic_consume(queue=name_a, no_ack=False) - queue_a = self.client.queue(sub_a.consumer_tag) - for i in range(1, 5): - msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - sub_b = channel.basic_consume(queue=name_b, no_ack=False) - queue_b = self.client.queue(sub_b.consumer_tag) - msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.content.body) - channel.basic_ack(delivery_tag=msg.delivery_tag) - - sub_c = channel.basic_consume(queue=name_c, no_ack=False) - queue_c = self.client.queue(sub_c.consumer_tag) - msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.content.body) - channel.basic_ack(delivery_tag=msg.delivery_tag) - - #publish messages - for i in range(1, 5): - channel.basic_publish(routing_key=topic, exchange="amq.topic", content=Content("TxMessage %d" % i)) - - channel.basic_publish(routing_key=key, exchange="amq.direct", content=Content("TxMessage 6")) - channel.basic_publish(routing_key=name_a, content=Content("TxMessage 7")) - - return queue_a, queue_b, queue_c - - def test_commit_overlapping_acks(self): - """ - Test that logically 'overlapping' acks do not cause errors on commit - """ - channel = self.channel - channel.queue_declare(queue="commit-overlapping", exclusive=True) - for i in range(1, 10): - channel.basic_publish(routing_key="commit-overlapping", content=Content("Message %d" % i)) - - - channel.tx_select() - - sub = channel.basic_consume(queue="commit-overlapping", no_ack=False) - queue = self.client.queue(sub.consumer_tag) - for i in range(1, 10): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - if i in [3, 6, 10]: - channel.basic_ack(delivery_tag=msg.delivery_tag) - - channel.tx_commit() - - #check all have been acked: - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) - except Empty: None diff --git a/python/tests_0-9/__init__.py b/python/tests_0-9/__init__.py deleted file mode 100644 index d9f2ed7dbb..0000000000 --- a/python/tests_0-9/__init__.py +++ /dev/null @@ -1,22 +0,0 @@ -# Do not delete - marks this directory as a python package. - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import query, queue diff --git a/python/tests_0-9/query.py b/python/tests_0-9/query.py deleted file mode 100644 index cb66d079e5..0000000000 --- a/python/tests_0-9/query.py +++ /dev/null @@ -1,224 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import TestBase - -class QueryTests(TestBase): - """Tests for various query methods introduced in 0-10 and available in 0-9 for preview""" - - def test_exchange_query(self): - """ - Test that the exchange_query method works as expected - """ - channel = self.channel - #check returned type for the standard exchanges - self.assertEqual("direct", channel.exchange_query(name="amq.direct").type) - self.assertEqual("topic", channel.exchange_query(name="amq.topic").type) - self.assertEqual("fanout", channel.exchange_query(name="amq.fanout").type) - self.assertEqual("headers", channel.exchange_query(name="amq.match").type) - self.assertEqual("direct", channel.exchange_query(name="").type) - #declare an exchange - channel.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False) - #check that the result of a query is as expected - response = channel.exchange_query(name="my-test-exchange") - self.assertEqual("direct", response.type) - self.assertEqual(False, response.durable) - self.assertEqual(False, response.not_found) - #delete the exchange - channel.exchange_delete(exchange="my-test-exchange") - #check that the query now reports not-found - self.assertEqual(True, channel.exchange_query(name="my-test-exchange").not_found) - - def test_binding_query_direct(self): - """ - Test that the binding_query method works as expected with the direct exchange - """ - self.binding_query_with_key("amq.direct") - - def test_binding_query_topic(self): - """ - Test that the binding_query method works as expected with the direct exchange - """ - self.binding_query_with_key("amq.topic") - - def binding_query_with_key(self, exchange_name): - channel = self.channel - #setup: create two queues - channel.queue_declare(queue="used-queue", exclusive=True) - channel.queue_declare(queue="unused-queue", exclusive=True) - - channel.queue_bind(exchange=exchange_name, queue="used-queue", routing_key="used-key") - - # test detection of any binding to specific queue - response = channel.binding_query(exchange=exchange_name, queue="used-queue") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(False, response.queue_not_matched) - - # test detection of specific binding to any queue - response = channel.binding_query(exchange=exchange_name, routing_key="used-key") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(False, response.key_not_matched) - - # test detection of specific binding to specific queue - response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="used-key") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(False, response.queue_not_matched) - self.assertEqual(False, response.key_not_matched) - - # test unmatched queue, unspecified binding - response = channel.binding_query(exchange=exchange_name, queue="unused-queue") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(True, response.queue_not_matched) - - # test unspecified queue, unmatched binding - response = channel.binding_query(exchange=exchange_name, routing_key="unused-key") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(True, response.key_not_matched) - - # test matched queue, unmatched binding - response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="unused-key") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(False, response.queue_not_matched) - self.assertEqual(True, response.key_not_matched) - - # test unmatched queue, matched binding - response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="used-key") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(True, response.queue_not_matched) - self.assertEqual(False, response.key_not_matched) - - # test unmatched queue, unmatched binding - response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="unused-key") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(True, response.queue_not_matched) - self.assertEqual(True, response.key_not_matched) - - #test exchange not found - self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found) - - #test queue not found - self.assertEqual(True, channel.binding_query(exchange=exchange_name, queue="unknown-queue").queue_not_found) - - - def test_binding_query_fanout(self): - """ - Test that the binding_query method works as expected with fanout exchange - """ - channel = self.channel - #setup - channel.queue_declare(queue="used-queue", exclusive=True) - channel.queue_declare(queue="unused-queue", exclusive=True) - channel.queue_bind(exchange="amq.fanout", queue="used-queue") - - # test detection of any binding to specific queue - response = channel.binding_query(exchange="amq.fanout", queue="used-queue") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(False, response.queue_not_matched) - - # test unmatched queue, unspecified binding - response = channel.binding_query(exchange="amq.fanout", queue="unused-queue") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(True, response.queue_not_matched) - - #test exchange not found - self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found) - - #test queue not found - self.assertEqual(True, channel.binding_query(exchange="amq.fanout", queue="unknown-queue").queue_not_found) - - def test_binding_query_header(self): - """ - Test that the binding_query method works as expected with headers exchanges - """ - channel = self.channel - #setup - channel.queue_declare(queue="used-queue", exclusive=True) - channel.queue_declare(queue="unused-queue", exclusive=True) - channel.queue_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} ) - - # test detection of any binding to specific queue - response = channel.binding_query(exchange="amq.match", queue="used-queue") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(False, response.queue_not_matched) - - # test detection of specific binding to any queue - response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "a":"A"}) - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(False, response.args_not_matched) - - # test detection of specific binding to specific queue - response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"}) - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(False, response.queue_not_matched) - self.assertEqual(False, response.args_not_matched) - - # test unmatched queue, unspecified binding - response = channel.binding_query(exchange="amq.match", queue="unused-queue") - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(True, response.queue_not_matched) - - # test unspecified queue, unmatched binding - response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "b":"B"}) - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(True, response.args_not_matched) - - # test matched queue, unmatched binding - response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "b":"B"}) - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(False, response.queue_not_matched) - self.assertEqual(True, response.args_not_matched) - - # test unmatched queue, matched binding - response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "a":"A"}) - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(True, response.queue_not_matched) - self.assertEqual(False, response.args_not_matched) - - # test unmatched queue, unmatched binding - response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "b":"B"}) - self.assertEqual(False, response.exchange_not_found) - self.assertEqual(False, response.queue_not_found) - self.assertEqual(True, response.queue_not_matched) - self.assertEqual(True, response.args_not_matched) - - #test exchange not found - self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found) - - #test queue not found - self.assertEqual(True, channel.binding_query(exchange="amq.match", queue="unknown-queue").queue_not_found) - diff --git a/python/tests_0-9/queue.py b/python/tests_0-9/queue.py deleted file mode 100644 index de1153307c..0000000000 --- a/python/tests_0-9/queue.py +++ /dev/null @@ -1,111 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import TestBase - -class QueueTests(TestBase): - """Tests for 'methods' on the amqp queue 'class'""" - - def test_unbind_direct(self): - self.unbind_test(exchange="amq.direct", routing_key="key") - - def test_unbind_topic(self): - self.unbind_test(exchange="amq.topic", routing_key="key") - - def test_unbind_fanout(self): - self.unbind_test(exchange="amq.fanout") - - def test_unbind_headers(self): - self.unbind_test(exchange="amq.match", args={ "x-match":"all", "a":"b"}, headers={"a":"b"}) - - def unbind_test(self, exchange, routing_key="", args=None, headers={}): - #bind two queues and consume from them - channel = self.channel - - channel.queue_declare(queue="queue-1", exclusive="True") - channel.queue_declare(queue="queue-2", exclusive="True") - - channel.basic_consume(queue="queue-1", consumer_tag="queue-1", no_ack=True) - channel.basic_consume(queue="queue-2", consumer_tag="queue-2", no_ack=True) - - queue1 = self.client.queue("queue-1") - queue2 = self.client.queue("queue-2") - - channel.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) - channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args) - - #send a message that will match both bindings - channel.basic_publish(exchange=exchange, routing_key=routing_key, - content=Content("one", properties={"headers": headers})) - - #unbind first queue - channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) - - #send another message - channel.basic_publish(exchange=exchange, routing_key=routing_key, - content=Content("two", properties={"headers": headers})) - - #check one queue has both messages and the other has only one - self.assertEquals("one", queue1.get(timeout=1).content.body) - try: - msg = queue1.get(timeout=1) - self.fail("Got extra message: %s" % msg.body) - except Empty: pass - - self.assertEquals("one", queue2.get(timeout=1).content.body) - self.assertEquals("two", queue2.get(timeout=1).content.body) - try: - msg = queue2.get(timeout=1) - self.fail("Got extra message: " + msg) - except Empty: pass - - def test_autodelete_shared(self): - """ - Test auto-deletion (of non-exclusive queues) - """ - channel = self.channel - other = self.connect() - channel2 = other.channel(1) - channel2.channel_open() - - channel.queue_declare(queue="auto-delete-me", auto_delete=True) - - #consume from both channels - reply = channel.basic_consume(queue="auto-delete-me", no_ack=True) - channel2.basic_consume(queue="auto-delete-me", no_ack=True) - - #implicit cancel - channel2.channel_close() - - #check it is still there - channel.queue_declare(queue="auto-delete-me", passive=True) - - #explicit cancel => queue is now unused again: - channel.basic_cancel(consumer_tag=reply.consumer_tag) - - #NOTE: this assumes there is no timeout in use - - #check that it has gone be declaring passively - try: - channel.queue_declare(queue="auto-delete-me", passive=True) - self.fail("Expected queue to have been deleted") - except Closed, e: - self.assertChannelException(404, e.args[0]) |