summaryrefslogtreecommitdiff
path: root/python/tests_0-9
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-02-05 15:08:44 +0000
committerRafael H. Schloming <rhs@apache.org>2010-02-05 15:08:44 +0000
commita2fdc40ae50f95be3f2a96fa7f09a85f068fc424 (patch)
treed8ba40265471e7586fdb615e6dd7461b06ef5a67 /python/tests_0-9
parentcbb046d8b58b671405ce4fc9f9d175978e0f749d (diff)
downloadqpid-python-a2fdc40ae50f95be3f2a96fa7f09a85f068fc424.tar.gz
moved protocol tests from qpid/python to qpid/tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@906961 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/tests_0-9')
-rw-r--r--python/tests_0-9/__init__.py22
-rw-r--r--python/tests_0-9/query.py224
-rw-r--r--python/tests_0-9/queue.py111
3 files changed, 0 insertions, 357 deletions
diff --git a/python/tests_0-9/__init__.py b/python/tests_0-9/__init__.py
deleted file mode 100644
index d9f2ed7dbb..0000000000
--- a/python/tests_0-9/__init__.py
+++ /dev/null
@@ -1,22 +0,0 @@
-# Do not delete - marks this directory as a python package.
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import query, queue
diff --git a/python/tests_0-9/query.py b/python/tests_0-9/query.py
deleted file mode 100644
index cb66d079e5..0000000000
--- a/python/tests_0-9/query.py
+++ /dev/null
@@ -1,224 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import TestBase
-
-class QueryTests(TestBase):
- """Tests for various query methods introduced in 0-10 and available in 0-9 for preview"""
-
- def test_exchange_query(self):
- """
- Test that the exchange_query method works as expected
- """
- channel = self.channel
- #check returned type for the standard exchanges
- self.assertEqual("direct", channel.exchange_query(name="amq.direct").type)
- self.assertEqual("topic", channel.exchange_query(name="amq.topic").type)
- self.assertEqual("fanout", channel.exchange_query(name="amq.fanout").type)
- self.assertEqual("headers", channel.exchange_query(name="amq.match").type)
- self.assertEqual("direct", channel.exchange_query(name="").type)
- #declare an exchange
- channel.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False)
- #check that the result of a query is as expected
- response = channel.exchange_query(name="my-test-exchange")
- self.assertEqual("direct", response.type)
- self.assertEqual(False, response.durable)
- self.assertEqual(False, response.not_found)
- #delete the exchange
- channel.exchange_delete(exchange="my-test-exchange")
- #check that the query now reports not-found
- self.assertEqual(True, channel.exchange_query(name="my-test-exchange").not_found)
-
- def test_binding_query_direct(self):
- """
- Test that the binding_query method works as expected with the direct exchange
- """
- self.binding_query_with_key("amq.direct")
-
- def test_binding_query_topic(self):
- """
- Test that the binding_query method works as expected with the direct exchange
- """
- self.binding_query_with_key("amq.topic")
-
- def binding_query_with_key(self, exchange_name):
- channel = self.channel
- #setup: create two queues
- channel.queue_declare(queue="used-queue", exclusive=True)
- channel.queue_declare(queue="unused-queue", exclusive=True)
-
- channel.queue_bind(exchange=exchange_name, queue="used-queue", routing_key="used-key")
-
- # test detection of any binding to specific queue
- response = channel.binding_query(exchange=exchange_name, queue="used-queue")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.queue_not_matched)
-
- # test detection of specific binding to any queue
- response = channel.binding_query(exchange=exchange_name, routing_key="used-key")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.key_not_matched)
-
- # test detection of specific binding to specific queue
- response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="used-key")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.queue_not_matched)
- self.assertEqual(False, response.key_not_matched)
-
- # test unmatched queue, unspecified binding
- response = channel.binding_query(exchange=exchange_name, queue="unused-queue")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(True, response.queue_not_matched)
-
- # test unspecified queue, unmatched binding
- response = channel.binding_query(exchange=exchange_name, routing_key="unused-key")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(True, response.key_not_matched)
-
- # test matched queue, unmatched binding
- response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="unused-key")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.queue_not_matched)
- self.assertEqual(True, response.key_not_matched)
-
- # test unmatched queue, matched binding
- response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="used-key")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(True, response.queue_not_matched)
- self.assertEqual(False, response.key_not_matched)
-
- # test unmatched queue, unmatched binding
- response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="unused-key")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(True, response.queue_not_matched)
- self.assertEqual(True, response.key_not_matched)
-
- #test exchange not found
- self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
-
- #test queue not found
- self.assertEqual(True, channel.binding_query(exchange=exchange_name, queue="unknown-queue").queue_not_found)
-
-
- def test_binding_query_fanout(self):
- """
- Test that the binding_query method works as expected with fanout exchange
- """
- channel = self.channel
- #setup
- channel.queue_declare(queue="used-queue", exclusive=True)
- channel.queue_declare(queue="unused-queue", exclusive=True)
- channel.queue_bind(exchange="amq.fanout", queue="used-queue")
-
- # test detection of any binding to specific queue
- response = channel.binding_query(exchange="amq.fanout", queue="used-queue")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.queue_not_matched)
-
- # test unmatched queue, unspecified binding
- response = channel.binding_query(exchange="amq.fanout", queue="unused-queue")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(True, response.queue_not_matched)
-
- #test exchange not found
- self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
-
- #test queue not found
- self.assertEqual(True, channel.binding_query(exchange="amq.fanout", queue="unknown-queue").queue_not_found)
-
- def test_binding_query_header(self):
- """
- Test that the binding_query method works as expected with headers exchanges
- """
- channel = self.channel
- #setup
- channel.queue_declare(queue="used-queue", exclusive=True)
- channel.queue_declare(queue="unused-queue", exclusive=True)
- channel.queue_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} )
-
- # test detection of any binding to specific queue
- response = channel.binding_query(exchange="amq.match", queue="used-queue")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.queue_not_matched)
-
- # test detection of specific binding to any queue
- response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "a":"A"})
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.args_not_matched)
-
- # test detection of specific binding to specific queue
- response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"})
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.queue_not_matched)
- self.assertEqual(False, response.args_not_matched)
-
- # test unmatched queue, unspecified binding
- response = channel.binding_query(exchange="amq.match", queue="unused-queue")
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(True, response.queue_not_matched)
-
- # test unspecified queue, unmatched binding
- response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "b":"B"})
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(True, response.args_not_matched)
-
- # test matched queue, unmatched binding
- response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "b":"B"})
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(False, response.queue_not_matched)
- self.assertEqual(True, response.args_not_matched)
-
- # test unmatched queue, matched binding
- response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "a":"A"})
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(True, response.queue_not_matched)
- self.assertEqual(False, response.args_not_matched)
-
- # test unmatched queue, unmatched binding
- response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "b":"B"})
- self.assertEqual(False, response.exchange_not_found)
- self.assertEqual(False, response.queue_not_found)
- self.assertEqual(True, response.queue_not_matched)
- self.assertEqual(True, response.args_not_matched)
-
- #test exchange not found
- self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
-
- #test queue not found
- self.assertEqual(True, channel.binding_query(exchange="amq.match", queue="unknown-queue").queue_not_found)
-
diff --git a/python/tests_0-9/queue.py b/python/tests_0-9/queue.py
deleted file mode 100644
index de1153307c..0000000000
--- a/python/tests_0-9/queue.py
+++ /dev/null
@@ -1,111 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import TestBase
-
-class QueueTests(TestBase):
- """Tests for 'methods' on the amqp queue 'class'"""
-
- def test_unbind_direct(self):
- self.unbind_test(exchange="amq.direct", routing_key="key")
-
- def test_unbind_topic(self):
- self.unbind_test(exchange="amq.topic", routing_key="key")
-
- def test_unbind_fanout(self):
- self.unbind_test(exchange="amq.fanout")
-
- def test_unbind_headers(self):
- self.unbind_test(exchange="amq.match", args={ "x-match":"all", "a":"b"}, headers={"a":"b"})
-
- def unbind_test(self, exchange, routing_key="", args=None, headers={}):
- #bind two queues and consume from them
- channel = self.channel
-
- channel.queue_declare(queue="queue-1", exclusive="True")
- channel.queue_declare(queue="queue-2", exclusive="True")
-
- channel.basic_consume(queue="queue-1", consumer_tag="queue-1", no_ack=True)
- channel.basic_consume(queue="queue-2", consumer_tag="queue-2", no_ack=True)
-
- queue1 = self.client.queue("queue-1")
- queue2 = self.client.queue("queue-2")
-
- channel.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
- channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args)
-
- #send a message that will match both bindings
- channel.basic_publish(exchange=exchange, routing_key=routing_key,
- content=Content("one", properties={"headers": headers}))
-
- #unbind first queue
- channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
-
- #send another message
- channel.basic_publish(exchange=exchange, routing_key=routing_key,
- content=Content("two", properties={"headers": headers}))
-
- #check one queue has both messages and the other has only one
- self.assertEquals("one", queue1.get(timeout=1).content.body)
- try:
- msg = queue1.get(timeout=1)
- self.fail("Got extra message: %s" % msg.body)
- except Empty: pass
-
- self.assertEquals("one", queue2.get(timeout=1).content.body)
- self.assertEquals("two", queue2.get(timeout=1).content.body)
- try:
- msg = queue2.get(timeout=1)
- self.fail("Got extra message: " + msg)
- except Empty: pass
-
- def test_autodelete_shared(self):
- """
- Test auto-deletion (of non-exclusive queues)
- """
- channel = self.channel
- other = self.connect()
- channel2 = other.channel(1)
- channel2.channel_open()
-
- channel.queue_declare(queue="auto-delete-me", auto_delete=True)
-
- #consume from both channels
- reply = channel.basic_consume(queue="auto-delete-me", no_ack=True)
- channel2.basic_consume(queue="auto-delete-me", no_ack=True)
-
- #implicit cancel
- channel2.channel_close()
-
- #check it is still there
- channel.queue_declare(queue="auto-delete-me", passive=True)
-
- #explicit cancel => queue is now unused again:
- channel.basic_cancel(consumer_tag=reply.consumer_tag)
-
- #NOTE: this assumes there is no timeout in use
-
- #check that it has gone be declaring passively
- try:
- channel.queue_declare(queue="auto-delete-me", passive=True)
- self.fail("Expected queue to have been deleted")
- except Closed, e:
- self.assertChannelException(404, e.args[0])