diff options
author | Alan Conway <aconway@apache.org> | 2006-09-27 19:50:23 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-09-27 19:50:23 +0000 |
commit | 3d9cd9a1f350c8970c6cd0da20d918b831342636 (patch) | |
tree | 18178234a4807121b06f35a78c23dc2a33076da5 /qpid/python | |
parent | cb3fe168a5c4c0c91b5d32ff28b176d57c8eb870 (diff) | |
download | qpid-python-3d9cd9a1f350c8970c6cd0da20d918b831342636.tar.gz |
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@450556 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
-rw-r--r-- | qpid/python/qpid/connection.py | 8 | ||||
-rw-r--r-- | qpid/python/qpid/peer.py | 3 | ||||
-rw-r--r-- | qpid/python/qpid/testlib.py | 20 | ||||
-rw-r--r-- | qpid/python/tests/exchange.py | 76 | ||||
-rw-r--r-- | qpid/python/tests/testlib.py | 9 |
5 files changed, 81 insertions, 35 deletions
diff --git a/qpid/python/qpid/connection.py b/qpid/python/qpid/connection.py index f4d0817e60..fc6c147f2b 100644 --- a/qpid/python/qpid/connection.py +++ b/qpid/python/qpid/connection.py @@ -20,7 +20,7 @@ to read and write Frame objects. This could be used by a client, server, or even a proxy implementation. """ -import socket, codec +import socket, codec,logging from cStringIO import StringIO from spec import load, pythonize from codec import EOF @@ -240,8 +240,10 @@ class Header(Payload): properties = {} for b, f in zip(bits, klass.fields): if b: - properties[f.name] = c.decode(f.type) - + # Note: decode returns a unicode u'' string but only + # plain '' strings can be used as keywords so we need to + # stringify the names. + properties[str(f.name)] = c.decode(f.type) return Header(klass, weight, size, **properties) def __str__(self): diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py index 31d3d24f5f..3085e24247 100644 --- a/qpid/python/qpid/peer.py +++ b/qpid/python/qpid/peer.py @@ -146,7 +146,6 @@ class Channel: def invoke(self, method, args, content = None): if self.closed: raise Closed(self.reason) - frame = Frame(self.id, Method(method, *args)) self.outgoing.put(frame) @@ -181,7 +180,7 @@ class Channel: def write_content(self, klass, content, queue): size = content.size() - header = Frame(self.id, Header(klass, content.weight(), size)) + header = Frame(self.id, Header(klass, content.weight(), size, **content.properties)) queue.put(header) for child in content.children: self.write_content(klass, child, queue) diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py index 0bec6a8708..92925bea20 100644 --- a/qpid/python/qpid/testlib.py +++ b/qpid/python/qpid/testlib.py @@ -22,7 +22,7 @@ import sys, re, unittest, os, random, logging import qpid.client, qpid.spec import Queue from getopt import getopt, GetoptError - +from qpid.content import Content def findmodules(root): """Find potential python modules under directory root""" @@ -161,10 +161,6 @@ class TestBase(unittest.TestCase): self.channel.channel_open() def tearDown(self): - # TODO aconway 2006-09-05: Wrong behaviour here, we should - # close all open channels (checking for exceptions on the - # channesl) then open a channel to clean up qs and exs, - # finally close that channel. for ch, q in self.queues: ch.queue_delete(queue=q) for ch, ex in self.exchanges: @@ -186,13 +182,11 @@ class TestBase(unittest.TestCase): arguments={}): channel = channel or self.channel reply = channel.exchange_declare(ticket, exchange, type, passive, durable, auto_delete, internal, nowait, arguments) - # TODO aconway 2006-09-14: Don't add exchange on failure. self.exchanges.append((channel,exchange)) return reply def uniqueString(self): """Generate a unique string, unique for this TestBase instance""" - # TODO aconway 2006-09-20: Not thread safe. if not "uniqueCounter" in dir(self): self.uniqueCounter = 1; return "Test Message " + str(self.uniqueCounter) @@ -208,22 +202,24 @@ class TestBase(unittest.TestCase): self.fail("Queue is not empty.") except Queue.Empty: None # Ignore - def assertPublishGet(self, queue, exchange="", routing_key=""): + def assertPublishGet(self, queue, exchange="", routing_key="", properties=None): """ Publish to exchange and assert queue.get() returns the same message. """ body = self.uniqueString() self.channel.basic_publish(exchange=exchange, - content=qpid.content.Content(body), + content=Content(body, properties=properties), routing_key=routing_key) - self.assertEqual(body, queue.get(timeout=2).content.body) + msg = queue.get(timeout=1) + self.assertEqual(body, msg.content.body) + if (properties): self.assertEqual(properties, msg.content.properties) - def assertPublishConsume(self, queue="", exchange="", routing_key=""): + def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None): """ Publish a message and consume it, assert it comes back intact. Return the Queue object used to consume. """ - self.assertPublishGet(self.consume(queue), exchange, routing_key) + self.assertPublishGet(self.consume(queue), exchange, routing_key, properties) def assertChannelException(self, expectedCode, message): self.assertEqual(message.method.klass.name, "channel") diff --git a/qpid/python/tests/exchange.py b/qpid/python/tests/exchange.py index 4eb64520e6..8f3504b15e 100644 --- a/qpid/python/tests/exchange.py +++ b/qpid/python/tests/exchange.py @@ -20,22 +20,11 @@ Tests for exchange behaviour. Test classes ending in 'RuleTests' are derived from rules in amqp.xml. """ -import logging, Queue +import Queue, logging from qpid.testlib import TestBase from qpid.content import Content -# TODO aconway 2006-09-01: Investigate and add tests as appropriate. -# Observered on C++: -# -# No exception raised for basic_consume on non-existent queue name. -# No exception for basic_publish with bad routing key. -# No exception for binding to non-existent exchange? -# queue_bind hangs with invalid exchange name -# -# Do server exceptions get propagated properly? -# Do Java exceptions propagate with any data (or just Closed()) - class StandardExchangeVerifier: """Verifies standard exchange behavior. @@ -67,7 +56,6 @@ class StandardExchangeVerifier: self.assertPublishGet(q, ex, "a.b.x") self.assertPublishGet(q, ex, "a.x.b.x") self.assertPublishGet(q, ex, "a.x.x.b.x") - # Shouldn't match self.channel.basic_publish(exchange=ex, routing_key="a.b") self.channel.basic_publish(exchange=ex, routing_key="a.b.x.y") @@ -75,6 +63,16 @@ class StandardExchangeVerifier: self.channel.basic_publish(exchange=ex, routing_key="a.b") self.assert_(q.empty()) + def verifyHeadersExchange(self, ex): + """Verify that ex is a headers exchange""" + self.queue_declare(queue="q") + self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} ) + q = self.consume("q") + headers = {"name":"fred", "age":3} + self.assertPublishGet(q, exchange=ex, properties={'headers':headers}) + self.channel.basic_publish(exchange=ex) # No headers, won't deliver + self.assertEmpty(q); + class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier): """ @@ -97,6 +95,11 @@ class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier): """Declare and test a topic exchange""" self.exchange_declare(0, exchange="t", type="topic") self.verifyTopicExchange("t") + + def testHeaders(self): + """Declare and test a headers exchange""" + self.exchange_declare(0, exchange="h", type="headers") + self.verifyHeadersExchange("h") class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier): @@ -106,7 +109,7 @@ class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier): exchange instance is amq. followed by the exchange type name. Client creates a temporary queue and attempts to bind to each required - exchange instance (amq.fanout, amq.direct, and amq.topic, amq.headers if + exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if those types are defined). """ def testAmqDirect(self): self.verifyDirectExchange("amq.direct") @@ -115,9 +118,7 @@ class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier): def testAmqTopic(self): self.verifyTopicExchange("amq.topic") - def testAmqHeaders(self): - self.exchange_declare(0, exchange="amq.headers", passive="true") - # TODO aconway 2006-09-14: verify headers behavior + def testAmqMatch(self): self.verifyHeadersExchange("amq.match") class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier): """ @@ -137,13 +138,14 @@ class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier): self.verifyDirectExchange("") +# TODO aconway 2006-09-27: Fill in empty tests: + class DefaultAccessRuleTests(TestBase): """ The server MUST NOT allow clients to access the default exchange except by specifying an empty exchange name in the Queue.Bind and content Publish methods. """ - # TODO aconway 2006-09-18: fill this in. class ExtensionsRuleTests(TestBase): """ @@ -252,3 +254,41 @@ class DeleteMethodExchangeFieldExistsRuleTests(TestBase): """ +class HeadersExchangeTests(TestBase): + """ + Tests for headers exchange functionality. + """ + def setUp(self): + TestBase.setUp(self) + self.queue_declare(queue="q") + self.q = self.consume("q") + + def myAssertPublishGet(self, headers): + self.assertPublishGet(self.q, exchange="amq.match", properties={'headers':headers}) + + def myBasicPublish(self, headers): + self.channel.basic_publish(exchange="amq.match", content=Content("foobar", properties={'headers':headers})) + + def testMatchAll(self): + self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3}) + self.myAssertPublishGet({"name":"fred", "age":3}) + self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"}) + + # None of these should match + self.myBasicPublish({}) + self.myBasicPublish({"name":"barney"}) + self.myBasicPublish({"name":10}) + self.myBasicPublish({"name":"fred", "age":2}) + self.assertEmpty(self.q) + + def testMatchAny(self): + self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3}) + self.myAssertPublishGet({"name":"fred"}) + self.myAssertPublishGet({"name":"fred", "ignoreme":10}) + self.myAssertPublishGet({"ignoreme":10, "age":3}) + + # Wont match + self.myBasicPublish({}) + self.myBasicPublish({"irrelevant":0}) + self.assertEmpty(self.q) + diff --git a/qpid/python/tests/testlib.py b/qpid/python/tests/testlib.py index a50f8140b4..6a2efb6a11 100644 --- a/qpid/python/tests/testlib.py +++ b/qpid/python/tests/testlib.py @@ -52,3 +52,12 @@ class TestBaseTest(TestBase): self.fail("assertEmpty did not assert on non-empty queue") except AssertionError: None # Ignore + def testMessageProperties(self): + """Verify properties are passed with message""" + props={"headers":{"x":1, "y":2}} + self.queue_declare(queue="q") + q = self.consume("q") + self.assertPublishGet(q, routing_key="q", properties=props) + + + |