summaryrefslogtreecommitdiff
path: root/python/qpid/testlib.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/testlib.py')
-rw-r--r--python/qpid/testlib.py45
1 files changed, 31 insertions, 14 deletions
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
index ff9ecbee8a..0bec6a8708 100644
--- a/python/qpid/testlib.py
+++ b/python/qpid/testlib.py
@@ -14,11 +14,13 @@
# limitations under the License.
#
+#
# Support library for qpid python tests.
#
import sys, re, unittest, os, random, logging
import qpid.client, qpid.spec
+import Queue
from getopt import getopt, GetoptError
@@ -188,26 +190,41 @@ class TestBase(unittest.TestCase):
self.exchanges.append((channel,exchange))
return reply
- def assertPublishConsume(self, queue="", exchange="", routing_key=""):
- """
- Publish a message and consume it, assert it comes back intact.
+ def uniqueString(self):
+ """Generate a unique string, unique for this TestBase instance"""
+ # TODO aconway 2006-09-20: Not thread safe.
+ if not "uniqueCounter" in dir(self): self.uniqueCounter = 1;
+ return "Test Message " + str(self.uniqueCounter)
+
+ def consume(self, queueName):
+ """Consume from named queue returns the Queue object."""
+ reply = self.channel.basic_consume(queue=queueName, no_ack=True)
+ return self.client.queue(reply.consumer_tag)
+
+ def assertEmpty(self, queue):
+ """Assert that the queue is empty"""
+ try:
+ queue.get(timeout=1)
+ self.fail("Queue is not empty.")
+ except Queue.Empty: None # Ignore
- queue can be a single queue name or a list of queue names.
- For a list assert the message appears on all queues.
- Crude attempt to make unique messages so we can't consume
- a message not really meant for us.
+ def assertPublishGet(self, queue, exchange="", routing_key=""):
"""
- body = "TestMessage("+str(random.randint(999999, 1000000))+")"
+ Publish to exchange and assert queue.get() returns the same message.
+ """
+ body = self.uniqueString()
self.channel.basic_publish(exchange=exchange,
content=qpid.content.Content(body),
routing_key=routing_key)
- if not isinstance(queue, list): queue = [queue]
- for q in queue:
- reply = self.channel.basic_consume(queue=q, no_ack=True)
- msg = self.client.queue(reply.consumer_tag).get(timeout=2)
- self.assertEqual(body, msg.content.body)
-
+ self.assertEqual(body, queue.get(timeout=2).content.body)
+ def assertPublishConsume(self, queue="", exchange="", routing_key=""):
+ """
+ Publish a message and consume it, assert it comes back intact.
+ Return the Queue object used to consume.
+ """
+ self.assertPublishGet(self.consume(queue), exchange, routing_key)
+
def assertChannelException(self, expectedCode, message):
self.assertEqual(message.method.klass.name, "channel")
self.assertEqual(message.method.name, "close")