summaryrefslogtreecommitdiff
path: root/qpid/python
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-09-27 19:50:23 +0000
committerAlan Conway <aconway@apache.org>2006-09-27 19:50:23 +0000
commit3d9cd9a1f350c8970c6cd0da20d918b831342636 (patch)
tree18178234a4807121b06f35a78c23dc2a33076da5 /qpid/python
parentcb3fe168a5c4c0c91b5d32ff28b176d57c8eb870 (diff)
downloadqpid-python-3d9cd9a1f350c8970c6cd0da20d918b831342636.tar.gz
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@450556 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
-rw-r--r--qpid/python/qpid/connection.py8
-rw-r--r--qpid/python/qpid/peer.py3
-rw-r--r--qpid/python/qpid/testlib.py20
-rw-r--r--qpid/python/tests/exchange.py76
-rw-r--r--qpid/python/tests/testlib.py9
5 files changed, 81 insertions, 35 deletions
diff --git a/qpid/python/qpid/connection.py b/qpid/python/qpid/connection.py
index f4d0817e60..fc6c147f2b 100644
--- a/qpid/python/qpid/connection.py
+++ b/qpid/python/qpid/connection.py
@@ -20,7 +20,7 @@ to read and write Frame objects. This could be used by a client,
server, or even a proxy implementation.
"""
-import socket, codec
+import socket, codec,logging
from cStringIO import StringIO
from spec import load, pythonize
from codec import EOF
@@ -240,8 +240,10 @@ class Header(Payload):
properties = {}
for b, f in zip(bits, klass.fields):
if b:
- properties[f.name] = c.decode(f.type)
-
+ # Note: decode returns a unicode u'' string but only
+ # plain '' strings can be used as keywords so we need to
+ # stringify the names.
+ properties[str(f.name)] = c.decode(f.type)
return Header(klass, weight, size, **properties)
def __str__(self):
diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py
index 31d3d24f5f..3085e24247 100644
--- a/qpid/python/qpid/peer.py
+++ b/qpid/python/qpid/peer.py
@@ -146,7 +146,6 @@ class Channel:
def invoke(self, method, args, content = None):
if self.closed:
raise Closed(self.reason)
-
frame = Frame(self.id, Method(method, *args))
self.outgoing.put(frame)
@@ -181,7 +180,7 @@ class Channel:
def write_content(self, klass, content, queue):
size = content.size()
- header = Frame(self.id, Header(klass, content.weight(), size))
+ header = Frame(self.id, Header(klass, content.weight(), size, **content.properties))
queue.put(header)
for child in content.children:
self.write_content(klass, child, queue)
diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py
index 0bec6a8708..92925bea20 100644
--- a/qpid/python/qpid/testlib.py
+++ b/qpid/python/qpid/testlib.py
@@ -22,7 +22,7 @@ import sys, re, unittest, os, random, logging
import qpid.client, qpid.spec
import Queue
from getopt import getopt, GetoptError
-
+from qpid.content import Content
def findmodules(root):
"""Find potential python modules under directory root"""
@@ -161,10 +161,6 @@ class TestBase(unittest.TestCase):
self.channel.channel_open()
def tearDown(self):
- # TODO aconway 2006-09-05: Wrong behaviour here, we should
- # close all open channels (checking for exceptions on the
- # channesl) then open a channel to clean up qs and exs,
- # finally close that channel.
for ch, q in self.queues:
ch.queue_delete(queue=q)
for ch, ex in self.exchanges:
@@ -186,13 +182,11 @@ class TestBase(unittest.TestCase):
arguments={}):
channel = channel or self.channel
reply = channel.exchange_declare(ticket, exchange, type, passive, durable, auto_delete, internal, nowait, arguments)
- # TODO aconway 2006-09-14: Don't add exchange on failure.
self.exchanges.append((channel,exchange))
return reply
def uniqueString(self):
"""Generate a unique string, unique for this TestBase instance"""
- # TODO aconway 2006-09-20: Not thread safe.
if not "uniqueCounter" in dir(self): self.uniqueCounter = 1;
return "Test Message " + str(self.uniqueCounter)
@@ -208,22 +202,24 @@ class TestBase(unittest.TestCase):
self.fail("Queue is not empty.")
except Queue.Empty: None # Ignore
- def assertPublishGet(self, queue, exchange="", routing_key=""):
+ def assertPublishGet(self, queue, exchange="", routing_key="", properties=None):
"""
Publish to exchange and assert queue.get() returns the same message.
"""
body = self.uniqueString()
self.channel.basic_publish(exchange=exchange,
- content=qpid.content.Content(body),
+ content=Content(body, properties=properties),
routing_key=routing_key)
- self.assertEqual(body, queue.get(timeout=2).content.body)
+ msg = queue.get(timeout=1)
+ self.assertEqual(body, msg.content.body)
+ if (properties): self.assertEqual(properties, msg.content.properties)
- def assertPublishConsume(self, queue="", exchange="", routing_key=""):
+ def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None):
"""
Publish a message and consume it, assert it comes back intact.
Return the Queue object used to consume.
"""
- self.assertPublishGet(self.consume(queue), exchange, routing_key)
+ self.assertPublishGet(self.consume(queue), exchange, routing_key, properties)
def assertChannelException(self, expectedCode, message):
self.assertEqual(message.method.klass.name, "channel")
diff --git a/qpid/python/tests/exchange.py b/qpid/python/tests/exchange.py
index 4eb64520e6..8f3504b15e 100644
--- a/qpid/python/tests/exchange.py
+++ b/qpid/python/tests/exchange.py
@@ -20,22 +20,11 @@ Tests for exchange behaviour.
Test classes ending in 'RuleTests' are derived from rules in amqp.xml.
"""
-import logging, Queue
+import Queue, logging
from qpid.testlib import TestBase
from qpid.content import Content
-# TODO aconway 2006-09-01: Investigate and add tests as appropriate.
-# Observered on C++:
-#
-# No exception raised for basic_consume on non-existent queue name.
-# No exception for basic_publish with bad routing key.
-# No exception for binding to non-existent exchange?
-# queue_bind hangs with invalid exchange name
-#
-# Do server exceptions get propagated properly?
-# Do Java exceptions propagate with any data (or just Closed())
-
class StandardExchangeVerifier:
"""Verifies standard exchange behavior.
@@ -67,7 +56,6 @@ class StandardExchangeVerifier:
self.assertPublishGet(q, ex, "a.b.x")
self.assertPublishGet(q, ex, "a.x.b.x")
self.assertPublishGet(q, ex, "a.x.x.b.x")
-
# Shouldn't match
self.channel.basic_publish(exchange=ex, routing_key="a.b")
self.channel.basic_publish(exchange=ex, routing_key="a.b.x.y")
@@ -75,6 +63,16 @@ class StandardExchangeVerifier:
self.channel.basic_publish(exchange=ex, routing_key="a.b")
self.assert_(q.empty())
+ def verifyHeadersExchange(self, ex):
+ """Verify that ex is a headers exchange"""
+ self.queue_declare(queue="q")
+ self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} )
+ q = self.consume("q")
+ headers = {"name":"fred", "age":3}
+ self.assertPublishGet(q, exchange=ex, properties={'headers':headers})
+ self.channel.basic_publish(exchange=ex) # No headers, won't deliver
+ self.assertEmpty(q);
+
class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
"""
@@ -97,6 +95,11 @@ class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
"""Declare and test a topic exchange"""
self.exchange_declare(0, exchange="t", type="topic")
self.verifyTopicExchange("t")
+
+ def testHeaders(self):
+ """Declare and test a headers exchange"""
+ self.exchange_declare(0, exchange="h", type="headers")
+ self.verifyHeadersExchange("h")
class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
@@ -106,7 +109,7 @@ class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
exchange instance is amq. followed by the exchange type name.
Client creates a temporary queue and attempts to bind to each required
- exchange instance (amq.fanout, amq.direct, and amq.topic, amq.headers if
+ exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if
those types are defined).
"""
def testAmqDirect(self): self.verifyDirectExchange("amq.direct")
@@ -115,9 +118,7 @@ class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
def testAmqTopic(self): self.verifyTopicExchange("amq.topic")
- def testAmqHeaders(self):
- self.exchange_declare(0, exchange="amq.headers", passive="true")
- # TODO aconway 2006-09-14: verify headers behavior
+ def testAmqMatch(self): self.verifyHeadersExchange("amq.match")
class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier):
"""
@@ -137,13 +138,14 @@ class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier):
self.verifyDirectExchange("")
+# TODO aconway 2006-09-27: Fill in empty tests:
+
class DefaultAccessRuleTests(TestBase):
"""
The server MUST NOT allow clients to access the default exchange except
by specifying an empty exchange name in the Queue.Bind and content Publish
methods.
"""
- # TODO aconway 2006-09-18: fill this in.
class ExtensionsRuleTests(TestBase):
"""
@@ -252,3 +254,41 @@ class DeleteMethodExchangeFieldExistsRuleTests(TestBase):
"""
+class HeadersExchangeTests(TestBase):
+ """
+ Tests for headers exchange functionality.
+ """
+ def setUp(self):
+ TestBase.setUp(self)
+ self.queue_declare(queue="q")
+ self.q = self.consume("q")
+
+ def myAssertPublishGet(self, headers):
+ self.assertPublishGet(self.q, exchange="amq.match", properties={'headers':headers})
+
+ def myBasicPublish(self, headers):
+ self.channel.basic_publish(exchange="amq.match", content=Content("foobar", properties={'headers':headers}))
+
+ def testMatchAll(self):
+ self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3})
+ self.myAssertPublishGet({"name":"fred", "age":3})
+ self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"})
+
+ # None of these should match
+ self.myBasicPublish({})
+ self.myBasicPublish({"name":"barney"})
+ self.myBasicPublish({"name":10})
+ self.myBasicPublish({"name":"fred", "age":2})
+ self.assertEmpty(self.q)
+
+ def testMatchAny(self):
+ self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3})
+ self.myAssertPublishGet({"name":"fred"})
+ self.myAssertPublishGet({"name":"fred", "ignoreme":10})
+ self.myAssertPublishGet({"ignoreme":10, "age":3})
+
+ # Wont match
+ self.myBasicPublish({})
+ self.myBasicPublish({"irrelevant":0})
+ self.assertEmpty(self.q)
+
diff --git a/qpid/python/tests/testlib.py b/qpid/python/tests/testlib.py
index a50f8140b4..6a2efb6a11 100644
--- a/qpid/python/tests/testlib.py
+++ b/qpid/python/tests/testlib.py
@@ -52,3 +52,12 @@ class TestBaseTest(TestBase):
self.fail("assertEmpty did not assert on non-empty queue")
except AssertionError: None # Ignore
+ def testMessageProperties(self):
+ """Verify properties are passed with message"""
+ props={"headers":{"x":1, "y":2}}
+ self.queue_declare(queue="q")
+ q = self.consume("q")
+ self.assertPublishGet(q, routing_key="q", properties=props)
+
+
+