summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-09-21 18:26:31 +0000
committerAlan Conway <aconway@apache.org>2006-09-21 18:26:31 +0000
commit474ed3cf1e125360d26dad4376e106e8b48541ac (patch)
tree4f1043da7f03a5ec230539a62afac3fb0f0f0b73 /python
parent82e07bb30905feb2c11bb6d9f3624f976ab070a5 (diff)
downloadqpid-python-474ed3cf1e125360d26dad4376e106e8b48541ac.tar.gz
Implemented topic pattern matching for the TopicExchange.
Corrected default bindings to use the exchange named "" rather than "amqp.direct". Added python and unit tests for all of the above. Minor improvements to testlib.py, also some tests for testlib itself. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@448624 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/qpid/testlib.py45
-rw-r--r--python/tests/exchange.py44
-rw-r--r--python/tests/queue.py8
3 files changed, 67 insertions, 30 deletions
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
index ff9ecbee8a..0bec6a8708 100644
--- a/python/qpid/testlib.py
+++ b/python/qpid/testlib.py
@@ -14,11 +14,13 @@
# limitations under the License.
#
+#
# Support library for qpid python tests.
#
import sys, re, unittest, os, random, logging
import qpid.client, qpid.spec
+import Queue
from getopt import getopt, GetoptError
@@ -188,26 +190,41 @@ class TestBase(unittest.TestCase):
self.exchanges.append((channel,exchange))
return reply
- def assertPublishConsume(self, queue="", exchange="", routing_key=""):
- """
- Publish a message and consume it, assert it comes back intact.
+ def uniqueString(self):
+ """Generate a unique string, unique for this TestBase instance"""
+ # TODO aconway 2006-09-20: Not thread safe.
+ if not "uniqueCounter" in dir(self): self.uniqueCounter = 1;
+ return "Test Message " + str(self.uniqueCounter)
+
+ def consume(self, queueName):
+ """Consume from named queue returns the Queue object."""
+ reply = self.channel.basic_consume(queue=queueName, no_ack=True)
+ return self.client.queue(reply.consumer_tag)
+
+ def assertEmpty(self, queue):
+ """Assert that the queue is empty"""
+ try:
+ queue.get(timeout=1)
+ self.fail("Queue is not empty.")
+ except Queue.Empty: None # Ignore
- queue can be a single queue name or a list of queue names.
- For a list assert the message appears on all queues.
- Crude attempt to make unique messages so we can't consume
- a message not really meant for us.
+ def assertPublishGet(self, queue, exchange="", routing_key=""):
"""
- body = "TestMessage("+str(random.randint(999999, 1000000))+")"
+ Publish to exchange and assert queue.get() returns the same message.
+ """
+ body = self.uniqueString()
self.channel.basic_publish(exchange=exchange,
content=qpid.content.Content(body),
routing_key=routing_key)
- if not isinstance(queue, list): queue = [queue]
- for q in queue:
- reply = self.channel.basic_consume(queue=q, no_ack=True)
- msg = self.client.queue(reply.consumer_tag).get(timeout=2)
- self.assertEqual(body, msg.content.body)
-
+ self.assertEqual(body, queue.get(timeout=2).content.body)
+ def assertPublishConsume(self, queue="", exchange="", routing_key=""):
+ """
+ Publish a message and consume it, assert it comes back intact.
+ Return the Queue object used to consume.
+ """
+ self.assertPublishGet(self.consume(queue), exchange, routing_key)
+
def assertChannelException(self, expectedCode, message):
self.assertEqual(message.method.klass.name, "channel")
self.assertEqual(message.method.name, "close")
diff --git a/python/tests/exchange.py b/python/tests/exchange.py
index b9b16bad78..4eb64520e6 100644
--- a/python/tests/exchange.py
+++ b/python/tests/exchange.py
@@ -57,9 +57,25 @@ class StandardExchangeVerifier:
self.channel.queue_bind(queue="q", exchange=ex)
self.queue_declare(queue="p")
self.channel.queue_bind(queue="p", exchange=ex)
- self.assertPublishConsume(exchange=ex, queue=["q","p"])
+ for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex)
+
+ def verifyTopicExchange(self, ex):
+ """Verify that ex behaves like a topic exchange"""
+ self.queue_declare(queue="a")
+ self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*")
+ q = self.consume("a")
+ self.assertPublishGet(q, ex, "a.b.x")
+ self.assertPublishGet(q, ex, "a.x.b.x")
+ self.assertPublishGet(q, ex, "a.x.x.b.x")
+
+ # Shouldn't match
+ self.channel.basic_publish(exchange=ex, routing_key="a.b")
+ self.channel.basic_publish(exchange=ex, routing_key="a.b.x.y")
+ self.channel.basic_publish(exchange=ex, routing_key="x.a.b.x")
+ self.channel.basic_publish(exchange=ex, routing_key="a.b")
+ self.assert_(q.empty())
+
-
class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
"""
The server SHOULD implement these standard exchange types: topic, headers.
@@ -76,6 +92,11 @@ class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
"""Declare and test a fanout exchange"""
self.exchange_declare(0, exchange="f", type="fanout")
self.verifyFanOutExchange("f")
+
+ def testTopic(self):
+ """Declare and test a topic exchange"""
+ self.exchange_declare(0, exchange="t", type="topic")
+ self.verifyTopicExchange("t")
class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
@@ -88,24 +109,17 @@ class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
exchange instance (amq.fanout, amq.direct, and amq.topic, amq.headers if
those types are defined).
"""
- # TODO aconway 2006-09-01: Add tests for 3.1.3.1:
- # - Test auto binding by q name
- # - Test the nameless "default publish" exchange.
- # - Auto created amq.fanout exchange
-
def testAmqDirect(self): self.verifyDirectExchange("amq.direct")
def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout")
- def testAmqTopic(self):
- self.exchange_declare(0, exchange="amq.topic", passive="true")
- # TODO aconway 2006-09-14: verify topic behavior
+ def testAmqTopic(self): self.verifyTopicExchange("amq.topic")
def testAmqHeaders(self):
self.exchange_declare(0, exchange="amq.headers", passive="true")
# TODO aconway 2006-09-14: verify headers behavior
-class DefaultExchangeRuleTests(TestBase):
+class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier):
"""
The server MUST predeclare a direct exchange to act as the default exchange
for content Publish methods and for default queue bindings.
@@ -115,6 +129,12 @@ class DefaultExchangeRuleTests(TestBase):
routing key but without specifying the exchange name, then ensuring that
the message arrives in the queue correctly.
"""
+ def testDefaultExchange(self):
+ # Test automatic binding by queue name.
+ self.queue_declare(queue="d")
+ self.assertPublishConsume(queue="d", routing_key="d")
+ # Test explicit bind to default queue
+ self.verifyDirectExchange("")
class DefaultAccessRuleTests(TestBase):
@@ -123,7 +143,7 @@ class DefaultAccessRuleTests(TestBase):
by specifying an empty exchange name in the Queue.Bind and content Publish
methods.
"""
-
+ # TODO aconway 2006-09-18: fill this in.
class ExtensionsRuleTests(TestBase):
"""
diff --git a/python/tests/queue.py b/python/tests/queue.py
index 92260a7d64..65baaaa00b 100644
--- a/python/tests/queue.py
+++ b/python/tests/queue.py
@@ -156,9 +156,9 @@ class QueueTests(TestBase):
#straight-forward case:
channel.queue_declare(queue="delete-me")
- channel.basic_publish(exchange="amq.direct", routing_key="delete-me", content=Content("a"))
- channel.basic_publish(exchange="amq.direct", routing_key="delete-me", content=Content("b"))
- channel.basic_publish(exchange="amq.direct", routing_key="delete-me", content=Content("c"))
+ channel.basic_publish(routing_key="delete-me", content=Content("a"))
+ channel.basic_publish(routing_key="delete-me", content=Content("b"))
+ channel.basic_publish(routing_key="delete-me", content=Content("c"))
reply = channel.queue_delete(queue="delete-me")
self.assertEqual(3, reply.message_count)
#check that it has gone be declaring passively
@@ -189,7 +189,7 @@ class QueueTests(TestBase):
#create a queue and add a message to it (use default binding):
channel.queue_declare(queue="delete-me-2")
channel.queue_declare(queue="delete-me-2", passive="True")
- channel.basic_publish(exchange="amq.direct", routing_key="delete-me-2", content=Content("message"))
+ channel.basic_publish(routing_key="delete-me-2", content=Content("message"))
#try to delete, but only if empty:
try: