summaryrefslogtreecommitdiff
path: root/trunk/qpid/python/tests_0-10/exchange.py
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/python/tests_0-10/exchange.py')
-rw-r--r--trunk/qpid/python/tests_0-10/exchange.py416
1 files changed, 0 insertions, 416 deletions
diff --git a/trunk/qpid/python/tests_0-10/exchange.py b/trunk/qpid/python/tests_0-10/exchange.py
deleted file mode 100644
index 4b5dc78143..0000000000
--- a/trunk/qpid/python/tests_0-10/exchange.py
+++ /dev/null
@@ -1,416 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-Tests for exchange behaviour.
-
-Test classes ending in 'RuleTests' are derived from rules in amqp.xml.
-"""
-
-import Queue, logging, traceback
-from qpid.testlib import TestBase010
-from qpid.datatypes import Message
-from qpid.client import Closed
-from qpid.session import SessionException
-
-
-class TestHelper(TestBase010):
- def setUp(self):
- TestBase010.setUp(self)
- self.queues = []
- self.exchanges = []
-
- def tearDown(self):
- try:
- for ssn, q in self.queues:
- ssn.queue_delete(queue=q)
- for ssn, ex in self.exchanges:
- ssn.exchange_delete(exchange=ex)
- except:
- print "Error on tearDown:"
- print traceback.print_exc()
- TestBase010.tearDown(self)
-
- def createMessage(self, key="", body=""):
- return Message(self.session.delivery_properties(routing_key=key), body)
-
- def getApplicationHeaders(self, msg):
- for h in msg.headers:
- if hasattr(h, 'application_headers'): return getattr(h, 'application_headers')
- return None
-
- def assertPublishGet(self, queue, exchange="", routing_key="", properties=None):
- """
- Publish to exchange and assert queue.get() returns the same message.
- """
- body = self.uniqueString()
- dp=self.session.delivery_properties(routing_key=routing_key)
- mp=self.session.message_properties(application_headers=properties)
- self.session.message_transfer(destination=exchange, message=Message(dp, mp, body))
- msg = queue.get(timeout=1)
- self.assertEqual(body, msg.body)
- if (properties):
- self.assertEqual(properties, self.getApplicationHeaders(msg))
-
- def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None):
- """
- Publish a message and consume it, assert it comes back intact.
- Return the Queue object used to consume.
- """
- self.assertPublishGet(self.consume(queue), exchange, routing_key, properties)
-
- def assertEmpty(self, queue):
- """Assert that the queue is empty"""
- try:
- queue.get(timeout=1)
- self.fail("Queue is not empty.")
- except Queue.Empty: None # Ignore
-
- def queue_declare(self, session=None, *args, **keys):
- session = session or self.session
- reply = session.queue_declare(*args, **keys)
- self.queues.append((session, keys["queue"]))
- return reply
-
- def exchange_declare(self, session=None, ticket=0, exchange='',
- type='', passive=False, durable=False,
- auto_delete=False,
- arguments={}):
- session = session or self.session
- reply = session.exchange_declare(exchange=exchange, type=type, passive=passive,durable=durable, auto_delete=auto_delete, arguments=arguments)
- self.exchanges.append((session,exchange))
- return reply
-
- def uniqueString(self):
- """Generate a unique string, unique for this TestBase instance"""
- if not "uniqueCounter" in dir(self): self.uniqueCounter = 1;
- return "Test Message " + str(self.uniqueCounter)
-
- def consume(self, queueName):
- """Consume from named queue returns the Queue object."""
- if not "uniqueTag" in dir(self): self.uniqueTag = 1
- else: self.uniqueTag += 1
- consumer_tag = "tag" + str(self.uniqueTag)
- self.session.message_subscribe(queue=queueName, destination=consumer_tag)
- self.session.message_flow(destination=consumer_tag, unit=self.session.credit_unit.message, value=0xFFFFFFFF)
- self.session.message_flow(destination=consumer_tag, unit=self.session.credit_unit.byte, value=0xFFFFFFFF)
- return self.session.incoming(consumer_tag)
-
-
-class StandardExchangeVerifier:
- """Verifies standard exchange behavior.
-
- Used as base class for classes that test standard exchanges."""
-
- def verifyDirectExchange(self, ex):
- """Verify that ex behaves like a direct exchange."""
- self.queue_declare(queue="q")
- self.session.exchange_bind(queue="q", exchange=ex, binding_key="k")
- self.assertPublishConsume(exchange=ex, queue="q", routing_key="k")
- try:
- self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk")
- self.fail("Expected Empty exception")
- except Queue.Empty: None # Expected
-
- def verifyFanOutExchange(self, ex):
- """Verify that ex behaves like a fanout exchange."""
- self.queue_declare(queue="q")
- self.session.exchange_bind(queue="q", exchange=ex)
- self.queue_declare(queue="p")
- self.session.exchange_bind(queue="p", exchange=ex)
- for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex)
-
- def verifyTopicExchange(self, ex):
- """Verify that ex behaves like a topic exchange"""
- self.queue_declare(queue="a")
- self.session.exchange_bind(queue="a", exchange=ex, binding_key="a.#.b.*")
- q = self.consume("a")
- self.assertPublishGet(q, ex, "a.b.x")
- self.assertPublishGet(q, ex, "a.x.b.x")
- self.assertPublishGet(q, ex, "a.x.x.b.x")
- # Shouldn't match
- self.session.message_transfer(destination=ex, message=self.createMessage("a.b"))
- self.session.message_transfer(destination=ex, message=self.createMessage("a.b.x.y"))
- self.session.message_transfer(destination=ex, message=self.createMessage("x.a.b.x"))
- self.session.message_transfer(destination=ex, message=self.createMessage("a.b"))
- self.assert_(q.empty())
-
- def verifyHeadersExchange(self, ex):
- """Verify that ex is a headers exchange"""
- self.queue_declare(queue="q")
- self.session.exchange_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} )
- q = self.consume("q")
- headers = {"name":"fred", "age":3}
- self.assertPublishGet(q, exchange=ex, properties=headers)
- self.session.message_transfer(destination=ex) # No headers, won't deliver
- self.assertEmpty(q);
-
-
-class RecommendedTypesRuleTests(TestHelper, StandardExchangeVerifier):
- """
- The server SHOULD implement these standard exchange types: topic, headers.
-
- Client attempts to declare an exchange with each of these standard types.
- """
-
- def testDirect(self):
- """Declare and test a direct exchange"""
- self.exchange_declare(0, exchange="d", type="direct")
- self.verifyDirectExchange("d")
-
- def testFanout(self):
- """Declare and test a fanout exchange"""
- self.exchange_declare(0, exchange="f", type="fanout")
- self.verifyFanOutExchange("f")
-
- def testTopic(self):
- """Declare and test a topic exchange"""
- self.exchange_declare(0, exchange="t", type="topic")
- self.verifyTopicExchange("t")
-
- def testHeaders(self):
- """Declare and test a headers exchange"""
- self.exchange_declare(0, exchange="h", type="headers")
- self.verifyHeadersExchange("h")
-
-
-class RequiredInstancesRuleTests(TestHelper, StandardExchangeVerifier):
- """
- The server MUST, in each virtual host, pre-declare an exchange instance
- for each standard exchange type that it implements, where the name of the
- exchange instance is amq. followed by the exchange type name.
-
- Client creates a temporary queue and attempts to bind to each required
- exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if
- those types are defined).
- """
- def testAmqDirect(self): self.verifyDirectExchange("amq.direct")
-
- def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout")
-
- def testAmqTopic(self): self.verifyTopicExchange("amq.topic")
-
- def testAmqMatch(self): self.verifyHeadersExchange("amq.match")
-
-class DefaultExchangeRuleTests(TestHelper, StandardExchangeVerifier):
- """
- The server MUST predeclare a direct exchange to act as the default exchange
- for content Publish methods and for default queue bindings.
-
- Client checks that the default exchange is active by specifying a queue
- binding with no exchange name, and publishing a message with a suitable
- routing key but without specifying the exchange name, then ensuring that
- the message arrives in the queue correctly.
- """
- def testDefaultExchange(self):
- # Test automatic binding by queue name.
- self.queue_declare(queue="d")
- self.assertPublishConsume(queue="d", routing_key="d")
- # Test explicit bind to default queue
- self.verifyDirectExchange("")
-
-
-# TODO aconway 2006-09-27: Fill in empty tests:
-
-class DefaultAccessRuleTests(TestHelper):
- """
- The server MUST NOT allow clients to access the default exchange except
- by specifying an empty exchange name in the Queue.Bind and content Publish
- methods.
- """
-
-class ExtensionsRuleTests(TestHelper):
- """
- The server MAY implement other exchange types as wanted.
- """
-
-
-class DeclareMethodMinimumRuleTests(TestHelper):
- """
- The server SHOULD support a minimum of 16 exchanges per virtual host and
- ideally, impose no limit except as defined by available resources.
-
- The client creates as many exchanges as it can until the server reports
- an error; the number of exchanges successfuly created must be at least
- sixteen.
- """
-
-
-class DeclareMethodTicketFieldValidityRuleTests(TestHelper):
- """
- The client MUST provide a valid access ticket giving "active" access to
- the realm in which the exchange exists or will be created, or "passive"
- access if the if-exists flag is set.
-
- Client creates access ticket with wrong access rights and attempts to use
- in this method.
- """
-
-
-class DeclareMethodExchangeFieldReservedRuleTests(TestHelper):
- """
- Exchange names starting with "amq." are reserved for predeclared and
- standardised exchanges. The client MUST NOT attempt to create an exchange
- starting with "amq.".
-
-
- """
-
-
-class DeclareMethodTypeFieldTypedRuleTests(TestHelper):
- """
- Exchanges cannot be redeclared with different types. The client MUST not
- attempt to redeclare an existing exchange with a different type than used
- in the original Exchange.Declare method.
-
-
- """
-
-
-class DeclareMethodTypeFieldSupportRuleTests(TestHelper):
- """
- The client MUST NOT attempt to create an exchange with a type that the
- server does not support.
-
-
- """
-
-
-class DeclareMethodPassiveFieldNotFoundRuleTests(TestHelper):
- """
- If set, and the exchange does not already exist, the server MUST raise a
- channel exception with reply code 404 (not found).
- """
- def test(self):
- try:
- self.session.exchange_declare(exchange="humpty_dumpty", passive=True)
- self.fail("Expected 404 for passive declaration of unknown exchange.")
- except SessionException, e:
- self.assertEquals(404, e.args[0].error_code)
-
-
-class DeclareMethodDurableFieldSupportRuleTests(TestHelper):
- """
- The server MUST support both durable and transient exchanges.
-
-
- """
-
-
-class DeclareMethodDurableFieldStickyRuleTests(TestHelper):
- """
- The server MUST ignore the durable field if the exchange already exists.
-
-
- """
-
-
-class DeclareMethodAutoDeleteFieldStickyRuleTests(TestHelper):
- """
- The server MUST ignore the auto-delete field if the exchange already
- exists.
-
-
- """
-
-
-class DeleteMethodTicketFieldValidityRuleTests(TestHelper):
- """
- The client MUST provide a valid access ticket giving "active" access
- rights to the exchange's access realm.
-
- Client creates access ticket with wrong access rights and attempts to use
- in this method.
- """
-
-
-class DeleteMethodExchangeFieldExistsRuleTests(TestHelper):
- """
- The client MUST NOT attempt to delete an exchange that does not exist.
- """
-
-
-class HeadersExchangeTests(TestHelper):
- """
- Tests for headers exchange functionality.
- """
- def setUp(self):
- TestHelper.setUp(self)
- self.queue_declare(queue="q")
- self.q = self.consume("q")
-
- def myAssertPublishGet(self, headers):
- self.assertPublishGet(self.q, exchange="amq.match", properties=headers)
-
- def myBasicPublish(self, headers):
- mp=self.session.message_properties(application_headers=headers)
- self.session.message_transfer(destination="amq.match", message=Message(mp, "foobar"))
-
- def testMatchAll(self):
- self.session.exchange_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3})
- self.myAssertPublishGet({"name":"fred", "age":3})
- self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"})
-
- # None of these should match
- self.myBasicPublish({})
- self.myBasicPublish({"name":"barney"})
- self.myBasicPublish({"name":10})
- self.myBasicPublish({"name":"fred", "age":2})
- self.assertEmpty(self.q)
-
- def testMatchAny(self):
- self.session.exchange_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3})
- self.myAssertPublishGet({"name":"fred"})
- self.myAssertPublishGet({"name":"fred", "ignoreme":10})
- self.myAssertPublishGet({"ignoreme":10, "age":3})
-
- # Wont match
- self.myBasicPublish({})
- self.myBasicPublish({"irrelevant":0})
- self.assertEmpty(self.q)
-
-
-class MiscellaneousErrorsTests(TestHelper):
- """
- Test some miscellaneous error conditions
- """
- def testTypeNotKnown(self):
- try:
- self.session.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type")
- self.fail("Expected 503 for declaration of unknown exchange type.")
- except SessionException, e:
- self.assertEquals(503, e.args[0].error_code)
-
- def testDifferentDeclaredType(self):
- self.exchange_declare(exchange="test_different_declared_type_exchange", type="direct")
- try:
- session = self.conn.session("alternate", 2)
- session.exchange_declare(exchange="test_different_declared_type_exchange", type="topic")
- self.fail("Expected 530 for redeclaration of exchange with different type.")
- except SessionException, e:
- self.assertEquals(530, e.args[0].error_code)
-
-class ExchangeTests(TestHelper):
- def testHeadersBindNoMatchArg(self):
- self.session.queue_declare(queue="q", exclusive=True, auto_delete=True)
- try:
- self.session.exchange_bind(queue="q", exchange="amq.match", arguments={"name":"fred" , "age":3} )
- self.fail("Expected failure for missing x-match arg.")
- except SessionException, e:
- self.assertEquals(541, e.args[0].error_code)