summaryrefslogtreecommitdiff
path: root/qpid/tests
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-02-05 15:08:44 +0000
committerRafael H. Schloming <rhs@apache.org>2010-02-05 15:08:44 +0000
commit3df1424a43244ee1c347ff9163650753601c6893 (patch)
tree333c4228d660a6d3d175ddfe76403f1f37e75af1 /qpid/tests
parent5b4dc91393493685c6688d0b69334333413616fe (diff)
downloadqpid-python-3df1424a43244ee1c347ff9163650753601c6893.tar.gz
moved protocol tests from qpid/python to qpid/tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@906961 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/tests')
-rw-r--r--qpid/tests/src/py/qpid_tests/__init__.py22
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py31
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py204
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/broker.py93
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py775
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/example.py95
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py461
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/management.py467
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/message.py918
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/persistence.py68
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/query.py247
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/queue.py366
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/tx.py265
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_8/__init__.py22
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_8/basic.py396
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_8/broker.py120
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_8/example.py94
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_8/exchange.py327
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_8/queue.py255
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_8/testlib.py66
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_8/tx.py209
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py22
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_9/query.py224
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_9/queue.py111
24 files changed, 5858 insertions, 0 deletions
diff --git a/qpid/tests/src/py/qpid_tests/__init__.py b/qpid/tests/src/py/qpid_tests/__init__.py
new file mode 100644
index 0000000000..7b522f59af
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/__init__.py
@@ -0,0 +1,22 @@
+# Do not delete - marks this directory as a python package.
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import broker_0_10, broker_0_9, broker_0_8
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py b/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
new file mode 100644
index 0000000000..f9315a6f90
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
@@ -0,0 +1,31 @@
+# Do not delete - marks this directory as a python package.
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from alternate_exchange import *
+from broker import *
+from dtx import *
+from example import *
+from exchange import *
+from management import *
+from message import *
+from query import *
+from queue import *
+from tx import *
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py b/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
new file mode 100644
index 0000000000..4d8617eb8e
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
@@ -0,0 +1,204 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import traceback
+from qpid.queue import Empty
+from qpid.datatypes import Message
+from qpid.testlib import TestBase010
+from qpid.session import SessionException
+
+class AlternateExchangeTests(TestBase010):
+ """
+ Tests for the new mechanism for message returns introduced in 0-10
+ and available in 0-9 for preview
+ """
+
+ def test_unroutable(self):
+ """
+ Test that unroutable messages are delivered to the alternate-exchange if specified
+ """
+ session = self.session
+ #create an exchange with an alternate defined
+ session.exchange_declare(exchange="secondary", type="fanout")
+ session.exchange_declare(exchange="primary", type="direct", alternate_exchange="secondary")
+
+ #declare, bind (to the alternate exchange) and consume from a queue for 'returned' messages
+ session.queue_declare(queue="returns", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="returns", exchange="secondary")
+ session.message_subscribe(destination="a", queue="returns")
+ session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ returned = session.incoming("a")
+
+ #declare, bind (to the primary exchange) and consume from a queue for 'processed' messages
+ session.queue_declare(queue="processed", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="processed", exchange="primary", binding_key="my-key")
+ session.message_subscribe(destination="b", queue="processed")
+ session.message_flow(destination="b", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="b", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ processed = session.incoming("b")
+
+ #publish to the primary exchange
+ #...one message that makes it to the 'processed' queue:
+ dp=self.session.delivery_properties(routing_key="my-key")
+ session.message_transfer(destination="primary", message=Message(dp, "Good"))
+ #...and one that does not:
+ dp=self.session.delivery_properties(routing_key="unused-key")
+ session.message_transfer(destination="primary", message=Message(dp, "Bad"))
+
+ #delete the exchanges
+ session.exchange_delete(exchange="primary")
+ session.exchange_delete(exchange="secondary")
+
+ #verify behaviour
+ self.assertEqual("Good", processed.get(timeout=1).body)
+ self.assertEqual("Bad", returned.get(timeout=1).body)
+ self.assertEmpty(processed)
+ self.assertEmpty(returned)
+
+ def test_queue_delete(self):
+ """
+ Test that messages in a queue being deleted are delivered to the alternate-exchange if specified
+ """
+ session = self.session
+ #set up a 'dead letter queue':
+ session.exchange_declare(exchange="dlq", type="fanout")
+ session.queue_declare(queue="deleted", exclusive=True, auto_delete=True)
+ session.exchange_bind(exchange="dlq", queue="deleted")
+ session.message_subscribe(destination="dlq", queue="deleted")
+ session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ dlq = session.incoming("dlq")
+
+ #create a queue using the dlq as its alternate exchange:
+ session.queue_declare(queue="delete-me", alternate_exchange="dlq")
+ #send it some messages:
+ dp=self.session.delivery_properties(routing_key="delete-me")
+ session.message_transfer(message=Message(dp, "One"))
+ session.message_transfer(message=Message(dp, "Two"))
+ session.message_transfer(message=Message(dp, "Three"))
+ #delete it:
+ session.queue_delete(queue="delete-me")
+ #delete the dlq exchange:
+ session.exchange_delete(exchange="dlq")
+
+ #check the messages were delivered to the dlq:
+ self.assertEqual("One", dlq.get(timeout=1).body)
+ self.assertEqual("Two", dlq.get(timeout=1).body)
+ self.assertEqual("Three", dlq.get(timeout=1).body)
+ self.assertEmpty(dlq)
+
+ def test_delete_while_used_by_queue(self):
+ """
+ Ensure an exchange still in use as an alternate-exchange for a
+ queue can't be deleted
+ """
+ session = self.session
+ session.exchange_declare(exchange="alternate", type="fanout")
+
+ session2 = self.conn.session("alternate", 2)
+ session2.queue_declare(queue="q", alternate_exchange="alternate")
+ try:
+ session2.exchange_delete(exchange="alternate")
+ self.fail("Expected deletion of in-use alternate-exchange to fail")
+ except SessionException, e:
+ session = self.session
+ session.queue_delete(queue="q")
+ session.exchange_delete(exchange="alternate")
+ self.assertEquals(530, e.args[0].error_code)
+
+
+ def test_delete_while_used_by_exchange(self):
+ """
+ Ensure an exchange still in use as an alternate-exchange for
+ another exchange can't be deleted
+ """
+ session = self.session
+ session.exchange_declare(exchange="alternate", type="fanout")
+
+ session = self.conn.session("alternate", 2)
+ session.exchange_declare(exchange="e", type="fanout", alternate_exchange="alternate")
+ try:
+ session.exchange_delete(exchange="alternate")
+ self.fail("Expected deletion of in-use alternate-exchange to fail")
+ except SessionException, e:
+ session = self.session
+ session.exchange_delete(exchange="e")
+ session.exchange_delete(exchange="alternate")
+ self.assertEquals(530, e.args[0].error_code)
+
+
+ def test_modify_existing_exchange_alternate(self):
+ """
+ Ensure that attempting to modify an exhange to change
+ the alternate throws an exception
+ """
+ session = self.session
+ session.exchange_declare(exchange="alt1", type="direct")
+ session.exchange_declare(exchange="alt2", type="direct")
+ session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt1")
+ try:
+ # attempt to change the alternate on an already existing exchange
+ session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt2")
+ self.fail("Expected changing an alternate on an existing exchange to fail")
+ except SessionException, e:
+ self.assertEquals(530, e.args[0].error_code)
+ session = self.conn.session("alternate", 2)
+ session.exchange_delete(exchange="onealternate")
+ session.exchange_delete(exchange="alt2")
+ session.exchange_delete(exchange="alt1")
+
+
+ def test_add_alternate_to_exchange(self):
+ """
+ Ensure that attempting to modify an exhange by adding
+ an alternate throws an exception
+ """
+ session = self.session
+ session.exchange_declare(exchange="alt1", type="direct")
+ session.exchange_declare(exchange="noalternate", type="fanout")
+ try:
+ # attempt to add an alternate on an already existing exchange
+ session.exchange_declare(exchange="noalternate", type="fanout", alternate_exchange="alt1")
+ self.fail("Expected adding an alternate on an existing exchange to fail")
+ except SessionException, e:
+ self.assertEquals(530, e.args[0].error_code)
+ session = self.conn.session("alternate", 2)
+ session.exchange_delete(exchange="noalternate")
+ session.exchange_delete(exchange="alt1")
+
+
+ def test_del_alternate_to_exchange(self):
+ """
+ Ensure that attempting to modify an exhange by declaring
+ it again without an alternate does nothing
+ """
+ session = self.session
+ session.exchange_declare(exchange="alt1", type="direct")
+ session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt1")
+ # attempt to re-declare without an alternate - silently ignore
+ session.exchange_declare(exchange="onealternate", type="fanout" )
+ session.exchange_delete(exchange="onealternate")
+ session.exchange_delete(exchange="alt1")
+
+
+ def assertEmpty(self, queue):
+ try:
+ msg = queue.get(timeout=1)
+ self.fail("Queue not empty: " + msg)
+ except Empty: None
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/broker.py b/qpid/tests/src/py/qpid_tests/broker_0_10/broker.py
new file mode 100644
index 0000000000..81d723e322
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/broker.py
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Closed
+from qpid.queue import Empty
+from qpid.testlib import TestBase010
+from qpid.datatypes import Message, RangedSet
+
+class BrokerTests(TestBase010):
+ """Tests for basic Broker functionality"""
+
+ def test_ack_and_no_ack(self):
+ """
+ First, this test tries to receive a message with a no-ack
+ consumer. Second, this test tries to explicitly receive and
+ acknowledge a message with an acknowledging consumer.
+ """
+ session = self.session
+ session.queue_declare(queue = "myqueue", exclusive=True, auto_delete=True)
+
+ # No ack consumer
+ ctag = "tag1"
+ session.message_subscribe(queue = "myqueue", destination = ctag)
+ session.message_flow(destination=ctag, unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination=ctag, unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ body = "test no-ack"
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="myqueue"), body))
+ msg = session.incoming(ctag).get(timeout = 5)
+ self.assert_(msg.body == body)
+
+ # Acknowledging consumer
+ session.queue_declare(queue = "otherqueue", exclusive=True, auto_delete=True)
+ ctag = "tag2"
+ session.message_subscribe(queue = "otherqueue", destination = ctag, accept_mode = 1)
+ session.message_flow(destination=ctag, unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination=ctag, unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ body = "test ack"
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="otherqueue"), body))
+ msg = session.incoming(ctag).get(timeout = 5)
+ session.message_accept(RangedSet(msg.id))
+ self.assert_(msg.body == body)
+
+ def test_simple_delivery_immediate(self):
+ """
+ Test simple message delivery where consume is issued before publish
+ """
+ session = self.session
+ session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="test-queue", exchange="amq.fanout")
+ consumer_tag = "tag1"
+ session.message_subscribe(queue="test-queue", destination=consumer_tag)
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = consumer_tag)
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag)
+ queue = session.incoming(consumer_tag)
+
+ body = "Immediate Delivery"
+ session.message_transfer("amq.fanout", None, None, Message(body))
+ msg = queue.get(timeout=5)
+ self.assert_(msg.body == body)
+
+ def test_simple_delivery_queued(self):
+ """
+ Test basic message delivery where publish is issued before consume
+ (i.e. requires queueing of the message)
+ """
+ session = self.session
+ session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="test-queue", exchange="amq.fanout")
+ body = "Queued Delivery"
+ session.message_transfer("amq.fanout", None, None, Message(body))
+
+ consumer_tag = "tag1"
+ session.message_subscribe(queue="test-queue", destination=consumer_tag)
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = consumer_tag)
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag)
+ queue = session.incoming(consumer_tag)
+ msg = queue.get(timeout=5)
+ self.assert_(msg.body == body)
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py b/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py
new file mode 100644
index 0000000000..2823385a3b
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py
@@ -0,0 +1,775 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.datatypes import Message, RangedSet
+from qpid.session import SessionException
+from qpid.testlib import TestBase010
+from qpid.compat import set
+from struct import pack, unpack
+from time import sleep
+
+class DtxTests(TestBase010):
+ """
+ Tests for the amqp dtx related classes.
+
+ Tests of the form test_simple_xxx test the basic transactional
+ behaviour. The approach here is to 'swap' a message from one queue
+ to another by consuming and re-publishing in the same
+ transaction. That transaction is then completed in different ways
+ and the appropriate result verified.
+
+ The other tests enforce more specific rules and behaviour on a
+ per-method or per-field basis.
+ """
+
+ XA_RBROLLBACK = 1
+ XA_RBTIMEOUT = 2
+ XA_OK = 0
+ tx_counter = 0
+
+ def reset_channel(self):
+ self.session.close()
+ self.session = self.conn.session("dtx-session", 1)
+
+ def test_simple_commit(self):
+ """
+ Test basic one-phase commit behaviour.
+ """
+ guard = self.keepQueuesAlive(["queue-a", "queue-b"])
+ session = self.session
+ tx = self.xid("my-xid")
+ self.txswap(tx, "commit")
+
+ #neither queue should have any messages accessible
+ self.assertMessageCount(0, "queue-a")
+ self.assertMessageCount(0, "queue-b")
+
+ #commit
+ self.assertEqual(self.XA_OK, session.dtx_commit(xid=tx, one_phase=True).status)
+
+ #should close and reopen session to ensure no unacked messages are held
+ self.reset_channel()
+
+ #check result
+ self.assertMessageCount(0, "queue-a")
+ self.assertMessageCount(1, "queue-b")
+ self.assertMessageId("commit", "queue-b")
+
+ def test_simple_prepare_commit(self):
+ """
+ Test basic two-phase commit behaviour.
+ """
+ guard = self.keepQueuesAlive(["queue-a", "queue-b"])
+ session = self.session
+ tx = self.xid("my-xid")
+ self.txswap(tx, "prepare-commit")
+
+ #prepare
+ self.assertEqual(self.XA_OK, session.dtx_prepare(xid=tx).status)
+
+ #neither queue should have any messages accessible
+ self.assertMessageCount(0, "queue-a")
+ self.assertMessageCount(0, "queue-b")
+
+ #commit
+ self.assertEqual(self.XA_OK, session.dtx_commit(xid=tx, one_phase=False).status)
+
+ self.reset_channel()
+
+ #check result
+ self.assertMessageCount(0, "queue-a")
+ self.assertMessageCount(1, "queue-b")
+ self.assertMessageId("prepare-commit", "queue-b")
+
+
+ def test_simple_rollback(self):
+ """
+ Test basic rollback behaviour.
+ """
+ guard = self.keepQueuesAlive(["queue-a", "queue-b"])
+ session = self.session
+ tx = self.xid("my-xid")
+ self.txswap(tx, "rollback")
+
+ #neither queue should have any messages accessible
+ self.assertMessageCount(0, "queue-a")
+ self.assertMessageCount(0, "queue-b")
+
+ #rollback
+ self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)
+
+ self.reset_channel()
+
+ #check result
+ self.assertMessageCount(1, "queue-a")
+ self.assertMessageCount(0, "queue-b")
+ self.assertMessageId("rollback", "queue-a")
+
+ def test_simple_prepare_rollback(self):
+ """
+ Test basic rollback behaviour after the transaction has been prepared.
+ """
+ guard = self.keepQueuesAlive(["queue-a", "queue-b"])
+ session = self.session
+ tx = self.xid("my-xid")
+ self.txswap(tx, "prepare-rollback")
+
+ #prepare
+ self.assertEqual(self.XA_OK, session.dtx_prepare(xid=tx).status)
+
+ #neither queue should have any messages accessible
+ self.assertMessageCount(0, "queue-a")
+ self.assertMessageCount(0, "queue-b")
+
+ #rollback
+ self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)
+
+ self.reset_channel()
+
+ #check result
+ self.assertMessageCount(1, "queue-a")
+ self.assertMessageCount(0, "queue-b")
+ self.assertMessageId("prepare-rollback", "queue-a")
+
+ def test_select_required(self):
+ """
+ check that an error is flagged if select is not issued before
+ start or end
+ """
+ session = self.session
+ tx = self.xid("dummy")
+ try:
+ session.dtx_start(xid=tx)
+
+ #if we get here we have failed, but need to do some cleanup:
+ session.dtx_end(xid=tx)
+ session.dtx_rollback(xid=tx)
+ self.fail("Session not selected for use with dtx, expected exception!")
+ except SessionException, e:
+ self.assertEquals(503, e.args[0].error_code)
+
+ def test_start_already_known(self):
+ """
+ Verify that an attempt to start an association with a
+ transaction that is already known is not allowed (unless the
+ join flag is set).
+ """
+ #create two sessions on different connection & select them for use with dtx:
+ session1 = self.session
+ session1.dtx_select()
+
+ other = self.connect()
+ session2 = other.session("other", 0)
+ session2.dtx_select()
+
+ #create a xid
+ tx = self.xid("dummy")
+ #start work on one session under that xid:
+ session1.dtx_start(xid=tx)
+ #then start on the other without the join set
+ failed = False
+ try:
+ session2.dtx_start(xid=tx)
+ except SessionException, e:
+ failed = True
+ error = e
+
+ #cleanup:
+ if not failed:
+ session2.dtx_end(xid=tx)
+ other.close()
+ session1.dtx_end(xid=tx)
+ session1.dtx_rollback(xid=tx)
+
+ #verification:
+ if failed: self.assertEquals(530, error.args[0].error_code)
+ else: self.fail("Xid already known, expected exception!")
+
+ def test_forget_xid_on_completion(self):
+ """
+ Verify that a xid is 'forgotten' - and can therefore be used
+ again - once it is completed.
+ """
+ #do some transactional work & complete the transaction
+ self.test_simple_commit()
+ # session has been reset, so reselect for use with dtx
+ self.session.dtx_select()
+
+ #start association for the same xid as the previously completed txn
+ tx = self.xid("my-xid")
+ self.session.dtx_start(xid=tx)
+ self.session.dtx_end(xid=tx)
+ self.session.dtx_rollback(xid=tx)
+
+ def test_start_join_and_resume(self):
+ """
+ Ensure the correct error is signalled when both the join and
+ resume flags are set on starting an association between a
+ session and a transcation.
+ """
+ session = self.session
+ session.dtx_select()
+ tx = self.xid("dummy")
+ try:
+ session.dtx_start(xid=tx, join=True, resume=True)
+ #failed, but need some cleanup:
+ session.dtx_end(xid=tx)
+ session.dtx_rollback(xid=tx)
+ self.fail("Join and resume both set, expected exception!")
+ except SessionException, e:
+ self.assertEquals(503, e.args[0].error_code)
+
+ def test_start_join(self):
+ """
+ Verify 'join' behaviour, where a session is associated with a
+ transaction that is already associated with another session.
+ """
+ guard = self.keepQueuesAlive(["one", "two"])
+ #create two sessions & select them for use with dtx:
+ session1 = self.session
+ session1.dtx_select()
+
+ session2 = self.conn.session("second", 2)
+ session2.dtx_select()
+
+ #setup
+ session1.queue_declare(queue="one", auto_delete=True)
+ session1.queue_declare(queue="two", auto_delete=True)
+ session1.message_transfer(self.createMessage(session1, "one", "a", "DtxMessage"))
+ session1.message_transfer(self.createMessage(session1, "two", "b", "DtxMessage"))
+
+ #create a xid
+ tx = self.xid("dummy")
+ #start work on one session under that xid:
+ session1.dtx_start(xid=tx)
+ #then start on the other with the join flag set
+ session2.dtx_start(xid=tx, join=True)
+
+ #do work through each session
+ self.swap(session1, "one", "two")#swap 'a' from 'one' to 'two'
+ self.swap(session2, "two", "one")#swap 'b' from 'two' to 'one'
+
+ #mark end on both sessions
+ session1.dtx_end(xid=tx)
+ session2.dtx_end(xid=tx)
+
+ #commit and check
+ session1.dtx_commit(xid=tx, one_phase=True)
+ self.assertMessageCount(1, "one")
+ self.assertMessageCount(1, "two")
+ self.assertMessageId("a", "two")
+ self.assertMessageId("b", "one")
+
+
+ def test_suspend_resume(self):
+ """
+ Test suspension and resumption of an association
+ """
+ session = self.session
+ session.dtx_select()
+
+ #setup
+ session.queue_declare(queue="one", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="two", exclusive=True, auto_delete=True)
+ session.message_transfer(self.createMessage(session, "one", "a", "DtxMessage"))
+ session.message_transfer(self.createMessage(session, "two", "b", "DtxMessage"))
+
+ tx = self.xid("dummy")
+
+ session.dtx_start(xid=tx)
+ self.swap(session, "one", "two")#swap 'a' from 'one' to 'two'
+ session.dtx_end(xid=tx, suspend=True)
+
+ session.dtx_start(xid=tx, resume=True)
+ self.swap(session, "two", "one")#swap 'b' from 'two' to 'one'
+ session.dtx_end(xid=tx)
+
+ #commit and check
+ session.dtx_commit(xid=tx, one_phase=True)
+ self.assertMessageCount(1, "one")
+ self.assertMessageCount(1, "two")
+ self.assertMessageId("a", "two")
+ self.assertMessageId("b", "one")
+
+ def test_suspend_start_end_resume(self):
+ """
+ Test suspension and resumption of an association with work
+ done on another transaction when the first transaction is
+ suspended
+ """
+ session = self.session
+ session.dtx_select()
+
+ #setup
+ session.queue_declare(queue="one", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="two", exclusive=True, auto_delete=True)
+ session.message_transfer(self.createMessage(session, "one", "a", "DtxMessage"))
+ session.message_transfer(self.createMessage(session, "two", "b", "DtxMessage"))
+
+ tx = self.xid("dummy")
+
+ session.dtx_start(xid=tx)
+ self.swap(session, "one", "two")#swap 'a' from 'one' to 'two'
+ session.dtx_end(xid=tx, suspend=True)
+
+ session.dtx_start(xid=tx, resume=True)
+ self.swap(session, "two", "one")#swap 'b' from 'two' to 'one'
+ session.dtx_end(xid=tx)
+
+ #commit and check
+ session.dtx_commit(xid=tx, one_phase=True)
+ self.assertMessageCount(1, "one")
+ self.assertMessageCount(1, "two")
+ self.assertMessageId("a", "two")
+ self.assertMessageId("b", "one")
+
+ def test_end_suspend_and_fail(self):
+ """
+ Verify that the correct error is signalled if the suspend and
+ fail flag are both set when disassociating a transaction from
+ the session
+ """
+ session = self.session
+ session.dtx_select()
+ tx = self.xid("suspend_and_fail")
+ session.dtx_start(xid=tx)
+ try:
+ session.dtx_end(xid=tx, suspend=True, fail=True)
+ self.fail("Suspend and fail both set, expected exception!")
+ except SessionException, e:
+ self.assertEquals(503, e.args[0].error_code)
+
+ #cleanup
+ other = self.connect()
+ session = other.session("cleanup", 1)
+ session.dtx_rollback(xid=tx)
+ session.close()
+ other.close()
+
+
+ def test_end_unknown_xid(self):
+ """
+ Verifies that the correct exception is thrown when an attempt
+ is made to end the association for a xid not previously
+ associated with the session
+ """
+ session = self.session
+ session.dtx_select()
+ tx = self.xid("unknown-xid")
+ try:
+ session.dtx_end(xid=tx)
+ self.fail("Attempted to end association with unknown xid, expected exception!")
+ except SessionException, e:
+ self.assertEquals(409, e.args[0].error_code)
+
+ def test_end(self):
+ """
+ Verify that the association is terminated by end and subsequent
+ operations are non-transactional
+ """
+ guard = self.keepQueuesAlive(["tx-queue"])
+ session = self.conn.session("alternate", 1)
+ session.queue_declare(queue="tx-queue", exclusive=True, auto_delete=True)
+
+ #publish a message under a transaction
+ session.dtx_select()
+ tx = self.xid("dummy")
+ session.dtx_start(xid=tx)
+ session.message_transfer(self.createMessage(session, "tx-queue", "one", "DtxMessage"))
+ session.dtx_end(xid=tx)
+
+ #now that association with txn is ended, publish another message
+ session.message_transfer(self.createMessage(session, "tx-queue", "two", "DtxMessage"))
+
+ #check the second message is available, but not the first
+ self.assertMessageCount(1, "tx-queue")
+ self.subscribe(session, queue="tx-queue", destination="results")
+ msg = session.incoming("results").get(timeout=1)
+ self.assertEqual("two", self.getMessageProperty(msg, 'correlation_id'))
+ session.message_cancel(destination="results")
+ #ack the message then close the session
+ session.message_accept(RangedSet(msg.id))
+ session.close()
+
+ session = self.session
+ #commit the transaction and check that the first message (and
+ #only the first message) is then delivered
+ session.dtx_commit(xid=tx, one_phase=True)
+ self.assertMessageCount(1, "tx-queue")
+ self.assertMessageId("one", "tx-queue")
+
+ def test_invalid_commit_one_phase_true(self):
+ """
+ Test that a commit with one_phase = True is rejected if the
+ transaction in question has already been prepared.
+ """
+ other = self.connect()
+ tester = other.session("tester", 1)
+ tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+ tester.dtx_select()
+ tx = self.xid("dummy")
+ tester.dtx_start(xid=tx)
+ tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
+ tester.dtx_end(xid=tx)
+ tester.dtx_prepare(xid=tx)
+ failed = False
+ try:
+ tester.dtx_commit(xid=tx, one_phase=True)
+ except SessionException, e:
+ failed = True
+ error = e
+
+ if failed:
+ self.session.dtx_rollback(xid=tx)
+ self.assertEquals(409, error.args[0].error_code)
+ else:
+ tester.close()
+ other.close()
+ self.fail("Invalid use of one_phase=True, expected exception!")
+
+ def test_invalid_commit_one_phase_false(self):
+ """
+ Test that a commit with one_phase = False is rejected if the
+ transaction in question has not yet been prepared.
+ """
+ other = self.connect()
+ tester = other.session("tester", 1)
+ tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+ tester.dtx_select()
+ tx = self.xid("dummy")
+ tester.dtx_start(xid=tx)
+ tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
+ tester.dtx_end(xid=tx)
+ failed = False
+ try:
+ tester.dtx_commit(xid=tx, one_phase=False)
+ except SessionException, e:
+ failed = True
+ error = e
+
+ if failed:
+ self.session.dtx_rollback(xid=tx)
+ self.assertEquals(409, error.args[0].error_code)
+ else:
+ tester.close()
+ other.close()
+ self.fail("Invalid use of one_phase=False, expected exception!")
+
+ def test_invalid_commit_not_ended(self):
+ """
+ Test that a commit fails if the xid is still associated with a session.
+ """
+ other = self.connect()
+ tester = other.session("tester", 1)
+ self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+ self.session.dtx_select()
+ tx = self.xid("dummy")
+ self.session.dtx_start(xid=tx)
+ self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
+
+ failed = False
+ try:
+ tester.dtx_commit(xid=tx, one_phase=False)
+ except SessionException, e:
+ failed = True
+ error = e
+
+ if failed:
+ self.session.dtx_end(xid=tx)
+ self.session.dtx_rollback(xid=tx)
+ self.assertEquals(409, error.args[0].error_code)
+ else:
+ tester.close()
+ other.close()
+ self.fail("Commit should fail as xid is still associated!")
+
+ def test_invalid_rollback_not_ended(self):
+ """
+ Test that a rollback fails if the xid is still associated with a session.
+ """
+ other = self.connect()
+ tester = other.session("tester", 1)
+ self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+ self.session.dtx_select()
+ tx = self.xid("dummy")
+ self.session.dtx_start(xid=tx)
+ self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
+
+ failed = False
+ try:
+ tester.dtx_rollback(xid=tx)
+ except SessionException, e:
+ failed = True
+ error = e
+
+ if failed:
+ self.session.dtx_end(xid=tx)
+ self.session.dtx_rollback(xid=tx)
+ self.assertEquals(409, error.args[0].error_code)
+ else:
+ tester.close()
+ other.close()
+ self.fail("Rollback should fail as xid is still associated!")
+
+
+ def test_invalid_prepare_not_ended(self):
+ """
+ Test that a prepare fails if the xid is still associated with a session.
+ """
+ other = self.connect()
+ tester = other.session("tester", 1)
+ self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+ self.session.dtx_select()
+ tx = self.xid("dummy")
+ self.session.dtx_start(xid=tx)
+ self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
+
+ failed = False
+ try:
+ tester.dtx_prepare(xid=tx)
+ except SessionException, e:
+ failed = True
+ error = e
+
+ if failed:
+ self.session.dtx_end(xid=tx)
+ self.session.dtx_rollback(xid=tx)
+ self.assertEquals(409, error.args[0].error_code)
+ else:
+ tester.close()
+ other.close()
+ self.fail("Rollback should fail as xid is still associated!")
+
+ def test_implicit_end(self):
+ """
+ Test that an association is implicitly ended when the session
+ is closed (whether by exception or explicit client request)
+ and the transaction in question is marked as rollback only.
+ """
+ session1 = self.session
+ session2 = self.conn.session("other", 2)
+
+ #setup:
+ session2.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+ session2.message_transfer(self.createMessage(session2, "dummy", "a", "whatever"))
+ tx = self.xid("dummy")
+
+ session2.dtx_select()
+ session2.dtx_start(xid=tx)
+ session2.message_subscribe(queue="dummy", destination="dummy")
+ session2.message_flow(destination="dummy", unit=session2.credit_unit.message, value=1)
+ session2.message_flow(destination="dummy", unit=session2.credit_unit.byte, value=0xFFFFFFFFL)
+ msg = session2.incoming("dummy").get(timeout=1)
+ session2.message_accept(RangedSet(msg.id))
+ session2.message_cancel(destination="dummy")
+ session2.message_transfer(self.createMessage(session2, "dummy", "b", "whatever"))
+ session2.close()
+
+ self.assertEqual(self.XA_RBROLLBACK, session1.dtx_prepare(xid=tx).status)
+ session1.dtx_rollback(xid=tx)
+
+ def test_get_timeout(self):
+ """
+ Check that get-timeout returns the correct value, (and that a
+ transaction with a timeout can complete normally)
+ """
+ session = self.session
+ tx = self.xid("dummy")
+
+ session.dtx_select()
+ session.dtx_start(xid=tx)
+ self.assertEqual(0, session.dtx_get_timeout(xid=tx).timeout)
+ session.dtx_set_timeout(xid=tx, timeout=60)
+ self.assertEqual(60, session.dtx_get_timeout(xid=tx).timeout)
+ self.assertEqual(self.XA_OK, session.dtx_end(xid=tx).status)
+ self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)
+
+ def test_set_timeout(self):
+ """
+ Test the timeout of a transaction results in the expected
+ behaviour
+ """
+
+ guard = self.keepQueuesAlive(["queue-a", "queue-b"])
+ #open new session to allow self.session to be used in checking the queue
+ session = self.conn.session("worker", 1)
+ #setup:
+ tx = self.xid("dummy")
+ session.queue_declare(queue="queue-a", auto_delete=True)
+ session.queue_declare(queue="queue-b", auto_delete=True)
+ session.message_transfer(self.createMessage(session, "queue-a", "timeout", "DtxMessage"))
+
+ session.dtx_select()
+ session.dtx_start(xid=tx)
+ self.swap(session, "queue-a", "queue-b")
+ session.dtx_set_timeout(xid=tx, timeout=2)
+ sleep(3)
+ #check that the work has been rolled back already
+ self.assertMessageCount(1, "queue-a")
+ self.assertMessageCount(0, "queue-b")
+ self.assertMessageId("timeout", "queue-a")
+ #check the correct codes are returned when we try to complete the txn
+ self.assertEqual(self.XA_RBTIMEOUT, session.dtx_end(xid=tx).status)
+ self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status)
+
+
+
+ def test_recover(self):
+ """
+ Test basic recover behaviour
+ """
+ session = self.session
+
+ session.dtx_select()
+ session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+
+ prepared = []
+ for i in range(1, 10):
+ tx = self.xid("tx%s" % (i))
+ session.dtx_start(xid=tx)
+ session.message_transfer(self.createMessage(session, "dummy", "message%s" % (i), "message%s" % (i)))
+ session.dtx_end(xid=tx)
+ if i in [2, 5, 6, 8]:
+ session.dtx_prepare(xid=tx)
+ prepared.append(tx)
+ else:
+ session.dtx_rollback(xid=tx)
+
+ xids = session.dtx_recover().in_doubt
+
+ #rollback the prepared transactions returned by recover
+ for x in xids:
+ session.dtx_rollback(xid=x)
+
+ #validate against the expected list of prepared transactions
+ actual = set([x.global_id for x in xids]) #TODO: come up with nicer way to test these
+ expected = set([x.global_id for x in prepared])
+ intersection = actual.intersection(expected)
+
+ if intersection != expected:
+ missing = expected.difference(actual)
+ extra = actual.difference(expected)
+ self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra))
+
+ def test_bad_resume(self):
+ """
+ Test that a resume on a session not selected for use with dtx fails
+ """
+ session = self.session
+ try:
+ session.dtx_start(resume=True)
+ except SessionException, e:
+ self.assertEquals(503, e.args[0].error_code)
+
+ def test_prepare_unknown(self):
+ session = self.session
+ try:
+ session.dtx_prepare(xid=self.xid("unknown"))
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+ def test_commit_unknown(self):
+ session = self.session
+ try:
+ session.dtx_commit(xid=self.xid("unknown"))
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+ def test_rollback_unknown(self):
+ session = self.session
+ try:
+ session.dtx_rollback(xid=self.xid("unknown"))
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+ def test_get_timeout_unknown(self):
+ session = self.session
+ try:
+ session.dtx_get_timeout(xid=self.xid("unknown"))
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+ def xid(self, txid):
+ DtxTests.tx_counter += 1
+ branchqual = "v%s" % DtxTests.tx_counter
+ return self.session.xid(format=0, global_id=txid, branch_id=branchqual)
+
+ def txswap(self, tx, id):
+ session = self.session
+ #declare two queues:
+ session.queue_declare(queue="queue-a", auto_delete=True)
+ session.queue_declare(queue="queue-b", auto_delete=True)
+
+ #put message with specified id on one queue:
+ dp=session.delivery_properties(routing_key="queue-a")
+ mp=session.message_properties(correlation_id=id)
+ session.message_transfer(message=Message(dp, mp, "DtxMessage"))
+
+ #start the transaction:
+ session.dtx_select()
+ self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status)
+
+ #'swap' the message from one queue to the other, under that transaction:
+ self.swap(self.session, "queue-a", "queue-b")
+
+ #mark the end of the transactional work:
+ self.assertEqual(self.XA_OK, self.session.dtx_end(xid=tx).status)
+
+ def swap(self, session, src, dest):
+ #consume from src:
+ session.message_subscribe(destination="temp-swap", queue=src)
+ session.message_flow(destination="temp-swap", unit=session.credit_unit.message, value=1)
+ session.message_flow(destination="temp-swap", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ msg = session.incoming("temp-swap").get(timeout=1)
+ session.message_cancel(destination="temp-swap")
+ session.message_accept(RangedSet(msg.id))
+ #todo: also complete at this point?
+
+ #re-publish to dest:
+ dp=session.delivery_properties(routing_key=dest)
+ mp=session.message_properties(correlation_id=self.getMessageProperty(msg, 'correlation_id'))
+ session.message_transfer(message=Message(dp, mp, msg.body))
+
+ def assertMessageCount(self, expected, queue):
+ self.assertEqual(expected, self.session.queue_query(queue=queue).message_count)
+
+ def assertMessageId(self, expected, queue):
+ self.session.message_subscribe(queue=queue, destination="results")
+ self.session.message_flow(destination="results", unit=self.session.credit_unit.message, value=1)
+ self.session.message_flow(destination="results", unit=self.session.credit_unit.byte, value=0xFFFFFFFFL)
+ self.assertEqual(expected, self.getMessageProperty(self.session.incoming("results").get(timeout=1), 'correlation_id'))
+ self.session.message_cancel(destination="results")
+
+ def getMessageProperty(self, msg, prop):
+ for h in msg.headers:
+ if hasattr(h, prop): return getattr(h, prop)
+ return None
+
+ def keepQueuesAlive(self, names):
+ session = self.conn.session("nasty", 99)
+ for n in names:
+ session.queue_declare(queue=n, auto_delete=True)
+ session.message_subscribe(destination=n, queue=n)
+ return session
+
+ def createMessage(self, session, key, id, body):
+ dp=session.delivery_properties(routing_key=key)
+ mp=session.message_properties(correlation_id=id)
+ session.message_transfer(message=Message(dp, mp, body))
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/example.py b/qpid/tests/src/py/qpid_tests/broker_0_10/example.py
new file mode 100644
index 0000000000..e36907d501
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/example.py
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.datatypes import Message, RangedSet
+from qpid.testlib import TestBase010
+
+class ExampleTest (TestBase010):
+ """
+ An example Qpid test, illustrating the unittest framework and the
+ python Qpid client. The test class must inherit TestBase. The
+ test code uses the Qpid client to interact with a qpid broker and
+ verify it behaves as expected.
+ """
+
+ def test_example(self):
+ """
+ An example test. Note that test functions must start with 'test_'
+ to be recognized by the test framework.
+ """
+
+ # By inheriting TestBase, self.client is automatically connected
+ # and self.session is automatically opened as session(1)
+ # Other session methods mimic the protocol.
+ session = self.session
+
+ # Now we can send regular commands. If you want to see what the method
+ # arguments mean or what other commands are available, you can use the
+ # python builtin help() method. For example:
+ #help(chan)
+ #help(chan.exchange_declare)
+
+ # If you want browse the available protocol methods without being
+ # connected to a live server you can use the amqp-doc utility:
+ #
+ # Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>]
+ #
+ # Options:
+ # -e, --regexp use regex instead of glob when matching
+
+ # Now that we know what commands are available we can use them to
+ # interact with the server.
+
+ # Here we use ordinal arguments.
+ session.exchange_declare("test", "direct")
+
+ # Here we use keyword arguments.
+ session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="test-queue", exchange="test", binding_key="key")
+
+ # Call Session.subscribe to register as a consumer.
+ # All the protocol methods return a message object. The message object
+ # has fields corresponding to the reply method fields, plus a content
+ # field that is filled if the reply includes content. In this case the
+ # interesting field is the consumer_tag.
+ session.message_subscribe(queue="test-queue", destination="consumer_tag")
+ session.message_flow(destination="consumer_tag", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="consumer_tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+
+ # We can use the session.incoming(...) method to access the messages
+ # delivered for our consumer_tag.
+ queue = session.incoming("consumer_tag")
+
+ # Now lets publish a message and see if our consumer gets it. To do
+ # this we need to import the Message class.
+ delivery_properties = session.delivery_properties(routing_key="key")
+ sent = Message(delivery_properties, "Hello World!")
+ session.message_transfer(destination="test", message=sent)
+
+ # Now we'll wait for the message to arrive. We can use the timeout
+ # argument in case the server hangs. By default queue.get() will wait
+ # until a message arrives or the connection to the server dies.
+ msg = queue.get(timeout=10)
+
+ # And check that we got the right response with assertEqual
+ self.assertEqual(sent.body, msg.body)
+
+ # Now acknowledge the message.
+ session.message_accept(RangedSet(msg.id))
+
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py b/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py
new file mode 100644
index 0000000000..0ac78a4799
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py
@@ -0,0 +1,461 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Tests for exchange behaviour.
+
+Test classes ending in 'RuleTests' are derived from rules in amqp.xml.
+"""
+
+import Queue, logging, traceback
+from qpid.testlib import TestBase010
+from qpid.datatypes import Message
+from qpid.client import Closed
+from qpid.session import SessionException
+
+
+class TestHelper(TestBase010):
+ def setUp(self):
+ TestBase010.setUp(self)
+ self.queues = []
+ self.exchanges = []
+ self.subscriptions = []
+
+ def tearDown(self):
+ try:
+ for s in self.subscriptions:
+ self.session.message_cancel(destination=s)
+ for ssn, q in self.queues:
+ ssn.queue_delete(queue=q)
+ for ssn, ex in self.exchanges:
+ ssn.exchange_delete(exchange=ex)
+ except:
+ print "Error on tearDown:"
+ print traceback.print_exc()
+ TestBase010.tearDown(self)
+
+ def createMessage(self, key="", body=""):
+ return Message(self.session.delivery_properties(routing_key=key), body)
+
+ def getApplicationHeaders(self, msg):
+ for h in msg.headers:
+ if hasattr(h, 'application_headers'): return getattr(h, 'application_headers')
+ return None
+
+ def assertPublishGet(self, queue, exchange="", routing_key="", properties=None):
+ """
+ Publish to exchange and assert queue.get() returns the same message.
+ """
+ body = self.uniqueString()
+ dp=self.session.delivery_properties(routing_key=routing_key)
+ mp=self.session.message_properties(application_headers=properties)
+ self.session.message_transfer(destination=exchange, message=Message(dp, mp, body))
+ msg = queue.get(timeout=1)
+ self.assertEqual(body, msg.body)
+ if (properties):
+ self.assertEqual(properties, self.getApplicationHeaders(msg))
+
+ def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None):
+ """
+ Publish a message and consume it, assert it comes back intact.
+ Return the Queue object used to consume.
+ """
+ self.assertPublishGet(self.consume(queue), exchange, routing_key, properties)
+
+ def assertEmpty(self, queue):
+ """Assert that the queue is empty"""
+ try:
+ queue.get(timeout=1)
+ self.fail("Queue is not empty.")
+ except Queue.Empty: None # Ignore
+
+ def queue_declare(self, session=None, *args, **keys):
+ session = session or self.session
+ reply = session.queue_declare(*args, **keys)
+ self.queues.append((session, keys["queue"]))
+ return reply
+
+ def exchange_declare(self, session=None, ticket=0, exchange='',
+ type='', passive=False, durable=False,
+ auto_delete=False,
+ arguments={}):
+ session = session or self.session
+ reply = session.exchange_declare(exchange=exchange, type=type, passive=passive,durable=durable, auto_delete=auto_delete, arguments=arguments)
+ self.exchanges.append((session,exchange))
+ return reply
+
+ def uniqueString(self):
+ """Generate a unique string, unique for this TestBase instance"""
+ if not "uniqueCounter" in dir(self): self.uniqueCounter = 1;
+ return "Test Message " + str(self.uniqueCounter)
+
+ def consume(self, queueName):
+ """Consume from named queue returns the Queue object."""
+ if not "uniqueTag" in dir(self): self.uniqueTag = 1
+ else: self.uniqueTag += 1
+ consumer_tag = "tag" + str(self.uniqueTag)
+ self.session.message_subscribe(queue=queueName, destination=consumer_tag)
+ self.session.message_flow(destination=consumer_tag, unit=self.session.credit_unit.message, value=0xFFFFFFFFL)
+ self.session.message_flow(destination=consumer_tag, unit=self.session.credit_unit.byte, value=0xFFFFFFFFL)
+ self.subscriptions.append(consumer_tag)
+ return self.session.incoming(consumer_tag)
+
+
+class StandardExchangeVerifier:
+ """Verifies standard exchange behavior.
+
+ Used as base class for classes that test standard exchanges."""
+
+ def verifyDirectExchange(self, ex):
+ """Verify that ex behaves like a direct exchange."""
+ self.queue_declare(queue="q")
+ self.session.exchange_bind(queue="q", exchange=ex, binding_key="k")
+ self.assertPublishConsume(exchange=ex, queue="q", routing_key="k")
+ try:
+ self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk")
+ self.fail("Expected Empty exception")
+ except Queue.Empty: None # Expected
+
+ def verifyFanOutExchange(self, ex):
+ """Verify that ex behaves like a fanout exchange."""
+ self.queue_declare(queue="q")
+ self.session.exchange_bind(queue="q", exchange=ex)
+ self.queue_declare(queue="p")
+ self.session.exchange_bind(queue="p", exchange=ex)
+ for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex)
+
+ def verifyTopicExchange(self, ex):
+ """Verify that ex behaves like a topic exchange"""
+ self.queue_declare(queue="a")
+ self.session.exchange_bind(queue="a", exchange=ex, binding_key="a.#.b.*")
+ q = self.consume("a")
+ self.assertPublishGet(q, ex, "a.b.x")
+ self.assertPublishGet(q, ex, "a.x.b.x")
+ self.assertPublishGet(q, ex, "a.x.x.b.x")
+ # Shouldn't match
+ self.session.message_transfer(destination=ex, message=self.createMessage("a.b"))
+ self.session.message_transfer(destination=ex, message=self.createMessage("a.b.x.y"))
+ self.session.message_transfer(destination=ex, message=self.createMessage("x.a.b.x"))
+ self.session.message_transfer(destination=ex, message=self.createMessage("a.b"))
+ self.assert_(q.empty())
+
+ def verifyHeadersExchange(self, ex):
+ """Verify that ex is a headers exchange"""
+ self.queue_declare(queue="q")
+ self.session.exchange_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} )
+ q = self.consume("q")
+ headers = {"name":"fred", "age":3}
+ self.assertPublishGet(q, exchange=ex, properties=headers)
+ self.session.message_transfer(destination=ex) # No headers, won't deliver
+ self.assertEmpty(q);
+
+
+class RecommendedTypesRuleTests(TestHelper, StandardExchangeVerifier):
+ """
+ The server SHOULD implement these standard exchange types: topic, headers.
+
+ Client attempts to declare an exchange with each of these standard types.
+ """
+
+ def testDirect(self):
+ """Declare and test a direct exchange"""
+ self.exchange_declare(0, exchange="d", type="direct")
+ self.verifyDirectExchange("d")
+
+ def testFanout(self):
+ """Declare and test a fanout exchange"""
+ self.exchange_declare(0, exchange="f", type="fanout")
+ self.verifyFanOutExchange("f")
+
+ def testTopic(self):
+ """Declare and test a topic exchange"""
+ self.exchange_declare(0, exchange="t", type="topic")
+ self.verifyTopicExchange("t")
+
+ def testHeaders(self):
+ """Declare and test a headers exchange"""
+ self.exchange_declare(0, exchange="h", type="headers")
+ self.verifyHeadersExchange("h")
+
+
+class RequiredInstancesRuleTests(TestHelper, StandardExchangeVerifier):
+ """
+ The server MUST, in each virtual host, pre-declare an exchange instance
+ for each standard exchange type that it implements, where the name of the
+ exchange instance is amq. followed by the exchange type name.
+
+ Client creates a temporary queue and attempts to bind to each required
+ exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if
+ those types are defined).
+ """
+ def testAmqDirect(self): self.verifyDirectExchange("amq.direct")
+
+ def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout")
+
+ def testAmqTopic(self): self.verifyTopicExchange("amq.topic")
+
+ def testAmqMatch(self): self.verifyHeadersExchange("amq.match")
+
+class DefaultExchangeRuleTests(TestHelper, StandardExchangeVerifier):
+ """
+ The server MUST predeclare a direct exchange to act as the default exchange
+ for content Publish methods and for default queue bindings.
+
+ Client checks that the default exchange is active by specifying a queue
+ binding with no exchange name, and publishing a message with a suitable
+ routing key but without specifying the exchange name, then ensuring that
+ the message arrives in the queue correctly.
+ """
+ def testDefaultExchange(self):
+ # Test automatic binding by queue name.
+ self.queue_declare(queue="d")
+ self.assertPublishConsume(queue="d", routing_key="d")
+ # Test explicit bind to default queue
+ self.verifyDirectExchange("")
+
+
+# TODO aconway 2006-09-27: Fill in empty tests:
+
+class DefaultAccessRuleTests(TestHelper):
+ """
+ The server MUST NOT allow clients to access the default exchange except
+ by specifying an empty exchange name in the Queue.Bind and content Publish
+ methods.
+ """
+
+class ExtensionsRuleTests(TestHelper):
+ """
+ The server MAY implement other exchange types as wanted.
+ """
+
+
+class DeclareMethodMinimumRuleTests(TestHelper):
+ """
+ The server SHOULD support a minimum of 16 exchanges per virtual host and
+ ideally, impose no limit except as defined by available resources.
+
+ The client creates as many exchanges as it can until the server reports
+ an error; the number of exchanges successfuly created must be at least
+ sixteen.
+ """
+
+
+class DeclareMethodTicketFieldValidityRuleTests(TestHelper):
+ """
+ The client MUST provide a valid access ticket giving "active" access to
+ the realm in which the exchange exists or will be created, or "passive"
+ access if the if-exists flag is set.
+
+ Client creates access ticket with wrong access rights and attempts to use
+ in this method.
+ """
+
+
+class DeclareMethodExchangeFieldReservedRuleTests(TestHelper):
+ """
+ Exchange names starting with "amq." are reserved for predeclared and
+ standardised exchanges. The client MUST NOT attempt to create an exchange
+ starting with "amq.".
+
+ Similarly, exchanges starting with "qpid." are reserved for Qpid
+ implementation-specific system exchanges (such as the management exchange).
+ The client must not attempt to create an exchange starting with the string
+ "qpid.".
+ """
+ def template(self, reservedString, exchangeType):
+ try:
+ self.session.exchange_declare(exchange=reservedString, type=exchangeType)
+ self.fail("Expected not allowed error (530) for exchanges starting with \"" + reservedString + "\".")
+ except SessionException, e:
+ self.assertEquals(e.args[0].error_code, 530)
+ # connection closed, reopen it
+ self.tearDown()
+ self.setUp()
+ try:
+ self.session.exchange_declare(exchange=reservedString + "abc123", type=exchangeType)
+ self.fail("Expected not allowed error (530) for exchanges starting with \"" + reservedString + "\".")
+ except SessionException, e:
+ self.assertEquals(e.args[0].error_code, 530)
+ # connection closed, reopen it
+ self.tearDown()
+ self.setUp()
+ # The following should be legal:
+ self.session.exchange_declare(exchange=reservedString[:-1], type=exchangeType)
+ self.session.exchange_delete(exchange=reservedString[:-1])
+ self.session.exchange_declare(exchange=reservedString[1:], type=exchangeType)
+ self.session.exchange_delete(exchange=reservedString[1:])
+ self.session.exchange_declare(exchange="." + reservedString, type=exchangeType)
+ self.session.exchange_delete(exchange="." + reservedString)
+ self.session.exchange_declare(exchange="abc." + reservedString, type=exchangeType)
+ self.session.exchange_delete(exchange="abc." + reservedString)
+ self.session.exchange_declare(exchange="abc." + reservedString + "def", type=exchangeType)
+ self.session.exchange_delete(exchange="abc." + reservedString + "def")
+
+ def test_amq(self):
+ self.template("amq.", "direct")
+ self.template("amq.", "topic")
+ self.template("amq.", "fanout")
+
+ def test_qpid(self):
+ self.template("qpid.", "direct")
+ self.template("qpid.", "topic")
+ self.template("qpid.", "fanout")
+
+
+class DeclareMethodTypeFieldTypedRuleTests(TestHelper):
+ """
+ Exchanges cannot be redeclared with different types. The client MUST not
+ attempt to redeclare an existing exchange with a different type than used
+ in the original Exchange.Declare method.
+
+
+ """
+
+
+class DeclareMethodTypeFieldSupportRuleTests(TestHelper):
+ """
+ The client MUST NOT attempt to create an exchange with a type that the
+ server does not support.
+
+
+ """
+
+
+class DeclareMethodPassiveFieldNotFoundRuleTests(TestHelper):
+ """
+ If set, and the exchange does not already exist, the server MUST raise a
+ channel exception with reply code 404 (not found).
+ """
+ def test(self):
+ try:
+ self.session.exchange_declare(exchange="humpty_dumpty", passive=True)
+ self.fail("Expected 404 for passive declaration of unknown exchange.")
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+
+class DeclareMethodDurableFieldSupportRuleTests(TestHelper):
+ """
+ The server MUST support both durable and transient exchanges.
+
+
+ """
+
+
+class DeclareMethodDurableFieldStickyRuleTests(TestHelper):
+ """
+ The server MUST ignore the durable field if the exchange already exists.
+
+
+ """
+
+
+class DeclareMethodAutoDeleteFieldStickyRuleTests(TestHelper):
+ """
+ The server MUST ignore the auto-delete field if the exchange already
+ exists.
+
+
+ """
+
+
+class DeleteMethodTicketFieldValidityRuleTests(TestHelper):
+ """
+ The client MUST provide a valid access ticket giving "active" access
+ rights to the exchange's access realm.
+
+ Client creates access ticket with wrong access rights and attempts to use
+ in this method.
+ """
+
+
+class DeleteMethodExchangeFieldExistsRuleTests(TestHelper):
+ """
+ The client MUST NOT attempt to delete an exchange that does not exist.
+ """
+
+
+class HeadersExchangeTests(TestHelper):
+ """
+ Tests for headers exchange functionality.
+ """
+ def setUp(self):
+ TestHelper.setUp(self)
+ self.queue_declare(queue="q")
+ self.q = self.consume("q")
+
+ def myAssertPublishGet(self, headers):
+ self.assertPublishGet(self.q, exchange="amq.match", properties=headers)
+
+ def myBasicPublish(self, headers):
+ mp=self.session.message_properties(application_headers=headers)
+ self.session.message_transfer(destination="amq.match", message=Message(mp, "foobar"))
+
+ def testMatchAll(self):
+ self.session.exchange_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3})
+ self.myAssertPublishGet({"name":"fred", "age":3})
+ self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"})
+
+ # None of these should match
+ self.myBasicPublish({})
+ self.myBasicPublish({"name":"barney"})
+ self.myBasicPublish({"name":10})
+ self.myBasicPublish({"name":"fred", "age":2})
+ self.assertEmpty(self.q)
+
+ def testMatchAny(self):
+ self.session.exchange_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3})
+ self.myAssertPublishGet({"name":"fred"})
+ self.myAssertPublishGet({"name":"fred", "ignoreme":10})
+ self.myAssertPublishGet({"ignoreme":10, "age":3})
+
+ # Wont match
+ self.myBasicPublish({})
+ self.myBasicPublish({"irrelevant":0})
+ self.assertEmpty(self.q)
+
+
+class MiscellaneousErrorsTests(TestHelper):
+ """
+ Test some miscellaneous error conditions
+ """
+ def testTypeNotKnown(self):
+ try:
+ self.session.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type")
+ self.fail("Expected 503 for declaration of unknown exchange type.")
+ except SessionException, e:
+ self.assertEquals(503, e.args[0].error_code)
+
+ def testDifferentDeclaredType(self):
+ self.exchange_declare(exchange="test_different_declared_type_exchange", type="direct")
+ try:
+ session = self.conn.session("alternate", 2)
+ session.exchange_declare(exchange="test_different_declared_type_exchange", type="topic")
+ self.fail("Expected 530 for redeclaration of exchange with different type.")
+ except SessionException, e:
+ self.assertEquals(530, e.args[0].error_code)
+
+class ExchangeTests(TestHelper):
+ def testHeadersBindNoMatchArg(self):
+ self.session.queue_declare(queue="q", exclusive=True, auto_delete=True)
+ try:
+ self.session.exchange_bind(queue="q", exchange="amq.match", arguments={"name":"fred" , "age":3} )
+ self.fail("Expected failure for missing x-match arg.")
+ except SessionException, e:
+ self.assertEquals(541, e.args[0].error_code)
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/management.py b/qpid/tests/src/py/qpid_tests/broker_0_10/management.py
new file mode 100644
index 0000000000..677645fa2c
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/management.py
@@ -0,0 +1,467 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.datatypes import Message, RangedSet
+from qpid.testlib import TestBase010
+from qpid.management import managementChannel, managementClient
+from threading import Condition
+from time import sleep
+import qmf.console
+
+class ManagementTest (TestBase010):
+ """
+ Tests for the management hooks
+ """
+
+ def test_broker_connectivity_oldAPI (self):
+ """
+ Call the "echo" method on the broker to verify it is alive and talking.
+ """
+ session = self.session
+
+ mc = managementClient ()
+ mch = mc.addChannel (session)
+
+ mc.syncWaitForStable (mch)
+ brokers = mc.syncGetObjects (mch, "broker")
+ self.assertEqual (len (brokers), 1)
+ broker = brokers[0]
+ args = {}
+ body = "Echo Message Body"
+ args["body"] = body
+
+ for seq in range (1, 5):
+ args["sequence"] = seq
+ res = mc.syncCallMethod (mch, broker.id, broker.classKey, "echo", args)
+ self.assertEqual (res.status, 0)
+ self.assertEqual (res.statusText, "OK")
+ self.assertEqual (res.sequence, seq)
+ self.assertEqual (res.body, body)
+ mc.removeChannel (mch)
+
+ def test_methods_sync (self):
+ """
+ Call the "echo" method on the broker to verify it is alive and talking.
+ """
+ session = self.session
+ self.startQmf()
+
+ brokers = self.qmf.getObjects(_class="broker")
+ self.assertEqual(len(brokers), 1)
+ broker = brokers[0]
+
+ body = "Echo Message Body"
+ for seq in range(1, 20):
+ res = broker.echo(seq, body)
+ self.assertEqual(res.status, 0)
+ self.assertEqual(res.text, "OK")
+ self.assertEqual(res.sequence, seq)
+ self.assertEqual(res.body, body)
+
+ def test_get_objects(self):
+ self.startQmf()
+
+ # get the package list, verify that the qpid broker package is there
+ packages = self.qmf.getPackages()
+ assert 'org.apache.qpid.broker' in packages
+
+ # get the schema class keys for the broker, verify the broker table and link-down event
+ keys = self.qmf.getClasses('org.apache.qpid.broker')
+ broker = None
+ linkDown = None
+ for key in keys:
+ if key.getClassName() == "broker": broker = key
+ if key.getClassName() == "brokerLinkDown" : linkDown = key
+ assert broker
+ assert linkDown
+
+ brokerObjs = self.qmf.getObjects(_class="broker")
+ assert len(brokerObjs) == 1
+ brokerObjs = self.qmf.getObjects(_key=broker)
+ assert len(brokerObjs) == 1
+
+ def test_self_session_id (self):
+ self.startQmf()
+ sessionId = self.qmf_broker.getSessionId()
+ brokerSessions = self.qmf.getObjects(_class="session")
+
+ found = False
+ for bs in brokerSessions:
+ if bs.name == sessionId:
+ found = True
+ self.assertEqual (found, True)
+
+ def test_standard_exchanges (self):
+ self.startQmf()
+
+ exchanges = self.qmf.getObjects(_class="exchange")
+ exchange = self.findExchange (exchanges, "")
+ self.assertEqual (exchange.type, "direct")
+ exchange = self.findExchange (exchanges, "amq.direct")
+ self.assertEqual (exchange.type, "direct")
+ exchange = self.findExchange (exchanges, "amq.topic")
+ self.assertEqual (exchange.type, "topic")
+ exchange = self.findExchange (exchanges, "amq.fanout")
+ self.assertEqual (exchange.type, "fanout")
+ exchange = self.findExchange (exchanges, "amq.match")
+ self.assertEqual (exchange.type, "headers")
+ exchange = self.findExchange (exchanges, "qpid.management")
+ self.assertEqual (exchange.type, "topic")
+
+ def findExchange (self, exchanges, name):
+ for exchange in exchanges:
+ if exchange.name == name:
+ return exchange
+ return None
+
+ def test_move_queued_messages(self):
+ """
+ Test ability to move messages from the head of one queue to another.
+ Need to test moveing all and N messages.
+ """
+ self.startQmf()
+ session = self.session
+ "Set up source queue"
+ session.queue_declare(queue="src-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="src-queue", exchange="amq.direct", binding_key="routing_key")
+
+ twenty = range(1,21)
+ props = session.delivery_properties(routing_key="routing_key")
+ for count in twenty:
+ body = "Move Message %d" % count
+ src_msg = Message(props, body)
+ session.message_transfer(destination="amq.direct", message=src_msg)
+
+ "Set up destination queue"
+ session.queue_declare(queue="dest-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="dest-queue", exchange="amq.direct")
+
+ queues = self.qmf.getObjects(_class="queue")
+
+ "Move 10 messages from src-queue to dest-queue"
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10)
+ self.assertEqual (result.status, 0)
+
+ sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+ dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
+
+ self.assertEqual (sq.msgDepth,10)
+ self.assertEqual (dq.msgDepth,10)
+
+ "Move all remaining messages to destination"
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0)
+ self.assertEqual (result.status,0)
+
+ sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+ dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
+
+ self.assertEqual (sq.msgDepth,0)
+ self.assertEqual (dq.msgDepth,20)
+
+ "Use a bad source queue name"
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0)
+ self.assertEqual (result.status,4)
+
+ "Use a bad destination queue name"
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0)
+ self.assertEqual (result.status,4)
+
+ " Use a large qty (40) to move from dest-queue back to "
+ " src-queue- should move all "
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40)
+ self.assertEqual (result.status,0)
+
+ sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+ dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
+
+ self.assertEqual (sq.msgDepth,20)
+ self.assertEqual (dq.msgDepth,0)
+
+ "Consume the messages of the queue and check they are all there in order"
+ session.message_subscribe(queue="src-queue", destination="tag")
+ session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ queue = session.incoming("tag")
+ for count in twenty:
+ consumed_msg = queue.get(timeout=1)
+ body = "Move Message %d" % count
+ self.assertEqual(body, consumed_msg.body)
+
+ def test_purge_queue(self):
+ """
+ Test ability to purge messages from the head of a queue.
+ Need to test moveing all, 1 (top message) and N messages.
+ """
+ self.startQmf()
+ session = self.session
+ "Set up purge queue"
+ session.queue_declare(queue="purge-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="purge-queue", exchange="amq.direct", binding_key="routing_key")
+
+ twenty = range(1,21)
+ props = session.delivery_properties(routing_key="routing_key")
+ for count in twenty:
+ body = "Purge Message %d" % count
+ msg = Message(props, body)
+ session.message_transfer(destination="amq.direct", message=msg)
+
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
+
+ "Purge top message from purge-queue"
+ result = pq.purge(1)
+ self.assertEqual (result.status, 0)
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
+ self.assertEqual (pq.msgDepth,19)
+
+ "Purge top 9 messages from purge-queue"
+ result = pq.purge(9)
+ self.assertEqual (result.status, 0)
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
+ self.assertEqual (pq.msgDepth,10)
+
+ "Purge all messages from purge-queue"
+ result = pq.purge(0)
+ self.assertEqual (result.status, 0)
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
+ self.assertEqual (pq.msgDepth,0)
+
+ def test_reroute_queue(self):
+ """
+ Test ability to reroute messages from the head of a queue.
+ Need to test moving all, 1 (top message) and N messages.
+ """
+ self.startQmf()
+ session = self.session
+ "Set up test queue"
+ session.exchange_declare(exchange="alt.direct1", type="direct")
+ session.queue_declare(queue="alt-queue1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="alt-queue1", exchange="alt.direct1", binding_key="routing_key")
+ session.exchange_declare(exchange="alt.direct2", type="direct")
+ session.queue_declare(queue="alt-queue2", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="alt-queue2", exchange="alt.direct2", binding_key="routing_key")
+ session.queue_declare(queue="reroute-queue", exclusive=True, auto_delete=True, alternate_exchange="alt.direct1")
+ session.exchange_bind(queue="reroute-queue", exchange="amq.direct", binding_key="routing_key")
+
+ twenty = range(1,21)
+ props = session.delivery_properties(routing_key="routing_key")
+ for count in twenty:
+ body = "Reroute Message %d" % count
+ msg = Message(props, body)
+ session.message_transfer(destination="amq.direct", message=msg)
+
+ pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0]
+
+ "Reroute top message from reroute-queue to alternate exchange"
+ result = pq.reroute(1, True, "")
+ self.assertEqual(result.status, 0)
+ pq.update()
+ aq = self.qmf.getObjects(_class="queue", name="alt-queue1")[0]
+ self.assertEqual(pq.msgDepth,19)
+ self.assertEqual(aq.msgDepth,1)
+
+ "Reroute top 9 messages from reroute-queue to alt.direct2"
+ result = pq.reroute(9, False, "alt.direct2")
+ self.assertEqual(result.status, 0)
+ pq.update()
+ aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
+ self.assertEqual(pq.msgDepth,10)
+ self.assertEqual(aq.msgDepth,9)
+
+ "Reroute using a non-existent exchange"
+ result = pq.reroute(0, False, "amq.nosuchexchange")
+ self.assertEqual(result.status, 4)
+
+ "Reroute all messages from reroute-queue"
+ result = pq.reroute(0, False, "alt.direct2")
+ self.assertEqual(result.status, 0)
+ pq.update()
+ aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
+ self.assertEqual(pq.msgDepth,0)
+ self.assertEqual(aq.msgDepth,19)
+
+ "Make more messages"
+ twenty = range(1,21)
+ props = session.delivery_properties(routing_key="routing_key")
+ for count in twenty:
+ body = "Reroute Message %d" % count
+ msg = Message(props, body)
+ session.message_transfer(destination="amq.direct", message=msg)
+
+ "Reroute onto the same queue"
+ result = pq.reroute(0, False, "amq.direct")
+ self.assertEqual(result.status, 0)
+ pq.update()
+ self.assertEqual(pq.msgDepth,20)
+
+
+ def test_methods_async (self):
+ """
+ """
+ class Handler (qmf.console.Console):
+ def __init__(self):
+ self.cv = Condition()
+ self.xmtList = {}
+ self.rcvList = {}
+
+ def methodResponse(self, broker, seq, response):
+ self.cv.acquire()
+ try:
+ self.rcvList[seq] = response
+ finally:
+ self.cv.release()
+
+ def request(self, broker, count):
+ self.count = count
+ for idx in range(count):
+ self.cv.acquire()
+ try:
+ seq = broker.echo(idx, "Echo Message", _async = True)
+ self.xmtList[seq] = idx
+ finally:
+ self.cv.release()
+
+ def check(self):
+ if self.count != len(self.xmtList):
+ return "fail (attempted send=%d, actual sent=%d)" % (self.count, len(self.xmtList))
+ lost = 0
+ mismatched = 0
+ for seq in self.xmtList:
+ value = self.xmtList[seq]
+ if seq in self.rcvList:
+ result = self.rcvList.pop(seq)
+ if result.sequence != value:
+ mismatched += 1
+ else:
+ lost += 1
+ spurious = len(self.rcvList)
+ if lost == 0 and mismatched == 0 and spurious == 0:
+ return "pass"
+ else:
+ return "fail (lost=%d, mismatch=%d, spurious=%d)" % (lost, mismatched, spurious)
+
+ handler = Handler()
+ self.startQmf(handler)
+ brokers = self.qmf.getObjects(_class="broker")
+ self.assertEqual(len(brokers), 1)
+ broker = brokers[0]
+ handler.request(broker, 20)
+ sleep(1)
+ self.assertEqual(handler.check(), "pass")
+
+ def test_connection_close(self):
+ """
+ Test management method for closing connection
+ """
+ self.startQmf()
+ conn = self.connect()
+ session = conn.session("my-named-session")
+
+ #using qmf find named session and close the corresponding connection:
+ qmf_ssn_object = self.qmf.getObjects(_class="session", name="my-named-session")[0]
+ qmf_ssn_object._connectionRef_.close()
+
+ #check that connection is closed
+ try:
+ conn.session("another-session")
+ self.fail("Expected failure from closed connection")
+ except: None
+
+ #make sure that the named session has been closed and the name can be re-used
+ conn = self.connect()
+ session = conn.session("my-named-session")
+ session.queue_declare(queue="whatever", exclusive=True, auto_delete=True)
+
+ def test_binding_count_on_queue(self):
+ self.startQmf()
+ conn = self.connect()
+ session = self.session
+
+ QUEUE = "binding_test_queue"
+ EX_DIR = "binding_test_exchange_direct"
+ EX_FAN = "binding_test_exchange_fanout"
+ EX_TOPIC = "binding_test_exchange_topic"
+ EX_HDR = "binding_test_exchange_headers"
+
+ #
+ # Create a test queue
+ #
+ session.queue_declare(queue=QUEUE, exclusive=True, auto_delete=True)
+ queue = self.qmf.getObjects(_class="queue", name=QUEUE)[0]
+ if not queue:
+ self.fail("Queue not found")
+ self.assertEqual(queue.bindingCount, 1, "wrong initial binding count")
+
+ #
+ # Create an exchange of each supported type
+ #
+ session.exchange_declare(exchange=EX_DIR, type="direct")
+ session.exchange_declare(exchange=EX_FAN, type="fanout")
+ session.exchange_declare(exchange=EX_TOPIC, type="topic")
+ session.exchange_declare(exchange=EX_HDR, type="headers")
+
+ #
+ # Bind each exchange to the test queue
+ #
+ match = {}
+ match['x-match'] = "all"
+ match['key'] = "value"
+ session.exchange_bind(exchange=EX_DIR, queue=QUEUE, binding_key="key1")
+ session.exchange_bind(exchange=EX_DIR, queue=QUEUE, binding_key="key2")
+ session.exchange_bind(exchange=EX_FAN, queue=QUEUE)
+ session.exchange_bind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key1.#")
+ session.exchange_bind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key2.#")
+ session.exchange_bind(exchange=EX_HDR, queue=QUEUE, binding_key="key1", arguments=match)
+ match['key2'] = "value2"
+ session.exchange_bind(exchange=EX_HDR, queue=QUEUE, binding_key="key2", arguments=match)
+
+ #
+ # Verify that the queue's binding count accounts for the new bindings
+ #
+ queue.update()
+ self.assertEqual(queue.bindingCount, 8,
+ "added bindings not accounted for (expected 8, got %d)" % queue.bindingCount)
+
+ #
+ # Remove some of the bindings
+ #
+ session.exchange_unbind(exchange=EX_DIR, queue=QUEUE, binding_key="key2")
+ session.exchange_unbind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key2.#")
+ session.exchange_unbind(exchange=EX_HDR, queue=QUEUE, binding_key="key2")
+
+ #
+ # Verify that the queue's binding count accounts for the deleted bindings
+ #
+ queue.update()
+ self.assertEqual(queue.bindingCount, 5,
+ "deleted bindings not accounted for (expected 5, got %d)" % queue.bindingCount)
+ #
+ # Delete the exchanges
+ #
+ session.exchange_delete(exchange=EX_DIR)
+ session.exchange_delete(exchange=EX_FAN)
+ session.exchange_delete(exchange=EX_TOPIC)
+ session.exchange_delete(exchange=EX_HDR)
+
+ #
+ # Verify that the queue's binding count accounts for the lost bindings
+ #
+ queue.update()
+ self.assertEqual(queue.bindingCount, 1,
+ "deleted bindings not accounted for (expected 1, got %d)" % queue.bindingCount)
+
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/message.py b/qpid/tests/src/py/qpid_tests/broker_0_10/message.py
new file mode 100644
index 0000000000..e80333a1e6
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/message.py
@@ -0,0 +1,918 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.testlib import TestBase010
+from qpid.datatypes import Message, RangedSet
+from qpid.session import SessionException
+
+from qpid.content import Content
+from time import sleep
+
+class MessageTests(TestBase010):
+ """Tests for 'methods' on the amqp message 'class'"""
+
+ def test_no_local(self):
+ """
+ NOTE: this is a test of a QPID specific feature
+
+ Test that the qpid specific no_local arg is honoured.
+ """
+ session = self.session
+ #setup, declare two queues one of which excludes delivery of locally sent messages
+ session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True, arguments={'no-local':'true'})
+ #establish two consumers
+ self.subscribe(destination="local_included", queue="test-queue-1a")
+ self.subscribe(destination="local_excluded", queue="test-queue-1b")
+
+ #send a message
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"), "deliver-me"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me"))
+
+ #send a message from another session on the same connection to each queue
+ session2 = self.conn.session("my-local-session")
+ session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1a"), "deliver-me-as-well"))
+ session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me-either"))
+
+ #send a message from a session on another connection to each queue
+ for q in ["test-queue-1a", "test-queue-1b"]:
+ session.exchange_bind(queue=q, exchange="amq.fanout", binding_key="my-key")
+ other = self.connect()
+ session3 = other.session("my-other-session")
+ session3.message_transfer(destination="amq.fanout", message=Message("i-am-not-local"))
+ other.close()
+
+ #check the queues of the two consumers
+ excluded = session.incoming("local_excluded")
+ included = session.incoming("local_included")
+ for b in ["deliver-me", "deliver-me-as-well", "i-am-not-local"]:
+ msg = included.get(timeout=1)
+ self.assertEqual(b, msg.body)
+ msg = excluded.get(timeout=1)
+ self.assertEqual("i-am-not-local", msg.body)
+ try:
+ excluded.get(timeout=1)
+ self.fail("Received locally published message though no_local=true")
+ except Empty: None
+
+ def test_no_local_awkward(self):
+
+ """
+ NOTE: this is a test of a QPID specific feature
+
+ Check that messages which will be excluded through no-local
+ processing will not block subsequent deliveries
+ """
+
+ session = self.session
+ #setup:
+ session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True, arguments={'no-local':'true'})
+ #establish consumer which excludes delivery of locally sent messages
+ self.subscribe(destination="local_excluded", queue="test-queue")
+
+ #send a 'local' message
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "local"))
+
+ #send a non local message
+ other = self.connect()
+ session2 = other.session("my-session", 1)
+ session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue"), "foreign"))
+ session2.close()
+ other.close()
+
+ #check that the second message only is delivered
+ excluded = session.incoming("local_excluded")
+ msg = excluded.get(timeout=1)
+ self.assertEqual("foreign", msg.body)
+ try:
+ excluded.get(timeout=1)
+ self.fail("Received extra message")
+ except Empty: None
+ #check queue is empty
+ self.assertEqual(0, session.queue_query(queue="test-queue").message_count)
+
+ def test_no_local_exclusive_subscribe(self):
+ """
+ NOTE: this is a test of a QPID specific feature
+
+ Test that the no_local processing works on queues not declared
+ as exclusive, but with an exclusive subscription
+ """
+ session = self.session
+
+ #setup, declare two queues one of which excludes delivery of
+ #locally sent messages but is not declared as exclusive
+ session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="test-queue-1b", auto_delete=True, arguments={'no-local':'true'})
+ #establish two consumers
+ self.subscribe(destination="local_included", queue="test-queue-1a")
+ self.subscribe(destination="local_excluded", queue="test-queue-1b", exclusive=True)
+
+ #send a message from the same session to each queue
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"), "deliver-me"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me"))
+
+ #send a message from another session on the same connection to each queue
+ session2 = self.conn.session("my-session")
+ session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1a"), "deliver-me-as-well"))
+ session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me-either"))
+
+ #send a message from a session on another connection to each queue
+ for q in ["test-queue-1a", "test-queue-1b"]:
+ session.exchange_bind(queue=q, exchange="amq.fanout", binding_key="my-key")
+ other = self.connect()
+ session3 = other.session("my-other-session")
+ session3.message_transfer(destination="amq.fanout", message=Message("i-am-not-local"))
+ other.close()
+
+ #check the queues of the two consumers
+ excluded = session.incoming("local_excluded")
+ included = session.incoming("local_included")
+ for b in ["deliver-me", "deliver-me-as-well", "i-am-not-local"]:
+ msg = included.get(timeout=1)
+ self.assertEqual(b, msg.body)
+ msg = excluded.get(timeout=1)
+ self.assertEqual("i-am-not-local", msg.body)
+ try:
+ excluded.get(timeout=1)
+ self.fail("Received locally published message though no_local=true")
+ except Empty: None
+
+
+ def test_consume_exclusive(self):
+ """
+ Test an exclusive consumer prevents other consumer being created
+ """
+ session = self.session
+ session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True)
+ session.message_subscribe(destination="first", queue="test-queue-2", exclusive=True)
+ try:
+ session.message_subscribe(destination="second", queue="test-queue-2")
+ self.fail("Expected consume request to fail due to previous exclusive consumer")
+ except SessionException, e:
+ self.assertEquals(405, e.args[0].error_code)
+
+ def test_consume_exclusive2(self):
+ """
+ Check that an exclusive consumer cannot be created if a consumer already exists:
+ """
+ session = self.session
+ session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True)
+ session.message_subscribe(destination="first", queue="test-queue-2")
+ try:
+ session.message_subscribe(destination="second", queue="test-queue-2", exclusive=True)
+ self.fail("Expected exclusive consume request to fail due to previous consumer")
+ except SessionException, e:
+ self.assertEquals(405, e.args[0].error_code)
+
+ def test_consume_queue_not_found(self):
+ """
+ Test error conditions associated with the queue field of the consume method:
+ """
+ session = self.session
+ try:
+ #queue specified but doesn't exist:
+ session.message_subscribe(queue="invalid-queue", destination="a")
+ self.fail("Expected failure when consuming from non-existent queue")
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+ def test_consume_queue_not_specified(self):
+ session = self.session
+ try:
+ #queue not specified and none previously declared for channel:
+ session.message_subscribe(destination="a")
+ self.fail("Expected failure when consuming from unspecified queue")
+ except SessionException, e:
+ self.assertEquals(531, e.args[0].error_code)
+
+ def test_consume_unique_consumers(self):
+ """
+ Ensure unique consumer tags are enforced
+ """
+ session = self.session
+ #setup, declare a queue:
+ session.queue_declare(queue="test-queue-3", exclusive=True, auto_delete=True)
+
+ #check that attempts to use duplicate tags are detected and prevented:
+ session.message_subscribe(destination="first", queue="test-queue-3")
+ try:
+ session.message_subscribe(destination="first", queue="test-queue-3")
+ self.fail("Expected consume request to fail due to non-unique tag")
+ except SessionException, e:
+ self.assertEquals(530, e.args[0].error_code)
+
+ def test_cancel(self):
+ """
+ Test compliance of the basic.cancel method
+ """
+ session = self.session
+ #setup, declare a queue:
+ session.queue_declare(queue="test-queue-4", exclusive=True, auto_delete=True)
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "One"))
+
+ session.message_subscribe(destination="my-consumer", queue="test-queue-4")
+ myqueue = session.incoming("my-consumer")
+ session.message_flow(destination="my-consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="my-consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+
+ #should flush here
+
+ #cancel should stop messages being delivered
+ session.message_cancel(destination="my-consumer")
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "Two"))
+ msg = myqueue.get(timeout=1)
+ self.assertEqual("One", msg.body)
+ try:
+ msg = myqueue.get(timeout=1)
+ self.fail("Got message after cancellation: " + msg)
+ except Empty: None
+
+ #cancellation of non-existant consumers should be handled without error
+ session.message_cancel(destination="my-consumer")
+ session.message_cancel(destination="this-never-existed")
+
+
+ def test_ack(self):
+ """
+ Test basic ack/recover behaviour
+ """
+ session = self.conn.session("alternate-session", timeout=10)
+ session.queue_declare(queue="test-ack-queue", auto_delete=True)
+
+ session.message_subscribe(queue = "test-ack-queue", destination = "consumer")
+ session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ queue = session.incoming("consumer")
+
+ delivery_properties = session.delivery_properties(routing_key="test-ack-queue")
+ for i in ["One", "Two", "Three", "Four", "Five"]:
+ session.message_transfer(message=Message(delivery_properties, i))
+
+ msg1 = queue.get(timeout=1)
+ msg2 = queue.get(timeout=1)
+ msg3 = queue.get(timeout=1)
+ msg4 = queue.get(timeout=1)
+ msg5 = queue.get(timeout=1)
+
+ self.assertEqual("One", msg1.body)
+ self.assertEqual("Two", msg2.body)
+ self.assertEqual("Three", msg3.body)
+ self.assertEqual("Four", msg4.body)
+ self.assertEqual("Five", msg5.body)
+
+ session.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four
+
+ #subscribe from second session here to ensure queue is not
+ #auto-deleted when alternate session closes (no need to ack on these):
+ self.session.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1)
+
+ #now close the session, and see that the unacked messages are
+ #then redelivered to another subscriber:
+ session.close(timeout=10)
+
+ session = self.session
+ session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ queue = session.incoming("checker")
+
+ msg3b = queue.get(timeout=1)
+ msg5b = queue.get(timeout=1)
+
+ self.assertEqual("Three", msg3b.body)
+ self.assertEqual("Five", msg5b.body)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
+ def test_reject(self):
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout")
+ session.queue_declare(queue = "r", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue = "r", exchange = "amq.fanout")
+
+ session.message_subscribe(queue = "q", destination = "consumer")
+ session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "blah, blah"))
+ msg = session.incoming("consumer").get(timeout = 1)
+ self.assertEquals(msg.body, "blah, blah")
+ session.message_reject(RangedSet(msg.id))
+
+ session.message_subscribe(queue = "r", destination = "checker")
+ session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ msg = session.incoming("checker").get(timeout = 1)
+ self.assertEquals(msg.body, "blah, blah")
+
+ def test_credit_flow_messages(self):
+ """
+ Test basic credit based flow control with unit = message
+ """
+ #declare an exclusive queue
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ #create consumer (for now that defaults to infinite credit)
+ session.message_subscribe(queue = "q", destination = "c")
+ session.message_set_flow_mode(flow_mode = 0, destination = "c")
+ #send batch of messages to queue
+ for i in range(1, 11):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i))
+
+ #set message credit to finite amount (less than enough for all messages)
+ session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c")
+ #set infinite byte credit
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c")
+ #check that expected number were received
+ q = session.incoming("c")
+ for i in range(1, 6):
+ self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i)
+ self.assertEmpty(q)
+
+ #increase credit again and check more are received
+ for i in range(6, 11):
+ session.message_flow(unit = session.credit_unit.message, value = 1, destination = "c")
+ self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i)
+ self.assertEmpty(q)
+
+ def test_credit_flow_bytes(self):
+ """
+ Test basic credit based flow control with unit = bytes
+ """
+ #declare an exclusive queue
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ #create consumer (for now that defaults to infinite credit)
+ session.message_subscribe(queue = "q", destination = "c")
+ session.message_set_flow_mode(flow_mode = 0, destination = "c")
+ #send batch of messages to queue
+ for i in range(10):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh"))
+
+ #each message is currently interpreted as requiring msg_size bytes of credit
+ msg_size = 19
+
+ #set byte credit to finite amount (less than enough for all messages)
+ session.message_flow(unit = session.credit_unit.byte, value = msg_size*5, destination = "c")
+ #set infinite message credit
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "c")
+ #check that expected number were received
+ q = session.incoming("c")
+ for i in range(5):
+ self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh")
+ self.assertEmpty(q)
+
+ #increase credit again and check more are received
+ for i in range(5):
+ session.message_flow(unit = session.credit_unit.byte, value = msg_size, destination = "c")
+ self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh")
+ self.assertEmpty(q)
+
+
+ def test_window_flow_messages(self):
+ """
+ Test basic window based flow control with unit = message
+ """
+ #declare an exclusive queue
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ #create consumer (for now that defaults to infinite credit)
+ session.message_subscribe(queue = "q", destination = "c")
+ session.message_set_flow_mode(flow_mode = 1, destination = "c")
+ #send batch of messages to queue
+ for i in range(1, 11):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i))
+
+ #set message credit to finite amount (less than enough for all messages)
+ session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c")
+ #set infinite byte credit
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c")
+ #check that expected number were received
+ q = session.incoming("c")
+ for i in range(1, 6):
+ msg = q.get(timeout = 1)
+ session.receiver._completed.add(msg.id)#TODO: this may be done automatically
+ self.assertDataEquals(session, msg, "Message %d" % i)
+ self.assertEmpty(q)
+
+ #acknowledge messages and check more are received
+ #TODO: there may be a nicer way of doing this
+ session.channel.session_completed(session.receiver._completed)
+
+ for i in range(6, 11):
+ self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i)
+ self.assertEmpty(q)
+
+
+ def test_window_flow_bytes(self):
+ """
+ Test basic window based flow control with unit = bytes
+ """
+ #declare an exclusive queue
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ #create consumer (for now that defaults to infinite credit)
+ session.message_subscribe(queue = "q", destination = "c")
+ session.message_set_flow_mode(flow_mode = 1, destination = "c")
+ #send batch of messages to queue
+ for i in range(10):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh"))
+
+ #each message is currently interpreted as requiring msg_size bytes of credit
+ msg_size = 19
+
+ #set byte credit to finite amount (less than enough for all messages)
+ session.message_flow(unit = session.credit_unit.byte, value = msg_size*5, destination = "c")
+ #set infinite message credit
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "c")
+ #check that expected number were received
+ q = session.incoming("c")
+ msgs = []
+ for i in range(5):
+ msg = q.get(timeout = 1)
+ msgs.append(msg)
+ self.assertDataEquals(session, msg, "abcdefgh")
+ self.assertEmpty(q)
+
+ #ack each message individually and check more are received
+ for i in range(5):
+ msg = msgs.pop()
+ #TODO: there may be a nicer way of doing this
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
+ self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh")
+ self.assertEmpty(q)
+
+ def test_window_flush_ack_flow(self):
+ """
+ Test basic window based flow control with unit = bytes
+ """
+ #declare an exclusive queue
+ ssn = self.session
+ ssn.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ #create consumer
+ ssn.message_subscribe(queue = "q", destination = "c",
+ accept_mode=ssn.accept_mode.explicit)
+ ssn.message_set_flow_mode(flow_mode = ssn.flow_mode.window, destination = "c")
+
+ #send message A
+ ssn.message_transfer(message=Message(ssn.delivery_properties(routing_key="q"), "A"))
+
+ for unit in ssn.credit_unit.VALUES:
+ ssn.message_flow("c", unit, 0xFFFFFFFFL)
+
+ q = ssn.incoming("c")
+ msgA = q.get(timeout=10)
+
+ ssn.message_flush(destination="c")
+
+ # XXX
+ ssn.receiver._completed.add(msgA.id)
+ ssn.channel.session_completed(ssn.receiver._completed)
+ ssn.message_accept(RangedSet(msgA.id))
+
+ for unit in ssn.credit_unit.VALUES:
+ ssn.message_flow("c", unit, 0xFFFFFFFFL)
+
+ #send message B
+ ssn.message_transfer(message=Message(ssn.delivery_properties(routing_key="q"), "B"))
+
+ msgB = q.get(timeout=10)
+
+ def test_subscribe_not_acquired(self):
+ """
+ Test the not-acquired modes works as expected for a simple case
+ """
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ for i in range(1, 6):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i))
+
+ session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ session.message_subscribe(queue = "q", destination = "b", acquire_mode = 1)
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "b")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b")
+
+ for i in range(6, 11):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i))
+
+ #both subscribers should see all messages
+ qA = session.incoming("a")
+ qB = session.incoming("b")
+ for i in range(1, 11):
+ for q in [qA, qB]:
+ msg = q.get(timeout = 1)
+ self.assertEquals("Message %s" % i, msg.body)
+ #TODO: tidy up completion
+ session.receiver._completed.add(msg.id)
+
+ #TODO: tidy up completion
+ session.channel.session_completed(session.receiver._completed)
+ #messages should still be on the queue:
+ self.assertEquals(10, session.queue_query(queue = "q").message_count)
+
+ def test_acquire_with_no_accept_and_credit_flow(self):
+ """
+ Test that messages recieved unacquired, with accept not
+ required in windowing mode can be acquired.
+ """
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))
+
+ session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1, accept_mode = 1)
+ session.message_set_flow_mode(flow_mode = session.flow_mode.credit, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ msg = session.incoming("a").get(timeout = 1)
+ self.assertEquals("acquire me", msg.body)
+ #message should still be on the queue:
+ self.assertEquals(1, session.queue_query(queue = "q").message_count)
+
+ transfers = RangedSet(msg.id)
+ response = session.message_acquire(transfers)
+ #check that we get notification (i.e. message_acquired)
+ self.assert_(msg.id in response.transfers)
+ #message should have been removed from the queue:
+ self.assertEquals(0, session.queue_query(queue = "q").message_count)
+
+ def test_acquire(self):
+ """
+ Test explicit acquire function
+ """
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))
+
+ session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
+ session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ msg = session.incoming("a").get(timeout = 1)
+ self.assertEquals("acquire me", msg.body)
+ #message should still be on the queue:
+ self.assertEquals(1, session.queue_query(queue = "q").message_count)
+
+ transfers = RangedSet(msg.id)
+ response = session.message_acquire(transfers)
+ #check that we get notification (i.e. message_acquired)
+ self.assert_(msg.id in response.transfers)
+ #message should have been removed from the queue:
+ self.assertEquals(0, session.queue_query(queue = "q").message_count)
+ session.message_accept(transfers)
+
+
+ def test_release(self):
+ """
+ Test explicit release function
+ """
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "release me"))
+
+ session.message_subscribe(queue = "q", destination = "a")
+ session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ msg = session.incoming("a").get(timeout = 1)
+ self.assertEquals("release me", msg.body)
+ session.message_cancel(destination = "a")
+ session.message_release(RangedSet(msg.id))
+
+ #message should not have been removed from the queue:
+ self.assertEquals(1, session.queue_query(queue = "q").message_count)
+
+ def test_release_ordering(self):
+ """
+ Test order of released messages is as expected
+ """
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ for i in range (1, 11):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "released message %s" % (i)))
+
+ session.message_subscribe(queue = "q", destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ queue = session.incoming("a")
+ first = queue.get(timeout = 1)
+ for i in range(2, 10):
+ msg = queue.get(timeout = 1)
+ self.assertEquals("released message %s" % (i), msg.body)
+
+ last = queue.get(timeout = 1)
+ self.assertEmpty(queue)
+ released = RangedSet()
+ released.add(first.id, last.id)
+ session.message_release(released)
+
+ #TODO: may want to clean this up...
+ session.receiver._completed.add(first.id, last.id)
+ session.channel.session_completed(session.receiver._completed)
+
+ for i in range(1, 11):
+ self.assertEquals("released message %s" % (i), queue.get(timeout = 1).body)
+
+ def test_ranged_ack(self):
+ """
+ Test acking of messages ranges
+ """
+ session = self.conn.session("alternate-session", timeout=10)
+
+ session.queue_declare(queue = "q", auto_delete=True)
+ delivery_properties = session.delivery_properties(routing_key="q")
+ for i in range (1, 11):
+ session.message_transfer(message=Message(delivery_properties, "message %s" % (i)))
+
+ session.message_subscribe(queue = "q", destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ queue = session.incoming("a")
+ ids = []
+ for i in range (1, 11):
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message %s" % (i), msg.body)
+ ids.append(msg.id)
+
+ self.assertEmpty(queue)
+
+ #ack all but the fourth message (command id 2)
+ accepted = RangedSet()
+ accepted.add(ids[0], ids[2])
+ accepted.add(ids[4], ids[9])
+ session.message_accept(accepted)
+
+ #subscribe from second session here to ensure queue is not
+ #auto-deleted when alternate session closes (no need to ack on these):
+ self.session.message_subscribe(queue = "q", destination = "checker")
+
+ #now close the session, and see that the unacked messages are
+ #then redelivered to another subscriber:
+ session.close(timeout=10)
+
+ session = self.session
+ session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ queue = session.incoming("checker")
+
+ self.assertEquals("message 4", queue.get(timeout = 1).body)
+ self.assertEmpty(queue)
+
+ def test_subscribe_not_acquired_2(self):
+ session = self.session
+
+ #publish some messages
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ for i in range(1, 11):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
+
+ #consume some of them
+ session.message_subscribe(queue = "q", destination = "a")
+ session.message_set_flow_mode(flow_mode = 0, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+
+ queue = session.incoming("a")
+ for i in range(1, 6):
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.body)
+ #complete and accept
+ session.message_accept(RangedSet(msg.id))
+ #TODO: tidy up completion
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
+ self.assertEmpty(queue)
+
+ #now create a not-acquired subscriber
+ session.message_subscribe(queue = "q", destination = "b", acquire_mode=1)
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b")
+
+ #check it gets those not consumed
+ queue = session.incoming("b")
+ session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b")
+ for i in range(6, 11):
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.body)
+ session.message_release(RangedSet(msg.id))
+ #TODO: tidy up completion
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
+ session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b")
+ self.assertEmpty(queue)
+
+ #check all 'browsed' messages are still on the queue
+ self.assertEqual(5, session.queue_query(queue="q").message_count)
+
+ def test_subscribe_not_acquired_3(self):
+ session = self.session
+
+ #publish some messages
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ for i in range(1, 11):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
+
+ #create a not-acquired subscriber
+ session.message_subscribe(queue = "q", destination = "a", acquire_mode=1)
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
+
+ #browse through messages
+ queue = session.incoming("a")
+ for i in range(1, 11):
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.body)
+ if (i % 2):
+ #try to acquire every second message
+ response = session.message_acquire(RangedSet(msg.id))
+ #check that acquire succeeds
+ self.assert_(msg.id in response.transfers)
+ session.message_accept(RangedSet(msg.id))
+ else:
+ session.message_release(RangedSet(msg.id))
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
+ self.assertEmpty(queue)
+
+ #create a second not-acquired subscriber
+ session.message_subscribe(queue = "q", destination = "b", acquire_mode=1)
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b")
+ session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b")
+ #check it gets those not consumed
+ queue = session.incoming("b")
+ for i in [2,4,6,8,10]:
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.body)
+ session.message_release(RangedSet(msg.id))
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
+ session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b")
+ self.assertEmpty(queue)
+
+ #check all 'browsed' messages are still on the queue
+ self.assertEqual(5, session.queue_query(queue="q").message_count)
+
+ def test_release_unacquired(self):
+ session = self.session
+
+ #create queue
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+ #send message
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "my-message"))
+
+ #create two 'browsers'
+ session.message_subscribe(queue = "q", destination = "a", acquire_mode=1)
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
+ queueA = session.incoming("a")
+
+ session.message_subscribe(queue = "q", destination = "b", acquire_mode=1)
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b")
+ session.message_flow(unit = session.credit_unit.message, value = 10, destination = "b")
+ queueB = session.incoming("b")
+
+ #have each browser release the message
+ msgA = queueA.get(timeout = 1)
+ session.message_release(RangedSet(msgA.id))
+
+ msgB = queueB.get(timeout = 1)
+ session.message_release(RangedSet(msgB.id))
+
+ #cancel browsers
+ session.message_cancel(destination = "a")
+ session.message_cancel(destination = "b")
+
+ #create consumer
+ session.message_subscribe(queue = "q", destination = "c")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c")
+ session.message_flow(unit = session.credit_unit.message, value = 10, destination = "c")
+ queueC = session.incoming("c")
+ #consume the message then ack it
+ msgC = queueC.get(timeout = 1)
+ session.message_accept(RangedSet(msgC.id))
+ #ensure there are no other messages
+ self.assertEmpty(queueC)
+
+ def test_release_order(self):
+ session = self.session
+
+ #create queue
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+ #send messages
+ for i in range(1, 11):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
+
+ #subscribe:
+ session.message_subscribe(queue="q", destination="a")
+ a = session.incoming("a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
+
+ for i in range(1, 11):
+ msg = a.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.body)
+ if (i % 2):
+ #accept all odd messages
+ session.message_accept(RangedSet(msg.id))
+ else:
+ #release all even messages
+ session.message_release(RangedSet(msg.id))
+
+ #browse:
+ session.message_subscribe(queue="q", destination="b", acquire_mode=1)
+ b = session.incoming("b")
+ b.start()
+ for i in [2, 4, 6, 8, 10]:
+ msg = b.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.body)
+
+
+ def test_empty_body(self):
+ session = self.session
+ session.queue_declare(queue="xyz", exclusive=True, auto_delete=True)
+ props = session.delivery_properties(routing_key="xyz")
+ session.message_transfer(message=Message(props, ""))
+
+ consumer_tag = "tag1"
+ session.message_subscribe(queue="xyz", destination=consumer_tag)
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = consumer_tag)
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag)
+ queue = session.incoming(consumer_tag)
+ msg = queue.get(timeout=1)
+ self.assertEquals("", msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ def test_incoming_start(self):
+ q = "test_incoming_start"
+ session = self.session
+
+ session.queue_declare(queue=q, exclusive=True, auto_delete=True)
+ session.message_subscribe(queue=q, destination="msgs")
+ messages = session.incoming("msgs")
+ assert messages.destination == "msgs"
+
+ dp = session.delivery_properties(routing_key=q)
+ session.message_transfer(message=Message(dp, "test"))
+
+ messages.start()
+ msg = messages.get()
+ assert msg.body == "test"
+
+ def test_ttl(self):
+ q = "test_ttl"
+ session = self.session
+
+ session.queue_declare(queue=q, exclusive=True, auto_delete=True)
+
+ dp = session.delivery_properties(routing_key=q, ttl=500)#expire in half a second
+ session.message_transfer(message=Message(dp, "first"))
+
+ dp = session.delivery_properties(routing_key=q, ttl=300000)#expire in fives minutes
+ session.message_transfer(message=Message(dp, "second"))
+
+ d = "msgs"
+ session.message_subscribe(queue=q, destination=d)
+ messages = session.incoming(d)
+ sleep(1)
+ session.message_flow(unit = session.credit_unit.message, value=2, destination=d)
+ session.message_flow(unit = session.credit_unit.byte, value=0xFFFFFFFFL, destination=d)
+ assert messages.get(timeout=1).body == "second"
+ self.assertEmpty(messages)
+
+
+ def assertDataEquals(self, session, msg, expected):
+ self.assertEquals(expected, msg.body)
+
+ def assertEmpty(self, queue):
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Queue not empty, contains: " + extra.body)
+ except Empty: None
+
+class SizelessContent(Content):
+
+ def size(self):
+ return None
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/persistence.py b/qpid/tests/src/py/qpid_tests/broker_0_10/persistence.py
new file mode 100644
index 0000000000..e9cf9b7caa
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/persistence.py
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.datatypes import Message, RangedSet
+#from qpid.testlib import testrunner, TestBase010
+from qpid.testlib import TestBase010
+
+class PersistenceTests(TestBase010):
+ def test_delete_queue_after_publish(self):
+ session = self.session
+ session.auto_sync = False
+
+ #create queue
+ session.queue_declare(queue = "q", auto_delete=True, durable=True)
+
+ #send message
+ for i in range(1, 10):
+ dp = session.delivery_properties(routing_key="q", delivery_mode=2)
+ session.message_transfer(message=Message(dp, "my-message"))
+
+ session.auto_sync = True
+ #explicitly delete queue
+ session.queue_delete(queue = "q")
+
+ def test_ack_message_from_deleted_queue(self):
+ session = self.session
+ session.auto_sync = False
+
+ #create queue
+ session.queue_declare(queue = "q", auto_delete=True, durable=True)
+
+ #send message
+ dp = session.delivery_properties(routing_key="q", delivery_mode=2)
+ session.message_transfer(message=Message(dp, "my-message"))
+
+ #create consumer
+ session.message_subscribe(queue = "q", destination = "a", accept_mode = 1, acquire_mode=0)
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
+ queue = session.incoming("a")
+
+ #consume the message, cancel subscription (triggering auto-delete), then ack it
+ msg = queue.get(timeout = 5)
+ session.message_cancel(destination = "a")
+ session.message_accept(RangedSet(msg.id))
+
+ def test_queue_deletion(self):
+ session = self.session
+ session.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True)
+ session.exchange_bind(exchange="amq.topic", queue="durable-subscriber-queue", binding_key="xyz")
+ dp = session.delivery_properties(routing_key="xyz", delivery_mode=2)
+ session.message_transfer(destination="amq.topic", message=Message(dp, "my-message"))
+ session.queue_delete(queue = "durable-subscriber-queue")
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/query.py b/qpid/tests/src/py/qpid_tests/broker_0_10/query.py
new file mode 100644
index 0000000000..d57e964982
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/query.py
@@ -0,0 +1,247 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import TestBase010
+
+class QueryTests(TestBase010):
+ """Tests for various query methods"""
+
+ def test_queue_query(self):
+ session = self.session
+ session.queue_declare(queue="my-queue", exclusive=True)
+ result = session.queue_query(queue="my-queue")
+ self.assertEqual("my-queue", result.queue)
+
+ def test_queue_query_unknown(self):
+ session = self.session
+ result = session.queue_query(queue="I don't exist")
+ self.assert_(not result.queue)
+
+ def test_exchange_query(self):
+ """
+ Test that the exchange_query method works as expected
+ """
+ session = self.session
+ #check returned type for the standard exchanges
+ self.assertEqual("direct", session.exchange_query(name="amq.direct").type)
+ self.assertEqual("topic", session.exchange_query(name="amq.topic").type)
+ self.assertEqual("fanout", session.exchange_query(name="amq.fanout").type)
+ self.assertEqual("headers", session.exchange_query(name="amq.match").type)
+ self.assertEqual("direct", session.exchange_query(name="").type)
+ #declare an exchange
+ session.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False)
+ #check that the result of a query is as expected
+ response = session.exchange_query(name="my-test-exchange")
+ self.assertEqual("direct", response.type)
+ self.assert_(not response.durable)
+ self.assert_(not response.not_found)
+ #delete the exchange
+ session.exchange_delete(exchange="my-test-exchange")
+ #check that the query now reports not-found
+ self.assert_(session.exchange_query(name="my-test-exchange").not_found)
+
+ def test_exchange_bound_direct(self):
+ """
+ Test that the exchange_bound method works as expected with the direct exchange
+ """
+ self.exchange_bound_with_key("amq.direct")
+
+ def test_exchange_bound_topic(self):
+ """
+ Test that the exchange_bound method works as expected with the direct exchange
+ """
+ self.exchange_bound_with_key("amq.topic")
+
+ def exchange_bound_with_key(self, exchange_name):
+ session = self.session
+ #setup: create two queues
+ session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
+
+ session.exchange_bind(exchange=exchange_name, queue="used-queue", binding_key="used-key")
+
+ # test detection of any binding to specific queue
+ response = session.exchange_bound(exchange=exchange_name, queue="used-queue")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.queue_not_matched)
+
+ # test detection of specific binding to any queue
+ response = session.exchange_bound(exchange=exchange_name, binding_key="used-key")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.key_not_matched)
+
+ # test detection of specific binding to specific queue
+ response = session.exchange_bound(exchange=exchange_name, queue="used-queue", binding_key="used-key")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.queue_not_matched)
+ self.assert_(not response.key_not_matched)
+
+ # test unmatched queue, unspecified binding
+ response = session.exchange_bound(exchange=exchange_name, queue="unused-queue")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+
+ # test unspecified queue, unmatched binding
+ response = session.exchange_bound(exchange=exchange_name, binding_key="unused-key")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assertEqual(True, response.key_not_matched)
+
+ # test matched queue, unmatched binding
+ response = session.exchange_bound(exchange=exchange_name, queue="used-queue", binding_key="unused-key")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.queue_not_matched)
+ self.assertEqual(True, response.key_not_matched)
+
+ # test unmatched queue, matched binding
+ response = session.exchange_bound(exchange=exchange_name, queue="unused-queue", binding_key="used-key")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assert_(not response.key_not_matched)
+
+ # test unmatched queue, unmatched binding
+ response = session.exchange_bound(exchange=exchange_name, queue="unused-queue", binding_key="unused-key")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assertEqual(True, response.key_not_matched)
+
+ #test exchange not found
+ self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found)
+
+ #test exchange found, queue not found
+ response = session.exchange_bound(exchange=exchange_name, queue="unknown-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(True, response.queue_not_found)
+
+ #test exchange not found, queue found
+ response = session.exchange_bound(exchange="unknown-exchange", queue="used-queue")
+ self.assertEqual(True, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+
+ #test not exchange found, queue not found
+ response = session.exchange_bound(exchange="unknown-exchange", queue="unknown-queue")
+ self.assertEqual(True, response.exchange_not_found)
+ self.assertEqual(True, response.queue_not_found)
+
+
+ def test_exchange_bound_fanout(self):
+ """
+ Test that the exchange_bound method works as expected with fanout exchange
+ """
+ session = self.session
+ #setup
+ session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(exchange="amq.fanout", queue="used-queue")
+
+ # test detection of any binding to specific queue
+ response = session.exchange_bound(exchange="amq.fanout", queue="used-queue")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.queue_not_matched)
+
+ # test unmatched queue, unspecified binding
+ response = session.exchange_bound(exchange="amq.fanout", queue="unused-queue")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+
+ #test exchange not found
+ self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found)
+
+ #test queue not found
+ self.assertEqual(True, session.exchange_bound(exchange="amq.fanout", queue="unknown-queue").queue_not_found)
+
+ def test_exchange_bound_header(self):
+ """
+ Test that the exchange_bound method works as expected with headers exchanges
+ """
+ session = self.session
+ #setup
+ session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} )
+
+ # test detection of any binding to specific queue
+ response = session.exchange_bound(exchange="amq.match", queue="used-queue")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.queue_not_matched)
+
+ # test detection of specific binding to any queue
+ response = session.exchange_bound(exchange="amq.match", arguments={"x-match":"all", "a":"A"})
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.args_not_matched)
+
+ # test detection of specific binding to specific queue
+ response = session.exchange_bound(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"})
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.queue_not_matched)
+ self.assert_(not response.args_not_matched)
+
+ # test unmatched queue, unspecified binding
+ response = session.exchange_bound(exchange="amq.match", queue="unused-queue")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+
+ # test unspecified queue, unmatched binding
+ response = session.exchange_bound(exchange="amq.match", arguments={"x-match":"all", "b":"B"})
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assertEqual(True, response.args_not_matched)
+
+ # test matched queue, unmatched binding
+ response = session.exchange_bound(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "b":"B"})
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.queue_not_matched)
+ self.assertEqual(True, response.args_not_matched)
+
+ # test unmatched queue, matched binding
+ response = session.exchange_bound(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "a":"A"})
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assert_(not response.args_not_matched)
+
+ # test unmatched queue, unmatched binding
+ response = session.exchange_bound(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "b":"B"})
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assertEqual(True, response.args_not_matched)
+
+ #test exchange not found
+ self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found)
+
+ #test queue not found
+ self.assertEqual(True, session.exchange_bound(exchange="amq.match", queue="unknown-queue").queue_not_found)
+
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/queue.py b/qpid/tests/src/py/qpid_tests/broker_0_10/queue.py
new file mode 100644
index 0000000000..eb38965190
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/queue.py
@@ -0,0 +1,366 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.testlib import TestBase010
+from qpid.datatypes import Message
+from qpid.session import SessionException
+
+class QueueTests(TestBase010):
+ """Tests for 'methods' on the amqp queue 'class'"""
+
+ def test_purge(self):
+ """
+ Test that the purge method removes messages from the queue
+ """
+ session = self.session
+ #setup, declare a queue and add some messages to it:
+ session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "one"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "two"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "three"))
+
+ #check that the queue now reports 3 messages:
+ session.queue_declare(queue="test-queue")
+ reply = session.queue_query(queue="test-queue")
+ self.assertEqual(3, reply.message_count)
+
+ #now do the purge, then test that three messages are purged and the count drops to 0
+ session.queue_purge(queue="test-queue");
+ reply = session.queue_query(queue="test-queue")
+ self.assertEqual(0, reply.message_count)
+
+ #send a further message and consume it, ensuring that the other messages are really gone
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "four"))
+ session.message_subscribe(queue="test-queue", destination="tag")
+ session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ queue = session.incoming("tag")
+ msg = queue.get(timeout=1)
+ self.assertEqual("four", msg.body)
+
+ def test_purge_queue_exists(self):
+ """
+ Test that the correct exception is thrown is no queue exists
+ for the name specified in purge
+ """
+ session = self.session
+ try:
+ #queue specified but doesn't exist:
+ session.queue_purge(queue="invalid-queue")
+ self.fail("Expected failure when purging non-existent queue")
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code) #not-found
+
+ def test_purge_empty_name(self):
+ """
+ Test that the correct exception is thrown is no queue name
+ is specified for purge
+ """
+ session = self.session
+ try:
+ #queue not specified and none previously declared for channel:
+ session.queue_purge()
+ self.fail("Expected failure when purging unspecified queue")
+ except SessionException, e:
+ self.assertEquals(531, e.args[0].error_code) #illegal-argument
+
+ def test_declare_exclusive(self):
+ """
+ Test that the exclusive field is honoured in queue.declare
+ """
+ # TestBase.setUp has already opened session(1)
+ s1 = self.session
+ # Here we open a second separate connection:
+ s2 = self.conn.session("other")
+
+ #declare an exclusive queue:
+ s1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
+ try:
+ #other connection should not be allowed to declare this:
+ s2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
+ self.fail("Expected second exclusive queue_declare to raise a channel exception")
+ except SessionException, e:
+ self.assertEquals(405, e.args[0].error_code)
+
+ s3 = self.conn.session("subscriber")
+ try:
+ #other connection should not be allowed to declare this:
+ s3.message_subscribe(queue="exclusive-queue")
+ self.fail("Expected message_subscribe on an exclusive queue to raise a channel exception")
+ except SessionException, e:
+ self.assertEquals(405, e.args[0].error_code)
+
+ s4 = self.conn.session("deleter")
+ try:
+ #other connection should not be allowed to declare this:
+ s4.queue_delete(queue="exclusive-queue")
+ self.fail("Expected queue_delete on an exclusive queue to raise a channel exception")
+ except SessionException, e:
+ self.assertEquals(405, e.args[0].error_code)
+
+
+ def test_declare_passive(self):
+ """
+ Test that the passive field is honoured in queue.declare
+ """
+ session = self.session
+ #declare an exclusive queue:
+ session.queue_declare(queue="passive-queue-1", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="passive-queue-1", passive=True)
+ try:
+ #other connection should not be allowed to declare this:
+ session.queue_declare(queue="passive-queue-2", passive=True)
+ self.fail("Expected passive declaration of non-existant queue to raise a channel exception")
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code) #not-found
+
+
+ def test_bind(self):
+ """
+ Test various permutations of the queue.bind method
+ """
+ session = self.session
+ session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
+
+ #straightforward case, both exchange & queue exist so no errors expected:
+ session.exchange_bind(queue="queue-1", exchange="amq.direct", binding_key="key1")
+
+ #use the queue name where the routing key is not specified:
+ session.exchange_bind(queue="queue-1", exchange="amq.direct")
+
+ #try and bind to non-existant exchange
+ try:
+ session.exchange_bind(queue="queue-1", exchange="an-invalid-exchange", binding_key="key1")
+ self.fail("Expected bind to non-existant exchange to fail")
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+
+ def test_bind_queue_existence(self):
+ session = self.session
+ #try and bind non-existant queue:
+ try:
+ session.exchange_bind(queue="queue-2", exchange="amq.direct", binding_key="key1")
+ self.fail("Expected bind of non-existant queue to fail")
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+ def test_unbind_direct(self):
+ self.unbind_test(exchange="amq.direct", routing_key="key")
+
+ def test_unbind_topic(self):
+ self.unbind_test(exchange="amq.topic", routing_key="key")
+
+ def test_unbind_fanout(self):
+ self.unbind_test(exchange="amq.fanout")
+
+ def test_unbind_headers(self):
+ self.unbind_test(exchange="amq.match", args={ "x-match":"all", "a":"b"}, headers={"a":"b"})
+
+ def unbind_test(self, exchange, routing_key="", args=None, headers=None):
+ #bind two queues and consume from them
+ session = self.session
+
+ session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="queue-2", exclusive=True, auto_delete=True)
+
+ session.message_subscribe(queue="queue-1", destination="queue-1")
+ session.message_flow(destination="queue-1", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="queue-1", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ session.message_subscribe(queue="queue-2", destination="queue-2")
+ session.message_flow(destination="queue-2", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="queue-2", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+
+ queue1 = session.incoming("queue-1")
+ queue2 = session.incoming("queue-2")
+
+ session.exchange_bind(exchange=exchange, queue="queue-1", binding_key=routing_key, arguments=args)
+ session.exchange_bind(exchange=exchange, queue="queue-2", binding_key=routing_key, arguments=args)
+
+ dp = session.delivery_properties(routing_key=routing_key)
+ if (headers):
+ mp = session.message_properties(application_headers=headers)
+ msg1 = Message(dp, mp, "one")
+ msg2 = Message(dp, mp, "two")
+ else:
+ msg1 = Message(dp, "one")
+ msg2 = Message(dp, "two")
+
+ #send a message that will match both bindings
+ session.message_transfer(destination=exchange, message=msg1)
+
+ #unbind first queue
+ session.exchange_unbind(exchange=exchange, queue="queue-1", binding_key=routing_key)
+
+ #send another message
+ session.message_transfer(destination=exchange, message=msg2)
+
+ #check one queue has both messages and the other has only one
+ self.assertEquals("one", queue1.get(timeout=1).body)
+ try:
+ msg = queue1.get(timeout=1)
+ self.fail("Got extra message: %s" % msg.body)
+ except Empty: pass
+
+ self.assertEquals("one", queue2.get(timeout=1).body)
+ self.assertEquals("two", queue2.get(timeout=1).body)
+ try:
+ msg = queue2.get(timeout=1)
+ self.fail("Got extra message: " + msg)
+ except Empty: pass
+
+
+ def test_delete_simple(self):
+ """
+ Test core queue deletion behaviour
+ """
+ session = self.session
+
+ #straight-forward case:
+ session.queue_declare(queue="delete-me")
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "a"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "b"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "c"))
+ session.queue_delete(queue="delete-me")
+ #check that it has gone by declaring passively
+ try:
+ session.queue_declare(queue="delete-me", passive=True)
+ self.fail("Queue has not been deleted")
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+ def test_delete_queue_exists(self):
+ """
+ Test core queue deletion behaviour
+ """
+ #check attempted deletion of non-existant queue is handled correctly:
+ session = self.session
+ try:
+ session.queue_delete(queue="i-dont-exist", if_empty=True)
+ self.fail("Expected delete of non-existant queue to fail")
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+
+
+ def test_delete_ifempty(self):
+ """
+ Test that if_empty field of queue_delete is honoured
+ """
+ session = self.session
+
+ #create a queue and add a message to it (use default binding):
+ session.queue_declare(queue="delete-me-2")
+ session.queue_declare(queue="delete-me-2", passive=True)
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me-2"), "message"))
+
+ #try to delete, but only if empty:
+ try:
+ session.queue_delete(queue="delete-me-2", if_empty=True)
+ self.fail("Expected delete if_empty to fail for non-empty queue")
+ except SessionException, e:
+ self.assertEquals(406, e.args[0].error_code)
+
+ #need new session now:
+ session = self.conn.session("replacement", 2)
+
+ #empty queue:
+ session.message_subscribe(destination="consumer_tag", queue="delete-me-2")
+ session.message_flow(destination="consumer_tag", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="consumer_tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ queue = session.incoming("consumer_tag")
+ msg = queue.get(timeout=1)
+ self.assertEqual("message", msg.body)
+ session.message_cancel(destination="consumer_tag")
+
+ #retry deletion on empty queue:
+ session.queue_delete(queue="delete-me-2", if_empty=True)
+
+ #check that it has gone by declaring passively:
+ try:
+ session.queue_declare(queue="delete-me-2", passive=True)
+ self.fail("Queue has not been deleted")
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+ def test_delete_ifunused(self):
+ """
+ Test that if_unused field of queue_delete is honoured
+ """
+ session = self.session
+
+ #create a queue and register a consumer:
+ session.queue_declare(queue="delete-me-3")
+ session.queue_declare(queue="delete-me-3", passive=True)
+ session.message_subscribe(destination="consumer_tag", queue="delete-me-3")
+
+ #need new session now:
+ session2 = self.conn.session("replacement", 2)
+
+ #try to delete, but only if empty:
+ try:
+ session2.queue_delete(queue="delete-me-3", if_unused=True)
+ self.fail("Expected delete if_unused to fail for queue with existing consumer")
+ except SessionException, e:
+ self.assertEquals(406, e.args[0].error_code)
+
+ session.message_cancel(destination="consumer_tag")
+ session.queue_delete(queue="delete-me-3", if_unused=True)
+ #check that it has gone by declaring passively:
+ try:
+ session.queue_declare(queue="delete-me-3", passive=True)
+ self.fail("Queue has not been deleted")
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+
+ def test_autodelete_shared(self):
+ """
+ Test auto-deletion (of non-exclusive queues)
+ """
+ session = self.session
+ session2 =self.conn.session("other", 1)
+
+ session.queue_declare(queue="auto-delete-me", auto_delete=True)
+
+ #consume from both sessions
+ tag = "my-tag"
+ session.message_subscribe(queue="auto-delete-me", destination=tag)
+ session2.message_subscribe(queue="auto-delete-me", destination=tag)
+
+ #implicit cancel
+ session2.close()
+
+ #check it is still there
+ session.queue_declare(queue="auto-delete-me", passive=True)
+
+ #explicit cancel => queue is now unused again:
+ session.message_cancel(destination=tag)
+
+ #NOTE: this assumes there is no timeout in use
+
+ #check that it has gone by declaring it passively
+ try:
+ session.queue_declare(queue="auto-delete-me", passive=True)
+ self.fail("Expected queue to have been deleted")
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/tx.py b/qpid/tests/src/py/qpid_tests/broker_0_10/tx.py
new file mode 100644
index 0000000000..8cdc539a08
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/tx.py
@@ -0,0 +1,265 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.datatypes import Message, RangedSet
+from qpid.testlib import TestBase010
+
+class TxTests(TestBase010):
+ """
+ Tests for 'methods' on the amqp tx 'class'
+ """
+
+ def test_commit(self):
+ """
+ Test that commited publishes are delivered and commited acks are not re-delivered
+ """
+ session = self.session
+
+ #declare queues and create subscribers in the checking session
+ #to ensure that the queues are not auto-deleted too early:
+ self.declare_queues(["tx-commit-a", "tx-commit-b", "tx-commit-c"])
+ session.message_subscribe(queue="tx-commit-a", destination="qa")
+ session.message_subscribe(queue="tx-commit-b", destination="qb")
+ session.message_subscribe(queue="tx-commit-c", destination="qc")
+
+ #use a separate session for actual work
+ session2 = self.conn.session("worker", 2)
+ self.perform_txn_work(session2, "tx-commit-a", "tx-commit-b", "tx-commit-c")
+ session2.tx_commit()
+ session2.close()
+
+ session.tx_select()
+
+ self.enable_flow("qa")
+ queue_a = session.incoming("qa")
+
+ self.enable_flow("qb")
+ queue_b = session.incoming("qb")
+
+ self.enable_flow("qc")
+ queue_c = session.incoming("qc")
+
+ #check results
+ for i in range(1, 5):
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("TxMessage %d" % i, msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("TxMessage 6", msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("TxMessage 7", msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
+ #cleanup
+ session.tx_commit()
+
+ def test_auto_rollback(self):
+ """
+ Test that a session closed with an open transaction is effectively rolled back
+ """
+ session = self.session
+ self.declare_queues(["tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c"])
+ session.message_subscribe(queue="tx-autorollback-a", destination="qa")
+ session.message_subscribe(queue="tx-autorollback-b", destination="qb")
+ session.message_subscribe(queue="tx-autorollback-c", destination="qc")
+
+ session2 = self.conn.session("worker", 2)
+ queue_a, queue_b, queue_c, ignore = self.perform_txn_work(session2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
+ session2.close()
+
+ session.tx_select()
+
+ self.enable_flow("qa")
+ queue_a = session.incoming("qa")
+
+ self.enable_flow("qb")
+ queue_b = session.incoming("qb")
+
+ self.enable_flow("qc")
+ queue_c = session.incoming("qc")
+
+ #check results
+ for i in range(1, 5):
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("Message 6", msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("Message 7", msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
+ #cleanup
+ session.tx_commit()
+
+ def test_rollback(self):
+ """
+ Test that rolled back publishes are not delivered and rolled back acks are re-delivered
+ """
+ session = self.session
+ queue_a, queue_b, queue_c, consumed = self.perform_txn_work(session, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c")
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
+ session.tx_rollback()
+
+ #need to release messages to get them redelivered now:
+ session.message_release(consumed)
+
+ #check results
+ for i in range(1, 5):
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("Message 6", msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("Message 7", msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
+ #cleanup
+ session.tx_commit()
+
+ def perform_txn_work(self, session, name_a, name_b, name_c):
+ """
+ Utility method that does some setup and some work under a transaction. Used for testing both
+ commit and rollback
+ """
+ #setup:
+ self.declare_queues([name_a, name_b, name_c])
+
+ key = "my_key_" + name_b
+ topic = "my_topic_" + name_c
+
+ session.exchange_bind(queue=name_b, exchange="amq.direct", binding_key=key)
+ session.exchange_bind(queue=name_c, exchange="amq.topic", binding_key=topic)
+
+ dp = session.delivery_properties(routing_key=name_a)
+ for i in range(1, 5):
+ mp = session.message_properties(message_id="msg%d" % i)
+ session.message_transfer(message=Message(dp, mp, "Message %d" % i))
+
+ dp = session.delivery_properties(routing_key=key)
+ mp = session.message_properties(message_id="msg6")
+ session.message_transfer(destination="amq.direct", message=Message(dp, mp, "Message 6"))
+
+ dp = session.delivery_properties(routing_key=topic)
+ mp = session.message_properties(message_id="msg7")
+ session.message_transfer(destination="amq.topic", message=Message(dp, mp, "Message 7"))
+
+ session.tx_select()
+
+ #consume and ack messages
+ acked = RangedSet()
+ self.subscribe(session, queue=name_a, destination="sub_a")
+ queue_a = session.incoming("sub_a")
+ for i in range(1, 5):
+ msg = queue_a.get(timeout=1)
+ acked.add(msg.id)
+ self.assertEqual("Message %d" % i, msg.body)
+
+ self.subscribe(session, queue=name_b, destination="sub_b")
+ queue_b = session.incoming("sub_b")
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("Message 6", msg.body)
+ acked.add(msg.id)
+
+ sub_c = self.subscribe(session, queue=name_c, destination="sub_c")
+ queue_c = session.incoming("sub_c")
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("Message 7", msg.body)
+ acked.add(msg.id)
+
+ session.message_accept(acked)
+
+ dp = session.delivery_properties(routing_key=topic)
+ #publish messages
+ for i in range(1, 5):
+ mp = session.message_properties(message_id="tx-msg%d" % i)
+ session.message_transfer(destination="amq.topic", message=Message(dp, mp, "TxMessage %d" % i))
+
+ dp = session.delivery_properties(routing_key=key)
+ mp = session.message_properties(message_id="tx-msg6")
+ session.message_transfer(destination="amq.direct", message=Message(dp, mp, "TxMessage 6"))
+
+ dp = session.delivery_properties(routing_key=name_a)
+ mp = session.message_properties(message_id="tx-msg7")
+ session.message_transfer(message=Message(dp, mp, "TxMessage 7"))
+ return queue_a, queue_b, queue_c, acked
+
+ def declare_queues(self, names, session=None):
+ session = session or self.session
+ for n in names:
+ session.queue_declare(queue=n, auto_delete=True)
+
+ def subscribe(self, session=None, **keys):
+ session = session or self.session
+ consumer_tag = keys["destination"]
+ session.message_subscribe(**keys)
+ session.message_flow(destination=consumer_tag, unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination=consumer_tag, unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+
+ def enable_flow(self, tag, session=None):
+ session = session or self.session
+ session.message_flow(destination=tag, unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination=tag, unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+
+ def complete(self, session, msg):
+ session.receiver._completed.add(msg.id)#TODO: this may be done automatically
+ session.channel.session_completed(session.receiver._completed)
+
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_8/__init__.py b/qpid/tests/src/py/qpid_tests/broker_0_8/__init__.py
new file mode 100644
index 0000000000..526f2452f8
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_8/__init__.py
@@ -0,0 +1,22 @@
+# Do not delete - marks this directory as a python package.
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import basic, broker, example, exchange, queue, testlib, tx
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py b/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py
new file mode 100644
index 0000000000..d5837fc19c
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py
@@ -0,0 +1,396 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import TestBase
+
+class BasicTests(TestBase):
+ """Tests for 'methods' on the amqp basic 'class'"""
+
+ def test_consume_no_local(self):
+ """
+ Test that the no_local flag is honoured in the consume method
+ """
+ channel = self.channel
+ #setup, declare two queues:
+ channel.queue_declare(queue="test-queue-1a", exclusive=True)
+ channel.queue_declare(queue="test-queue-1b", exclusive=True)
+ #establish two consumers one of which excludes delivery of locally sent messages
+ channel.basic_consume(consumer_tag="local_included", queue="test-queue-1a")
+ channel.basic_consume(consumer_tag="local_excluded", queue="test-queue-1b", no_local=True)
+
+ #send a message
+ channel.basic_publish(routing_key="test-queue-1a", content=Content("consume_no_local"))
+ channel.basic_publish(routing_key="test-queue-1b", content=Content("consume_no_local"))
+
+ #check the queues of the two consumers
+ excluded = self.client.queue("local_excluded")
+ included = self.client.queue("local_included")
+ msg = included.get(timeout=1)
+ self.assertEqual("consume_no_local", msg.content.body)
+ try:
+ excluded.get(timeout=1)
+ self.fail("Received locally published message though no_local=true")
+ except Empty: None
+
+
+ def test_consume_exclusive(self):
+ """
+ Test that the exclusive flag is honoured in the consume method
+ """
+ channel = self.channel
+ #setup, declare a queue:
+ channel.queue_declare(queue="test-queue-2", exclusive=True)
+
+ #check that an exclusive consumer prevents other consumer being created:
+ channel.basic_consume(consumer_tag="first", queue="test-queue-2", exclusive=True)
+ try:
+ channel.basic_consume(consumer_tag="second", queue="test-queue-2")
+ self.fail("Expected consume request to fail due to previous exclusive consumer")
+ except Closed, e:
+ self.assertChannelException(403, e.args[0])
+
+ #open new channel and cleanup last consumer:
+ channel = self.client.channel(2)
+ channel.channel_open()
+
+ #check that an exclusive consumer cannot be created if a consumer already exists:
+ channel.basic_consume(consumer_tag="first", queue="test-queue-2")
+ try:
+ channel.basic_consume(consumer_tag="second", queue="test-queue-2", exclusive=True)
+ self.fail("Expected exclusive consume request to fail due to previous consumer")
+ except Closed, e:
+ self.assertChannelException(403, e.args[0])
+
+ def test_consume_queue_errors(self):
+ """
+ Test error conditions associated with the queue field of the consume method:
+ """
+ channel = self.channel
+ try:
+ #queue specified but doesn't exist:
+ channel.basic_consume(queue="invalid-queue")
+ self.fail("Expected failure when consuming from non-existent queue")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ channel = self.client.channel(2)
+ channel.channel_open()
+ try:
+ #queue not specified and none previously declared for channel:
+ channel.basic_consume(queue="")
+ self.fail("Expected failure when consuming from unspecified queue")
+ except Closed, e:
+ self.assertConnectionException(530, e.args[0])
+
+ def test_consume_unique_consumers(self):
+ """
+ Ensure unique consumer tags are enforced
+ """
+ channel = self.channel
+ #setup, declare a queue:
+ channel.queue_declare(queue="test-queue-3", exclusive=True)
+
+ #check that attempts to use duplicate tags are detected and prevented:
+ channel.basic_consume(consumer_tag="first", queue="test-queue-3")
+ try:
+ channel.basic_consume(consumer_tag="first", queue="test-queue-3")
+ self.fail("Expected consume request to fail due to non-unique tag")
+ except Closed, e:
+ self.assertConnectionException(530, e.args[0])
+
+ def test_cancel(self):
+ """
+ Test compliance of the basic.cancel method
+ """
+ channel = self.channel
+ #setup, declare a queue:
+ channel.queue_declare(queue="test-queue-4", exclusive=True)
+ channel.basic_consume(consumer_tag="my-consumer", queue="test-queue-4")
+ channel.basic_publish(routing_key="test-queue-4", content=Content("One"))
+
+ myqueue = self.client.queue("my-consumer")
+ msg = myqueue.get(timeout=1)
+ self.assertEqual("One", msg.content.body)
+
+ #cancel should stop messages being delivered
+ channel.basic_cancel(consumer_tag="my-consumer")
+ channel.basic_publish(routing_key="test-queue-4", content=Content("Two"))
+ try:
+ msg = myqueue.get(timeout=1)
+ self.fail("Got message after cancellation: " + msg)
+ except Empty: None
+
+ #cancellation of non-existant consumers should be handled without error
+ channel.basic_cancel(consumer_tag="my-consumer")
+ channel.basic_cancel(consumer_tag="this-never-existed")
+
+
+ def test_ack(self):
+ """
+ Test basic ack/recover behaviour
+ """
+ channel = self.channel
+ channel.queue_declare(queue="test-ack-queue", exclusive=True)
+
+ reply = channel.basic_consume(queue="test-ack-queue", no_ack=False)
+ queue = self.client.queue(reply.consumer_tag)
+
+ channel.basic_publish(routing_key="test-ack-queue", content=Content("One"))
+ channel.basic_publish(routing_key="test-ack-queue", content=Content("Two"))
+ channel.basic_publish(routing_key="test-ack-queue", content=Content("Three"))
+ channel.basic_publish(routing_key="test-ack-queue", content=Content("Four"))
+ channel.basic_publish(routing_key="test-ack-queue", content=Content("Five"))
+
+ msg1 = queue.get(timeout=1)
+ msg2 = queue.get(timeout=1)
+ msg3 = queue.get(timeout=1)
+ msg4 = queue.get(timeout=1)
+ msg5 = queue.get(timeout=1)
+
+ self.assertEqual("One", msg1.content.body)
+ self.assertEqual("Two", msg2.content.body)
+ self.assertEqual("Three", msg3.content.body)
+ self.assertEqual("Four", msg4.content.body)
+ self.assertEqual("Five", msg5.content.body)
+
+ channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two
+ channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
+
+ channel.basic_recover(requeue=False)
+
+ msg3b = queue.get(timeout=1)
+ msg5b = queue.get(timeout=1)
+
+ self.assertEqual("Three", msg3b.content.body)
+ self.assertEqual("Five", msg5b.content.body)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+ def test_recover_requeue(self):
+ """
+ Test requeing on recovery
+ """
+ channel = self.channel
+ channel.queue_declare(queue="test-requeue", exclusive=True)
+
+ subscription = channel.basic_consume(queue="test-requeue", no_ack=False)
+ queue = self.client.queue(subscription.consumer_tag)
+
+ channel.basic_publish(routing_key="test-requeue", content=Content("One"))
+ channel.basic_publish(routing_key="test-requeue", content=Content("Two"))
+ channel.basic_publish(routing_key="test-requeue", content=Content("Three"))
+ channel.basic_publish(routing_key="test-requeue", content=Content("Four"))
+ channel.basic_publish(routing_key="test-requeue", content=Content("Five"))
+
+ msg1 = queue.get(timeout=1)
+ msg2 = queue.get(timeout=1)
+ msg3 = queue.get(timeout=1)
+ msg4 = queue.get(timeout=1)
+ msg5 = queue.get(timeout=1)
+
+ self.assertEqual("One", msg1.content.body)
+ self.assertEqual("Two", msg2.content.body)
+ self.assertEqual("Three", msg3.content.body)
+ self.assertEqual("Four", msg4.content.body)
+ self.assertEqual("Five", msg5.content.body)
+
+ channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two
+ channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
+
+ channel.basic_cancel(consumer_tag=subscription.consumer_tag)
+
+ channel.basic_recover(requeue=True)
+
+ subscription2 = channel.basic_consume(queue="test-requeue")
+ queue2 = self.client.queue(subscription2.consumer_tag)
+
+ msg3b = queue2.get(timeout=1)
+ msg5b = queue2.get(timeout=1)
+
+ self.assertEqual("Three", msg3b.content.body)
+ self.assertEqual("Five", msg5b.content.body)
+
+ self.assertEqual(True, msg3b.redelivered)
+ self.assertEqual(True, msg5b.redelivered)
+
+ try:
+ extra = queue2.get(timeout=1)
+ self.fail("Got unexpected message in second queue: " + extra.content.body)
+ except Empty: None
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in original queue: " + extra.content.body)
+ except Empty: None
+
+
+ def test_qos_prefetch_count(self):
+ """
+ Test that the prefetch count specified is honoured
+ """
+ #setup: declare queue and subscribe
+ channel = self.channel
+ channel.queue_declare(queue="test-prefetch-count", exclusive=True)
+ subscription = channel.basic_consume(queue="test-prefetch-count", no_ack=False)
+ queue = self.client.queue(subscription.consumer_tag)
+
+ #set prefetch to 5:
+ channel.basic_qos(prefetch_count=5)
+
+ #publish 10 messages:
+ for i in range(1, 11):
+ channel.basic_publish(routing_key="test-prefetch-count", content=Content("Message %d" % i))
+
+ #only 5 messages should have been delivered:
+ for i in range(1, 6):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
+ except Empty: None
+
+ #ack messages and check that the next set arrive ok:
+ channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+ for i in range(6, 11):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
+ except Empty: None
+
+
+
+ def test_qos_prefetch_size(self):
+ """
+ Test that the prefetch size specified is honoured
+ """
+ #setup: declare queue and subscribe
+ channel = self.channel
+ channel.queue_declare(queue="test-prefetch-size", exclusive=True)
+ subscription = channel.basic_consume(queue="test-prefetch-size", no_ack=False)
+ queue = self.client.queue(subscription.consumer_tag)
+
+ #set prefetch to 50 bytes (each message is 9 or 10 bytes):
+ channel.basic_qos(prefetch_size=50)
+
+ #publish 10 messages:
+ for i in range(1, 11):
+ channel.basic_publish(routing_key="test-prefetch-size", content=Content("Message %d" % i))
+
+ #only 5 messages should have been delivered (i.e. 45 bytes worth):
+ for i in range(1, 6):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
+ except Empty: None
+
+ #ack messages and check that the next set arrive ok:
+ channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+ for i in range(6, 11):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
+ except Empty: None
+
+ #make sure that a single oversized message still gets delivered
+ large = "abcdefghijklmnopqrstuvwxyz"
+ large = large + "-" + large;
+ channel.basic_publish(routing_key="test-prefetch-size", content=Content(large))
+ msg = queue.get(timeout=1)
+ self.assertEqual(large, msg.content.body)
+
+ def test_get(self):
+ """
+ Test basic_get method
+ """
+ channel = self.channel
+ channel.queue_declare(queue="test-get", exclusive=True)
+
+ #publish some messages (no_ack=True)
+ for i in range(1, 11):
+ channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
+
+ #use basic_get to read back the messages, and check that we get an empty at the end
+ for i in range(1, 11):
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get_ok")
+ self.assertEqual("Message %d" % i, reply.content.body)
+
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get_empty")
+
+ #repeat for no_ack=False
+ for i in range(11, 21):
+ channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
+
+ for i in range(11, 21):
+ reply = channel.basic_get(no_ack=False)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get_ok")
+ self.assertEqual("Message %d" % i, reply.content.body)
+ if(i == 13):
+ channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True)
+ if(i in [15, 17, 19]):
+ channel.basic_ack(delivery_tag=reply.delivery_tag)
+
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get_empty")
+
+ #recover(requeue=True)
+ channel.basic_recover(requeue=True)
+
+ #get the unacked messages again (14, 16, 18, 20)
+ for i in [14, 16, 18, 20]:
+ reply = channel.basic_get(no_ack=False)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get_ok")
+ self.assertEqual("Message %d" % i, reply.content.body)
+ channel.basic_ack(delivery_tag=reply.delivery_tag)
+
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get_empty")
+
+ channel.basic_recover(requeue=True)
+
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get_empty")
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_8/broker.py b/qpid/tests/src/py/qpid_tests/broker_0_8/broker.py
new file mode 100644
index 0000000000..7f3fe7530e
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_8/broker.py
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import TestBase
+
+class BrokerTests(TestBase):
+ """Tests for basic Broker functionality"""
+
+ def test_ack_and_no_ack(self):
+ """
+ First, this test tries to receive a message with a no-ack
+ consumer. Second, this test tries to explicitly receive and
+ acknowledge a message with an acknowledging consumer.
+ """
+ ch = self.channel
+ self.queue_declare(ch, queue = "myqueue")
+
+ # No ack consumer
+ ctag = ch.basic_consume(queue = "myqueue", no_ack = True).consumer_tag
+ body = "test no-ack"
+ ch.basic_publish(routing_key = "myqueue", content = Content(body))
+ msg = self.client.queue(ctag).get(timeout = 5)
+ self.assert_(msg.content.body == body)
+
+ # Acknowledging consumer
+ self.queue_declare(ch, queue = "otherqueue")
+ ctag = ch.basic_consume(queue = "otherqueue", no_ack = False).consumer_tag
+ body = "test ack"
+ ch.basic_publish(routing_key = "otherqueue", content = Content(body))
+ msg = self.client.queue(ctag).get(timeout = 5)
+ ch.basic_ack(delivery_tag = msg.delivery_tag)
+ self.assert_(msg.content.body == body)
+
+ def test_basic_delivery_immediate(self):
+ """
+ Test basic message delivery where consume is issued before publish
+ """
+ channel = self.channel
+ self.exchange_declare(channel, exchange="test-exchange", type="direct")
+ self.queue_declare(channel, queue="test-queue")
+ channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
+ reply = channel.basic_consume(queue="test-queue", no_ack=True)
+ queue = self.client.queue(reply.consumer_tag)
+
+ body = "Immediate Delivery"
+ channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body), immediate=True)
+ msg = queue.get(timeout=5)
+ self.assert_(msg.content.body == body)
+
+ # TODO: Ensure we fail if immediate=True and there's no consumer.
+
+
+ def test_basic_delivery_queued(self):
+ """
+ Test basic message delivery where publish is issued before consume
+ (i.e. requires queueing of the message)
+ """
+ channel = self.channel
+ self.exchange_declare(channel, exchange="test-exchange", type="direct")
+ self.queue_declare(channel, queue="test-queue")
+ channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
+ body = "Queued Delivery"
+ channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body))
+ reply = channel.basic_consume(queue="test-queue", no_ack=True)
+ queue = self.client.queue(reply.consumer_tag)
+ msg = queue.get(timeout=5)
+ self.assert_(msg.content.body == body)
+
+ def test_invalid_channel(self):
+ channel = self.client.channel(200)
+ try:
+ channel.queue_declare(exclusive=True)
+ self.fail("Expected error on queue_declare for invalid channel")
+ except Closed, e:
+ self.assertConnectionException(504, e.args[0])
+
+ def test_closed_channel(self):
+ channel = self.client.channel(200)
+ channel.channel_open()
+ channel.channel_close()
+ try:
+ channel.queue_declare(exclusive=True)
+ self.fail("Expected error on queue_declare for closed channel")
+ except Closed, e:
+ self.assertConnectionException(504, e.args[0])
+
+ def test_channel_flow(self):
+ channel = self.channel
+ channel.queue_declare(queue="flow_test_queue", exclusive=True)
+ ctag = channel.basic_consume(queue="flow_test_queue", no_ack=True).consumer_tag
+ incoming = self.client.queue(ctag)
+
+ channel.channel_flow(active=False)
+ channel.basic_publish(routing_key="flow_test_queue", content=Content("abcdefghijklmnopqrstuvwxyz"))
+ try:
+ incoming.get(timeout=1)
+ self.fail("Received message when flow turned off.")
+ except Empty: None
+
+ channel.channel_flow(active=True)
+ msg = incoming.get(timeout=1)
+ self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.content.body)
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_8/example.py b/qpid/tests/src/py/qpid_tests/broker_0_8/example.py
new file mode 100644
index 0000000000..d82bad1f61
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_8/example.py
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.content import Content
+from qpid.testlib import TestBase
+
+class ExampleTest (TestBase):
+ """
+ An example Qpid test, illustrating the unittest frameowkr and the
+ python Qpid client. The test class must inherit TestCase. The
+ test code uses the Qpid client to interact with a qpid broker and
+ verify it behaves as expected.
+ """
+
+ def test_example(self):
+ """
+ An example test. Note that test functions must start with 'test_'
+ to be recognized by the test framework.
+ """
+
+ # By inheriting TestBase, self.client is automatically connected
+ # and self.channel is automatically opened as channel(1)
+ # Other channel methods mimic the protocol.
+ channel = self.channel
+
+ # Now we can send regular commands. If you want to see what the method
+ # arguments mean or what other commands are available, you can use the
+ # python builtin help() method. For example:
+ #help(chan)
+ #help(chan.exchange_declare)
+
+ # If you want browse the available protocol methods without being
+ # connected to a live server you can use the amqp-doc utility:
+ #
+ # Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>]
+ #
+ # Options:
+ # -e, --regexp use regex instead of glob when matching
+
+ # Now that we know what commands are available we can use them to
+ # interact with the server.
+
+ # Here we use ordinal arguments.
+ self.exchange_declare(channel, 0, "test", "direct")
+
+ # Here we use keyword arguments.
+ self.queue_declare(channel, queue="test-queue")
+ channel.queue_bind(queue="test-queue", exchange="test", routing_key="key")
+
+ # Call Channel.basic_consume to register as a consumer.
+ # All the protocol methods return a message object. The message object
+ # has fields corresponding to the reply method fields, plus a content
+ # field that is filled if the reply includes content. In this case the
+ # interesting field is the consumer_tag.
+ reply = channel.basic_consume(queue="test-queue")
+
+ # We can use the Client.queue(...) method to access the queue
+ # corresponding to our consumer_tag.
+ queue = self.client.queue(reply.consumer_tag)
+
+ # Now lets publish a message and see if our consumer gets it. To do
+ # this we need to import the Content class.
+ body = "Hello World!"
+ channel.basic_publish(exchange="test",
+ routing_key="key",
+ content=Content(body))
+
+ # Now we'll wait for the message to arrive. We can use the timeout
+ # argument in case the server hangs. By default queue.get() will wait
+ # until a message arrives or the connection to the server dies.
+ msg = queue.get(timeout=10)
+
+ # And check that we got the right response with assertEqual
+ self.assertEqual(body, msg.content.body)
+
+ # Now acknowledge the message.
+ channel.basic_ack(msg.delivery_tag, True)
+
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_8/exchange.py b/qpid/tests/src/py/qpid_tests/broker_0_8/exchange.py
new file mode 100644
index 0000000000..56d6fa82e4
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_8/exchange.py
@@ -0,0 +1,327 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Tests for exchange behaviour.
+
+Test classes ending in 'RuleTests' are derived from rules in amqp.xml.
+"""
+
+import Queue, logging
+from qpid.testlib import TestBase
+from qpid.content import Content
+from qpid.client import Closed
+
+
+class StandardExchangeVerifier:
+ """Verifies standard exchange behavior.
+
+ Used as base class for classes that test standard exchanges."""
+
+ def verifyDirectExchange(self, ex):
+ """Verify that ex behaves like a direct exchange."""
+ self.queue_declare(queue="q")
+ self.channel.queue_bind(queue="q", exchange=ex, routing_key="k")
+ self.assertPublishConsume(exchange=ex, queue="q", routing_key="k")
+ try:
+ self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk")
+ self.fail("Expected Empty exception")
+ except Queue.Empty: None # Expected
+
+ def verifyFanOutExchange(self, ex):
+ """Verify that ex behaves like a fanout exchange."""
+ self.queue_declare(queue="q")
+ self.channel.queue_bind(queue="q", exchange=ex)
+ self.queue_declare(queue="p")
+ self.channel.queue_bind(queue="p", exchange=ex)
+ for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex)
+
+ def verifyTopicExchange(self, ex):
+ """Verify that ex behaves like a topic exchange"""
+ self.queue_declare(queue="a")
+ self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*")
+ q = self.consume("a")
+ self.assertPublishGet(q, ex, "a.b.x")
+ self.assertPublishGet(q, ex, "a.x.b.x")
+ self.assertPublishGet(q, ex, "a.x.x.b.x")
+ # Shouldn't match
+ self.channel.basic_publish(exchange=ex, routing_key="a.b")
+ self.channel.basic_publish(exchange=ex, routing_key="a.b.x.y")
+ self.channel.basic_publish(exchange=ex, routing_key="x.a.b.x")
+ self.channel.basic_publish(exchange=ex, routing_key="a.b")
+ self.assert_(q.empty())
+
+ def verifyHeadersExchange(self, ex):
+ """Verify that ex is a headers exchange"""
+ self.queue_declare(queue="q")
+ self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} )
+ q = self.consume("q")
+ headers = {"name":"fred", "age":3}
+ self.assertPublishGet(q, exchange=ex, properties={'headers':headers})
+ self.channel.basic_publish(exchange=ex) # No headers, won't deliver
+ self.assertEmpty(q);
+
+
+class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
+ """
+ The server SHOULD implement these standard exchange types: topic, headers.
+
+ Client attempts to declare an exchange with each of these standard types.
+ """
+
+ def testDirect(self):
+ """Declare and test a direct exchange"""
+ self.exchange_declare(0, exchange="d", type="direct")
+ self.verifyDirectExchange("d")
+
+ def testFanout(self):
+ """Declare and test a fanout exchange"""
+ self.exchange_declare(0, exchange="f", type="fanout")
+ self.verifyFanOutExchange("f")
+
+ def testTopic(self):
+ """Declare and test a topic exchange"""
+ self.exchange_declare(0, exchange="t", type="topic")
+ self.verifyTopicExchange("t")
+
+ def testHeaders(self):
+ """Declare and test a headers exchange"""
+ self.exchange_declare(0, exchange="h", type="headers")
+ self.verifyHeadersExchange("h")
+
+
+class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
+ """
+ The server MUST, in each virtual host, pre-declare an exchange instance
+ for each standard exchange type that it implements, where the name of the
+ exchange instance is amq. followed by the exchange type name.
+
+ Client creates a temporary queue and attempts to bind to each required
+ exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if
+ those types are defined).
+ """
+ def testAmqDirect(self): self.verifyDirectExchange("amq.direct")
+
+ def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout")
+
+ def testAmqTopic(self): self.verifyTopicExchange("amq.topic")
+
+ def testAmqMatch(self): self.verifyHeadersExchange("amq.match")
+
+class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier):
+ """
+ The server MUST predeclare a direct exchange to act as the default exchange
+ for content Publish methods and for default queue bindings.
+
+ Client checks that the default exchange is active by specifying a queue
+ binding with no exchange name, and publishing a message with a suitable
+ routing key but without specifying the exchange name, then ensuring that
+ the message arrives in the queue correctly.
+ """
+ def testDefaultExchange(self):
+ # Test automatic binding by queue name.
+ self.queue_declare(queue="d")
+ self.assertPublishConsume(queue="d", routing_key="d")
+ # Test explicit bind to default queue
+ self.verifyDirectExchange("")
+
+
+# TODO aconway 2006-09-27: Fill in empty tests:
+
+class DefaultAccessRuleTests(TestBase):
+ """
+ The server MUST NOT allow clients to access the default exchange except
+ by specifying an empty exchange name in the Queue.Bind and content Publish
+ methods.
+ """
+
+class ExtensionsRuleTests(TestBase):
+ """
+ The server MAY implement other exchange types as wanted.
+ """
+
+
+class DeclareMethodMinimumRuleTests(TestBase):
+ """
+ The server SHOULD support a minimum of 16 exchanges per virtual host and
+ ideally, impose no limit except as defined by available resources.
+
+ The client creates as many exchanges as it can until the server reports
+ an error; the number of exchanges successfuly created must be at least
+ sixteen.
+ """
+
+
+class DeclareMethodTicketFieldValidityRuleTests(TestBase):
+ """
+ The client MUST provide a valid access ticket giving "active" access to
+ the realm in which the exchange exists or will be created, or "passive"
+ access if the if-exists flag is set.
+
+ Client creates access ticket with wrong access rights and attempts to use
+ in this method.
+ """
+
+
+class DeclareMethodExchangeFieldReservedRuleTests(TestBase):
+ """
+ Exchange names starting with "amq." are reserved for predeclared and
+ standardised exchanges. The client MUST NOT attempt to create an exchange
+ starting with "amq.".
+
+
+ """
+
+
+class DeclareMethodTypeFieldTypedRuleTests(TestBase):
+ """
+ Exchanges cannot be redeclared with different types. The client MUST not
+ attempt to redeclare an existing exchange with a different type than used
+ in the original Exchange.Declare method.
+
+
+ """
+
+
+class DeclareMethodTypeFieldSupportRuleTests(TestBase):
+ """
+ The client MUST NOT attempt to create an exchange with a type that the
+ server does not support.
+
+
+ """
+
+
+class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase):
+ """
+ If set, and the exchange does not already exist, the server MUST raise a
+ channel exception with reply code 404 (not found).
+ """
+ def test(self):
+ try:
+ self.channel.exchange_declare(exchange="humpty_dumpty", passive=True)
+ self.fail("Expected 404 for passive declaration of unknown exchange.")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+
+class DeclareMethodDurableFieldSupportRuleTests(TestBase):
+ """
+ The server MUST support both durable and transient exchanges.
+
+
+ """
+
+
+class DeclareMethodDurableFieldStickyRuleTests(TestBase):
+ """
+ The server MUST ignore the durable field if the exchange already exists.
+
+
+ """
+
+
+class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase):
+ """
+ The server MUST ignore the auto-delete field if the exchange already
+ exists.
+
+
+ """
+
+
+class DeleteMethodTicketFieldValidityRuleTests(TestBase):
+ """
+ The client MUST provide a valid access ticket giving "active" access
+ rights to the exchange's access realm.
+
+ Client creates access ticket with wrong access rights and attempts to use
+ in this method.
+ """
+
+
+class DeleteMethodExchangeFieldExistsRuleTests(TestBase):
+ """
+ The client MUST NOT attempt to delete an exchange that does not exist.
+ """
+
+
+class HeadersExchangeTests(TestBase):
+ """
+ Tests for headers exchange functionality.
+ """
+ def setUp(self):
+ TestBase.setUp(self)
+ self.queue_declare(queue="q")
+ self.q = self.consume("q")
+
+ def myAssertPublishGet(self, headers):
+ self.assertPublishGet(self.q, exchange="amq.match", properties={'headers':headers})
+
+ def myBasicPublish(self, headers):
+ self.channel.basic_publish(exchange="amq.match", content=Content("foobar", properties={'headers':headers}))
+
+ def testMatchAll(self):
+ self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3})
+ self.myAssertPublishGet({"name":"fred", "age":3})
+ self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"})
+
+ # None of these should match
+ self.myBasicPublish({})
+ self.myBasicPublish({"name":"barney"})
+ self.myBasicPublish({"name":10})
+ self.myBasicPublish({"name":"fred", "age":2})
+ self.assertEmpty(self.q)
+
+ def testMatchAny(self):
+ self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3})
+ self.myAssertPublishGet({"name":"fred"})
+ self.myAssertPublishGet({"name":"fred", "ignoreme":10})
+ self.myAssertPublishGet({"ignoreme":10, "age":3})
+
+ # Wont match
+ self.myBasicPublish({})
+ self.myBasicPublish({"irrelevant":0})
+ self.assertEmpty(self.q)
+
+
+class MiscellaneousErrorsTests(TestBase):
+ """
+ Test some miscellaneous error conditions
+ """
+ def testTypeNotKnown(self):
+ try:
+ self.channel.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type")
+ self.fail("Expected 503 for declaration of unknown exchange type.")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def testDifferentDeclaredType(self):
+ self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="direct")
+ try:
+ self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic")
+ self.fail("Expected 530 for redeclaration of exchange with different type.")
+ except Closed, e:
+ self.assertConnectionException(530, e.args[0])
+ #cleanup
+ other = self.connect()
+ c2 = other.channel(1)
+ c2.channel_open()
+ c2.exchange_delete(exchange="test_different_declared_type_exchange")
+
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_8/queue.py b/qpid/tests/src/py/qpid_tests/broker_0_8/queue.py
new file mode 100644
index 0000000000..b7a41736ab
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_8/queue.py
@@ -0,0 +1,255 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import TestBase
+
+class QueueTests(TestBase):
+ """Tests for 'methods' on the amqp queue 'class'"""
+
+ def test_purge(self):
+ """
+ Test that the purge method removes messages from the queue
+ """
+ channel = self.channel
+ #setup, declare a queue and add some messages to it:
+ channel.exchange_declare(exchange="test-exchange", type="direct")
+ channel.queue_declare(queue="test-queue", exclusive=True)
+ channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
+ channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("one"))
+ channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("two"))
+ channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("three"))
+
+ #check that the queue now reports 3 messages:
+ reply = channel.queue_declare(queue="test-queue")
+ self.assertEqual(3, reply.message_count)
+
+ #now do the purge, then test that three messages are purged and the count drops to 0
+ reply = channel.queue_purge(queue="test-queue");
+ self.assertEqual(3, reply.message_count)
+ reply = channel.queue_declare(queue="test-queue")
+ self.assertEqual(0, reply.message_count)
+
+ #send a further message and consume it, ensuring that the other messages are really gone
+ channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("four"))
+ reply = channel.basic_consume(queue="test-queue", no_ack=True)
+ queue = self.client.queue(reply.consumer_tag)
+ msg = queue.get(timeout=1)
+ self.assertEqual("four", msg.content.body)
+
+ #check error conditions (use new channels):
+ channel = self.client.channel(2)
+ channel.channel_open()
+ try:
+ #queue specified but doesn't exist:
+ channel.queue_purge(queue="invalid-queue")
+ self.fail("Expected failure when purging non-existent queue")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ channel = self.client.channel(3)
+ channel.channel_open()
+ try:
+ #queue not specified and none previously declared for channel:
+ channel.queue_purge()
+ self.fail("Expected failure when purging unspecified queue")
+ except Closed, e:
+ self.assertConnectionException(530, e.args[0])
+
+ #cleanup
+ other = self.connect()
+ channel = other.channel(1)
+ channel.channel_open()
+ channel.exchange_delete(exchange="test-exchange")
+
+ def test_declare_exclusive(self):
+ """
+ Test that the exclusive field is honoured in queue.declare
+ """
+ # TestBase.setUp has already opened channel(1)
+ c1 = self.channel
+ # Here we open a second separate connection:
+ other = self.connect()
+ c2 = other.channel(1)
+ c2.channel_open()
+
+ #declare an exclusive queue:
+ c1.queue_declare(queue="exclusive-queue", exclusive="True")
+ try:
+ #other connection should not be allowed to declare this:
+ c2.queue_declare(queue="exclusive-queue", exclusive="True")
+ self.fail("Expected second exclusive queue_declare to raise a channel exception")
+ except Closed, e:
+ self.assertChannelException(405, e.args[0])
+
+
+ def test_declare_passive(self):
+ """
+ Test that the passive field is honoured in queue.declare
+ """
+ channel = self.channel
+ #declare an exclusive queue:
+ channel.queue_declare(queue="passive-queue-1", exclusive="True")
+ channel.queue_declare(queue="passive-queue-1", passive="True")
+ try:
+ #other connection should not be allowed to declare this:
+ channel.queue_declare(queue="passive-queue-2", passive="True")
+ self.fail("Expected passive declaration of non-existant queue to raise a channel exception")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+
+ def test_bind(self):
+ """
+ Test various permutations of the queue.bind method
+ """
+ channel = self.channel
+ channel.queue_declare(queue="queue-1", exclusive="True")
+
+ #straightforward case, both exchange & queue exist so no errors expected:
+ channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1")
+
+ #bind the default queue for the channel (i.e. last one declared):
+ channel.queue_bind(exchange="amq.direct", routing_key="key2")
+
+ #use the queue name where neither routing key nor queue are specified:
+ channel.queue_bind(exchange="amq.direct")
+
+ #try and bind to non-existant exchange
+ try:
+ channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1")
+ self.fail("Expected bind to non-existant exchange to fail")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ #need to reopen a channel:
+ channel = self.client.channel(2)
+ channel.channel_open()
+
+ #try and bind non-existant queue:
+ try:
+ channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1")
+ self.fail("Expected bind of non-existant queue to fail")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+
+ def test_delete_simple(self):
+ """
+ Test basic queue deletion
+ """
+ channel = self.channel
+
+ #straight-forward case:
+ channel.queue_declare(queue="delete-me")
+ channel.basic_publish(routing_key="delete-me", content=Content("a"))
+ channel.basic_publish(routing_key="delete-me", content=Content("b"))
+ channel.basic_publish(routing_key="delete-me", content=Content("c"))
+ reply = channel.queue_delete(queue="delete-me")
+ self.assertEqual(3, reply.message_count)
+ #check that it has gone be declaring passively
+ try:
+ channel.queue_declare(queue="delete-me", passive="True")
+ self.fail("Queue has not been deleted")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ #check attempted deletion of non-existant queue is handled correctly:
+ channel = self.client.channel(2)
+ channel.channel_open()
+ try:
+ channel.queue_delete(queue="i-dont-exist", if_empty="True")
+ self.fail("Expected delete of non-existant queue to fail")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+
+
+ def test_delete_ifempty(self):
+ """
+ Test that if_empty field of queue_delete is honoured
+ """
+ channel = self.channel
+
+ #create a queue and add a message to it (use default binding):
+ channel.queue_declare(queue="delete-me-2")
+ channel.queue_declare(queue="delete-me-2", passive="True")
+ channel.basic_publish(routing_key="delete-me-2", content=Content("message"))
+
+ #try to delete, but only if empty:
+ try:
+ channel.queue_delete(queue="delete-me-2", if_empty="True")
+ self.fail("Expected delete if_empty to fail for non-empty queue")
+ except Closed, e:
+ self.assertChannelException(406, e.args[0])
+
+ #need new channel now:
+ channel = self.client.channel(2)
+ channel.channel_open()
+
+ #empty queue:
+ reply = channel.basic_consume(queue="delete-me-2", no_ack=True)
+ queue = self.client.queue(reply.consumer_tag)
+ msg = queue.get(timeout=1)
+ self.assertEqual("message", msg.content.body)
+ channel.basic_cancel(consumer_tag=reply.consumer_tag)
+
+ #retry deletion on empty queue:
+ channel.queue_delete(queue="delete-me-2", if_empty="True")
+
+ #check that it has gone by declaring passively:
+ try:
+ channel.queue_declare(queue="delete-me-2", passive="True")
+ self.fail("Queue has not been deleted")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ def test_delete_ifunused(self):
+ """
+ Test that if_unused field of queue_delete is honoured
+ """
+ channel = self.channel
+
+ #create a queue and register a consumer:
+ channel.queue_declare(queue="delete-me-3")
+ channel.queue_declare(queue="delete-me-3", passive="True")
+ reply = channel.basic_consume(queue="delete-me-3", no_ack=True)
+
+ #need new channel now:
+ channel2 = self.client.channel(2)
+ channel2.channel_open()
+ #try to delete, but only if empty:
+ try:
+ channel2.queue_delete(queue="delete-me-3", if_unused="True")
+ self.fail("Expected delete if_unused to fail for queue with existing consumer")
+ except Closed, e:
+ self.assertChannelException(406, e.args[0])
+
+
+ channel.basic_cancel(consumer_tag=reply.consumer_tag)
+ channel.queue_delete(queue="delete-me-3", if_unused="True")
+ #check that it has gone by declaring passively:
+ try:
+ channel.queue_declare(queue="delete-me-3", passive="True")
+ self.fail("Queue has not been deleted")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_8/testlib.py b/qpid/tests/src/py/qpid_tests/broker_0_8/testlib.py
new file mode 100644
index 0000000000..76f7e964a2
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_8/testlib.py
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Tests for the testlib itself.
+#
+
+from qpid.content import Content
+from qpid.testlib import TestBase
+from Queue import Empty
+
+import sys
+from traceback import *
+
+def mytrace(frame, event, arg):
+ print_stack(frame);
+ print "===="
+ return mytrace
+
+class TestBaseTest(TestBase):
+ """Verify TestBase functions work as expected"""
+
+ def testAssertEmptyPass(self):
+ """Test assert empty works"""
+ self.queue_declare(queue="empty")
+ q = self.consume("empty")
+ self.assertEmpty(q)
+ try:
+ q.get(timeout=1)
+ self.fail("Queue is not empty.")
+ except Empty: None # Ignore
+
+ def testAssertEmptyFail(self):
+ self.queue_declare(queue="full")
+ q = self.consume("full")
+ self.channel.basic_publish(routing_key="full")
+ try:
+ self.assertEmpty(q);
+ self.fail("assertEmpty did not assert on non-empty queue")
+ except AssertionError: None # Ignore
+
+ def testMessageProperties(self):
+ """Verify properties are passed with message"""
+ props={"headers":{"x":1, "y":2}}
+ self.queue_declare(queue="q")
+ q = self.consume("q")
+ self.assertPublishGet(q, routing_key="q", properties=props)
+
+
+
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_8/tx.py b/qpid/tests/src/py/qpid_tests/broker_0_8/tx.py
new file mode 100644
index 0000000000..9faddb1110
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_8/tx.py
@@ -0,0 +1,209 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import TestBase
+
+class TxTests(TestBase):
+ """
+ Tests for 'methods' on the amqp tx 'class'
+ """
+
+ def test_commit(self):
+ """
+ Test that commited publishes are delivered and commited acks are not re-delivered
+ """
+ channel = self.channel
+ queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-commit-a", "tx-commit-b", "tx-commit-c")
+ channel.tx_commit()
+
+ #check results
+ for i in range(1, 5):
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("TxMessage %d" % i, msg.content.body)
+
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("TxMessage 6", msg.content.body)
+
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("TxMessage 7", msg.content.body)
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+ #cleanup
+ channel.basic_ack(delivery_tag=0, multiple=True)
+ channel.tx_commit()
+
+ def test_auto_rollback(self):
+ """
+ Test that a channel closed with an open transaction is effectively rolled back
+ """
+ channel = self.channel
+ queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+ channel.tx_rollback()
+
+ #check results
+ for i in range(1, 5):
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("Message 6", msg.content.body)
+
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("Message 7", msg.content.body)
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+ #cleanup
+ channel.basic_ack(delivery_tag=0, multiple=True)
+ channel.tx_commit()
+
+ def test_rollback(self):
+ """
+ Test that rolled back publishes are not delivered and rolled back acks are re-delivered
+ """
+ channel = self.channel
+ queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c")
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+ channel.tx_rollback()
+
+ #check results
+ for i in range(1, 5):
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("Message 6", msg.content.body)
+
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("Message 7", msg.content.body)
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+ #cleanup
+ channel.basic_ack(delivery_tag=0, multiple=True)
+ channel.tx_commit()
+
+ def perform_txn_work(self, channel, name_a, name_b, name_c):
+ """
+ Utility method that does some setup and some work under a transaction. Used for testing both
+ commit and rollback
+ """
+ #setup:
+ channel.queue_declare(queue=name_a, exclusive=True)
+ channel.queue_declare(queue=name_b, exclusive=True)
+ channel.queue_declare(queue=name_c, exclusive=True)
+
+ key = "my_key_" + name_b
+ topic = "my_topic_" + name_c
+
+ channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key)
+ channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic)
+
+ for i in range(1, 5):
+ channel.basic_publish(routing_key=name_a, content=Content("Message %d" % i))
+
+ channel.basic_publish(routing_key=key, exchange="amq.direct", content=Content("Message 6"))
+ channel.basic_publish(routing_key=topic, exchange="amq.topic", content=Content("Message 7"))
+
+ channel.tx_select()
+
+ #consume and ack messages
+ sub_a = channel.basic_consume(queue=name_a, no_ack=False)
+ queue_a = self.client.queue(sub_a.consumer_tag)
+ for i in range(1, 5):
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+ channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+ sub_b = channel.basic_consume(queue=name_b, no_ack=False)
+ queue_b = self.client.queue(sub_b.consumer_tag)
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("Message 6", msg.content.body)
+ channel.basic_ack(delivery_tag=msg.delivery_tag)
+
+ sub_c = channel.basic_consume(queue=name_c, no_ack=False)
+ queue_c = self.client.queue(sub_c.consumer_tag)
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("Message 7", msg.content.body)
+ channel.basic_ack(delivery_tag=msg.delivery_tag)
+
+ #publish messages
+ for i in range(1, 5):
+ channel.basic_publish(routing_key=topic, exchange="amq.topic", content=Content("TxMessage %d" % i))
+
+ channel.basic_publish(routing_key=key, exchange="amq.direct", content=Content("TxMessage 6"))
+ channel.basic_publish(routing_key=name_a, content=Content("TxMessage 7"))
+
+ return queue_a, queue_b, queue_c
+
+ def test_commit_overlapping_acks(self):
+ """
+ Test that logically 'overlapping' acks do not cause errors on commit
+ """
+ channel = self.channel
+ channel.queue_declare(queue="commit-overlapping", exclusive=True)
+ for i in range(1, 10):
+ channel.basic_publish(routing_key="commit-overlapping", content=Content("Message %d" % i))
+
+
+ channel.tx_select()
+
+ sub = channel.basic_consume(queue="commit-overlapping", no_ack=False)
+ queue = self.client.queue(sub.consumer_tag)
+ for i in range(1, 10):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+ if i in [3, 6, 10]:
+ channel.basic_ack(delivery_tag=msg.delivery_tag)
+
+ channel.tx_commit()
+
+ #check all have been acked:
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py b/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py
new file mode 100644
index 0000000000..d9f2ed7dbb
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py
@@ -0,0 +1,22 @@
+# Do not delete - marks this directory as a python package.
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import query, queue
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_9/query.py b/qpid/tests/src/py/qpid_tests/broker_0_9/query.py
new file mode 100644
index 0000000000..cb66d079e5
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_9/query.py
@@ -0,0 +1,224 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import TestBase
+
+class QueryTests(TestBase):
+ """Tests for various query methods introduced in 0-10 and available in 0-9 for preview"""
+
+ def test_exchange_query(self):
+ """
+ Test that the exchange_query method works as expected
+ """
+ channel = self.channel
+ #check returned type for the standard exchanges
+ self.assertEqual("direct", channel.exchange_query(name="amq.direct").type)
+ self.assertEqual("topic", channel.exchange_query(name="amq.topic").type)
+ self.assertEqual("fanout", channel.exchange_query(name="amq.fanout").type)
+ self.assertEqual("headers", channel.exchange_query(name="amq.match").type)
+ self.assertEqual("direct", channel.exchange_query(name="").type)
+ #declare an exchange
+ channel.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False)
+ #check that the result of a query is as expected
+ response = channel.exchange_query(name="my-test-exchange")
+ self.assertEqual("direct", response.type)
+ self.assertEqual(False, response.durable)
+ self.assertEqual(False, response.not_found)
+ #delete the exchange
+ channel.exchange_delete(exchange="my-test-exchange")
+ #check that the query now reports not-found
+ self.assertEqual(True, channel.exchange_query(name="my-test-exchange").not_found)
+
+ def test_binding_query_direct(self):
+ """
+ Test that the binding_query method works as expected with the direct exchange
+ """
+ self.binding_query_with_key("amq.direct")
+
+ def test_binding_query_topic(self):
+ """
+ Test that the binding_query method works as expected with the direct exchange
+ """
+ self.binding_query_with_key("amq.topic")
+
+ def binding_query_with_key(self, exchange_name):
+ channel = self.channel
+ #setup: create two queues
+ channel.queue_declare(queue="used-queue", exclusive=True)
+ channel.queue_declare(queue="unused-queue", exclusive=True)
+
+ channel.queue_bind(exchange=exchange_name, queue="used-queue", routing_key="used-key")
+
+ # test detection of any binding to specific queue
+ response = channel.binding_query(exchange=exchange_name, queue="used-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+
+ # test detection of specific binding to any queue
+ response = channel.binding_query(exchange=exchange_name, routing_key="used-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.key_not_matched)
+
+ # test detection of specific binding to specific queue
+ response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="used-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+ self.assertEqual(False, response.key_not_matched)
+
+ # test unmatched queue, unspecified binding
+ response = channel.binding_query(exchange=exchange_name, queue="unused-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+
+ # test unspecified queue, unmatched binding
+ response = channel.binding_query(exchange=exchange_name, routing_key="unused-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.key_not_matched)
+
+ # test matched queue, unmatched binding
+ response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="unused-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+ self.assertEqual(True, response.key_not_matched)
+
+ # test unmatched queue, matched binding
+ response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="used-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assertEqual(False, response.key_not_matched)
+
+ # test unmatched queue, unmatched binding
+ response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="unused-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assertEqual(True, response.key_not_matched)
+
+ #test exchange not found
+ self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
+
+ #test queue not found
+ self.assertEqual(True, channel.binding_query(exchange=exchange_name, queue="unknown-queue").queue_not_found)
+
+
+ def test_binding_query_fanout(self):
+ """
+ Test that the binding_query method works as expected with fanout exchange
+ """
+ channel = self.channel
+ #setup
+ channel.queue_declare(queue="used-queue", exclusive=True)
+ channel.queue_declare(queue="unused-queue", exclusive=True)
+ channel.queue_bind(exchange="amq.fanout", queue="used-queue")
+
+ # test detection of any binding to specific queue
+ response = channel.binding_query(exchange="amq.fanout", queue="used-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+
+ # test unmatched queue, unspecified binding
+ response = channel.binding_query(exchange="amq.fanout", queue="unused-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+
+ #test exchange not found
+ self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
+
+ #test queue not found
+ self.assertEqual(True, channel.binding_query(exchange="amq.fanout", queue="unknown-queue").queue_not_found)
+
+ def test_binding_query_header(self):
+ """
+ Test that the binding_query method works as expected with headers exchanges
+ """
+ channel = self.channel
+ #setup
+ channel.queue_declare(queue="used-queue", exclusive=True)
+ channel.queue_declare(queue="unused-queue", exclusive=True)
+ channel.queue_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} )
+
+ # test detection of any binding to specific queue
+ response = channel.binding_query(exchange="amq.match", queue="used-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+
+ # test detection of specific binding to any queue
+ response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "a":"A"})
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.args_not_matched)
+
+ # test detection of specific binding to specific queue
+ response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"})
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+ self.assertEqual(False, response.args_not_matched)
+
+ # test unmatched queue, unspecified binding
+ response = channel.binding_query(exchange="amq.match", queue="unused-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+
+ # test unspecified queue, unmatched binding
+ response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "b":"B"})
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.args_not_matched)
+
+ # test matched queue, unmatched binding
+ response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "b":"B"})
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+ self.assertEqual(True, response.args_not_matched)
+
+ # test unmatched queue, matched binding
+ response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "a":"A"})
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assertEqual(False, response.args_not_matched)
+
+ # test unmatched queue, unmatched binding
+ response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "b":"B"})
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assertEqual(True, response.args_not_matched)
+
+ #test exchange not found
+ self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
+
+ #test queue not found
+ self.assertEqual(True, channel.binding_query(exchange="amq.match", queue="unknown-queue").queue_not_found)
+
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py b/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py
new file mode 100644
index 0000000000..de1153307c
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import TestBase
+
+class QueueTests(TestBase):
+ """Tests for 'methods' on the amqp queue 'class'"""
+
+ def test_unbind_direct(self):
+ self.unbind_test(exchange="amq.direct", routing_key="key")
+
+ def test_unbind_topic(self):
+ self.unbind_test(exchange="amq.topic", routing_key="key")
+
+ def test_unbind_fanout(self):
+ self.unbind_test(exchange="amq.fanout")
+
+ def test_unbind_headers(self):
+ self.unbind_test(exchange="amq.match", args={ "x-match":"all", "a":"b"}, headers={"a":"b"})
+
+ def unbind_test(self, exchange, routing_key="", args=None, headers={}):
+ #bind two queues and consume from them
+ channel = self.channel
+
+ channel.queue_declare(queue="queue-1", exclusive="True")
+ channel.queue_declare(queue="queue-2", exclusive="True")
+
+ channel.basic_consume(queue="queue-1", consumer_tag="queue-1", no_ack=True)
+ channel.basic_consume(queue="queue-2", consumer_tag="queue-2", no_ack=True)
+
+ queue1 = self.client.queue("queue-1")
+ queue2 = self.client.queue("queue-2")
+
+ channel.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
+ channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args)
+
+ #send a message that will match both bindings
+ channel.basic_publish(exchange=exchange, routing_key=routing_key,
+ content=Content("one", properties={"headers": headers}))
+
+ #unbind first queue
+ channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
+
+ #send another message
+ channel.basic_publish(exchange=exchange, routing_key=routing_key,
+ content=Content("two", properties={"headers": headers}))
+
+ #check one queue has both messages and the other has only one
+ self.assertEquals("one", queue1.get(timeout=1).content.body)
+ try:
+ msg = queue1.get(timeout=1)
+ self.fail("Got extra message: %s" % msg.body)
+ except Empty: pass
+
+ self.assertEquals("one", queue2.get(timeout=1).content.body)
+ self.assertEquals("two", queue2.get(timeout=1).content.body)
+ try:
+ msg = queue2.get(timeout=1)
+ self.fail("Got extra message: " + msg)
+ except Empty: pass
+
+ def test_autodelete_shared(self):
+ """
+ Test auto-deletion (of non-exclusive queues)
+ """
+ channel = self.channel
+ other = self.connect()
+ channel2 = other.channel(1)
+ channel2.channel_open()
+
+ channel.queue_declare(queue="auto-delete-me", auto_delete=True)
+
+ #consume from both channels
+ reply = channel.basic_consume(queue="auto-delete-me", no_ack=True)
+ channel2.basic_consume(queue="auto-delete-me", no_ack=True)
+
+ #implicit cancel
+ channel2.channel_close()
+
+ #check it is still there
+ channel.queue_declare(queue="auto-delete-me", passive=True)
+
+ #explicit cancel => queue is now unused again:
+ channel.basic_cancel(consumer_tag=reply.consumer_tag)
+
+ #NOTE: this assumes there is no timeout in use
+
+ #check that it has gone be declaring passively
+ try:
+ channel.queue_declare(queue="auto-delete-me", passive=True)
+ self.fail("Expected queue to have been deleted")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])