summaryrefslogtreecommitdiff
path: root/qpid/python/tests_0-10/queue.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-02-05 15:08:44 +0000
committerRafael H. Schloming <rhs@apache.org>2010-02-05 15:08:44 +0000
commit3df1424a43244ee1c347ff9163650753601c6893 (patch)
tree333c4228d660a6d3d175ddfe76403f1f37e75af1 /qpid/python/tests_0-10/queue.py
parent5b4dc91393493685c6688d0b69334333413616fe (diff)
downloadqpid-python-3df1424a43244ee1c347ff9163650753601c6893.tar.gz
moved protocol tests from qpid/python to qpid/tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@906961 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python/tests_0-10/queue.py')
-rw-r--r--qpid/python/tests_0-10/queue.py366
1 files changed, 0 insertions, 366 deletions
diff --git a/qpid/python/tests_0-10/queue.py b/qpid/python/tests_0-10/queue.py
deleted file mode 100644
index eb38965190..0000000000
--- a/qpid/python/tests_0-10/queue.py
+++ /dev/null
@@ -1,366 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.testlib import TestBase010
-from qpid.datatypes import Message
-from qpid.session import SessionException
-
-class QueueTests(TestBase010):
- """Tests for 'methods' on the amqp queue 'class'"""
-
- def test_purge(self):
- """
- Test that the purge method removes messages from the queue
- """
- session = self.session
- #setup, declare a queue and add some messages to it:
- session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
- session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "one"))
- session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "two"))
- session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "three"))
-
- #check that the queue now reports 3 messages:
- session.queue_declare(queue="test-queue")
- reply = session.queue_query(queue="test-queue")
- self.assertEqual(3, reply.message_count)
-
- #now do the purge, then test that three messages are purged and the count drops to 0
- session.queue_purge(queue="test-queue");
- reply = session.queue_query(queue="test-queue")
- self.assertEqual(0, reply.message_count)
-
- #send a further message and consume it, ensuring that the other messages are really gone
- session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "four"))
- session.message_subscribe(queue="test-queue", destination="tag")
- session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFFL)
- session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
- queue = session.incoming("tag")
- msg = queue.get(timeout=1)
- self.assertEqual("four", msg.body)
-
- def test_purge_queue_exists(self):
- """
- Test that the correct exception is thrown is no queue exists
- for the name specified in purge
- """
- session = self.session
- try:
- #queue specified but doesn't exist:
- session.queue_purge(queue="invalid-queue")
- self.fail("Expected failure when purging non-existent queue")
- except SessionException, e:
- self.assertEquals(404, e.args[0].error_code) #not-found
-
- def test_purge_empty_name(self):
- """
- Test that the correct exception is thrown is no queue name
- is specified for purge
- """
- session = self.session
- try:
- #queue not specified and none previously declared for channel:
- session.queue_purge()
- self.fail("Expected failure when purging unspecified queue")
- except SessionException, e:
- self.assertEquals(531, e.args[0].error_code) #illegal-argument
-
- def test_declare_exclusive(self):
- """
- Test that the exclusive field is honoured in queue.declare
- """
- # TestBase.setUp has already opened session(1)
- s1 = self.session
- # Here we open a second separate connection:
- s2 = self.conn.session("other")
-
- #declare an exclusive queue:
- s1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
- try:
- #other connection should not be allowed to declare this:
- s2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
- self.fail("Expected second exclusive queue_declare to raise a channel exception")
- except SessionException, e:
- self.assertEquals(405, e.args[0].error_code)
-
- s3 = self.conn.session("subscriber")
- try:
- #other connection should not be allowed to declare this:
- s3.message_subscribe(queue="exclusive-queue")
- self.fail("Expected message_subscribe on an exclusive queue to raise a channel exception")
- except SessionException, e:
- self.assertEquals(405, e.args[0].error_code)
-
- s4 = self.conn.session("deleter")
- try:
- #other connection should not be allowed to declare this:
- s4.queue_delete(queue="exclusive-queue")
- self.fail("Expected queue_delete on an exclusive queue to raise a channel exception")
- except SessionException, e:
- self.assertEquals(405, e.args[0].error_code)
-
-
- def test_declare_passive(self):
- """
- Test that the passive field is honoured in queue.declare
- """
- session = self.session
- #declare an exclusive queue:
- session.queue_declare(queue="passive-queue-1", exclusive=True, auto_delete=True)
- session.queue_declare(queue="passive-queue-1", passive=True)
- try:
- #other connection should not be allowed to declare this:
- session.queue_declare(queue="passive-queue-2", passive=True)
- self.fail("Expected passive declaration of non-existant queue to raise a channel exception")
- except SessionException, e:
- self.assertEquals(404, e.args[0].error_code) #not-found
-
-
- def test_bind(self):
- """
- Test various permutations of the queue.bind method
- """
- session = self.session
- session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
-
- #straightforward case, both exchange & queue exist so no errors expected:
- session.exchange_bind(queue="queue-1", exchange="amq.direct", binding_key="key1")
-
- #use the queue name where the routing key is not specified:
- session.exchange_bind(queue="queue-1", exchange="amq.direct")
-
- #try and bind to non-existant exchange
- try:
- session.exchange_bind(queue="queue-1", exchange="an-invalid-exchange", binding_key="key1")
- self.fail("Expected bind to non-existant exchange to fail")
- except SessionException, e:
- self.assertEquals(404, e.args[0].error_code)
-
-
- def test_bind_queue_existence(self):
- session = self.session
- #try and bind non-existant queue:
- try:
- session.exchange_bind(queue="queue-2", exchange="amq.direct", binding_key="key1")
- self.fail("Expected bind of non-existant queue to fail")
- except SessionException, e:
- self.assertEquals(404, e.args[0].error_code)
-
- def test_unbind_direct(self):
- self.unbind_test(exchange="amq.direct", routing_key="key")
-
- def test_unbind_topic(self):
- self.unbind_test(exchange="amq.topic", routing_key="key")
-
- def test_unbind_fanout(self):
- self.unbind_test(exchange="amq.fanout")
-
- def test_unbind_headers(self):
- self.unbind_test(exchange="amq.match", args={ "x-match":"all", "a":"b"}, headers={"a":"b"})
-
- def unbind_test(self, exchange, routing_key="", args=None, headers=None):
- #bind two queues and consume from them
- session = self.session
-
- session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
- session.queue_declare(queue="queue-2", exclusive=True, auto_delete=True)
-
- session.message_subscribe(queue="queue-1", destination="queue-1")
- session.message_flow(destination="queue-1", unit=session.credit_unit.message, value=0xFFFFFFFFL)
- session.message_flow(destination="queue-1", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
- session.message_subscribe(queue="queue-2", destination="queue-2")
- session.message_flow(destination="queue-2", unit=session.credit_unit.message, value=0xFFFFFFFFL)
- session.message_flow(destination="queue-2", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
-
- queue1 = session.incoming("queue-1")
- queue2 = session.incoming("queue-2")
-
- session.exchange_bind(exchange=exchange, queue="queue-1", binding_key=routing_key, arguments=args)
- session.exchange_bind(exchange=exchange, queue="queue-2", binding_key=routing_key, arguments=args)
-
- dp = session.delivery_properties(routing_key=routing_key)
- if (headers):
- mp = session.message_properties(application_headers=headers)
- msg1 = Message(dp, mp, "one")
- msg2 = Message(dp, mp, "two")
- else:
- msg1 = Message(dp, "one")
- msg2 = Message(dp, "two")
-
- #send a message that will match both bindings
- session.message_transfer(destination=exchange, message=msg1)
-
- #unbind first queue
- session.exchange_unbind(exchange=exchange, queue="queue-1", binding_key=routing_key)
-
- #send another message
- session.message_transfer(destination=exchange, message=msg2)
-
- #check one queue has both messages and the other has only one
- self.assertEquals("one", queue1.get(timeout=1).body)
- try:
- msg = queue1.get(timeout=1)
- self.fail("Got extra message: %s" % msg.body)
- except Empty: pass
-
- self.assertEquals("one", queue2.get(timeout=1).body)
- self.assertEquals("two", queue2.get(timeout=1).body)
- try:
- msg = queue2.get(timeout=1)
- self.fail("Got extra message: " + msg)
- except Empty: pass
-
-
- def test_delete_simple(self):
- """
- Test core queue deletion behaviour
- """
- session = self.session
-
- #straight-forward case:
- session.queue_declare(queue="delete-me")
- session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "a"))
- session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "b"))
- session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "c"))
- session.queue_delete(queue="delete-me")
- #check that it has gone by declaring passively
- try:
- session.queue_declare(queue="delete-me", passive=True)
- self.fail("Queue has not been deleted")
- except SessionException, e:
- self.assertEquals(404, e.args[0].error_code)
-
- def test_delete_queue_exists(self):
- """
- Test core queue deletion behaviour
- """
- #check attempted deletion of non-existant queue is handled correctly:
- session = self.session
- try:
- session.queue_delete(queue="i-dont-exist", if_empty=True)
- self.fail("Expected delete of non-existant queue to fail")
- except SessionException, e:
- self.assertEquals(404, e.args[0].error_code)
-
-
-
- def test_delete_ifempty(self):
- """
- Test that if_empty field of queue_delete is honoured
- """
- session = self.session
-
- #create a queue and add a message to it (use default binding):
- session.queue_declare(queue="delete-me-2")
- session.queue_declare(queue="delete-me-2", passive=True)
- session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me-2"), "message"))
-
- #try to delete, but only if empty:
- try:
- session.queue_delete(queue="delete-me-2", if_empty=True)
- self.fail("Expected delete if_empty to fail for non-empty queue")
- except SessionException, e:
- self.assertEquals(406, e.args[0].error_code)
-
- #need new session now:
- session = self.conn.session("replacement", 2)
-
- #empty queue:
- session.message_subscribe(destination="consumer_tag", queue="delete-me-2")
- session.message_flow(destination="consumer_tag", unit=session.credit_unit.message, value=0xFFFFFFFFL)
- session.message_flow(destination="consumer_tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
- queue = session.incoming("consumer_tag")
- msg = queue.get(timeout=1)
- self.assertEqual("message", msg.body)
- session.message_cancel(destination="consumer_tag")
-
- #retry deletion on empty queue:
- session.queue_delete(queue="delete-me-2", if_empty=True)
-
- #check that it has gone by declaring passively:
- try:
- session.queue_declare(queue="delete-me-2", passive=True)
- self.fail("Queue has not been deleted")
- except SessionException, e:
- self.assertEquals(404, e.args[0].error_code)
-
- def test_delete_ifunused(self):
- """
- Test that if_unused field of queue_delete is honoured
- """
- session = self.session
-
- #create a queue and register a consumer:
- session.queue_declare(queue="delete-me-3")
- session.queue_declare(queue="delete-me-3", passive=True)
- session.message_subscribe(destination="consumer_tag", queue="delete-me-3")
-
- #need new session now:
- session2 = self.conn.session("replacement", 2)
-
- #try to delete, but only if empty:
- try:
- session2.queue_delete(queue="delete-me-3", if_unused=True)
- self.fail("Expected delete if_unused to fail for queue with existing consumer")
- except SessionException, e:
- self.assertEquals(406, e.args[0].error_code)
-
- session.message_cancel(destination="consumer_tag")
- session.queue_delete(queue="delete-me-3", if_unused=True)
- #check that it has gone by declaring passively:
- try:
- session.queue_declare(queue="delete-me-3", passive=True)
- self.fail("Queue has not been deleted")
- except SessionException, e:
- self.assertEquals(404, e.args[0].error_code)
-
-
- def test_autodelete_shared(self):
- """
- Test auto-deletion (of non-exclusive queues)
- """
- session = self.session
- session2 =self.conn.session("other", 1)
-
- session.queue_declare(queue="auto-delete-me", auto_delete=True)
-
- #consume from both sessions
- tag = "my-tag"
- session.message_subscribe(queue="auto-delete-me", destination=tag)
- session2.message_subscribe(queue="auto-delete-me", destination=tag)
-
- #implicit cancel
- session2.close()
-
- #check it is still there
- session.queue_declare(queue="auto-delete-me", passive=True)
-
- #explicit cancel => queue is now unused again:
- session.message_cancel(destination=tag)
-
- #NOTE: this assumes there is no timeout in use
-
- #check that it has gone by declaring it passively
- try:
- session.queue_declare(queue="auto-delete-me", passive=True)
- self.fail("Expected queue to have been deleted")
- except SessionException, e:
- self.assertEquals(404, e.args[0].error_code)
-
-