summaryrefslogtreecommitdiff
path: root/qpid/python/qpid/testlib.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python/qpid/testlib.py')
-rw-r--r--qpid/python/qpid/testlib.py241
1 files changed, 241 insertions, 0 deletions
diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py
new file mode 100644
index 0000000000..256aa7b5e6
--- /dev/null
+++ b/qpid/python/qpid/testlib.py
@@ -0,0 +1,241 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Support library for qpid python tests.
+#
+
+import string
+import random
+
+import unittest, traceback, socket
+import qpid.client, qmf.console
+import Queue
+from qpid.content import Content
+from qpid.message import Message
+from qpid.harness import Skipped
+from qpid.exceptions import VersionError
+
+import qpid.messaging
+from qpidtoollibs import BrokerAgent
+
+class TestBase(unittest.TestCase):
+ """Base class for Qpid test cases.
+
+ self.client is automatically connected with channel 1 open before
+ the test methods are run.
+
+ Deletes queues and exchanges after. Tests call
+ self.queue_declare(channel, ...) and self.exchange_declare(chanel,
+ ...) which are wrappers for the Channel functions that note
+ resources to clean up later.
+ """
+
+ def configure(self, config):
+ self.config = config
+
+ def setUp(self):
+ self.queues = []
+ self.exchanges = []
+ self.client = self.connect()
+ self.channel = self.client.channel(1)
+ self.version = (self.client.spec.major, self.client.spec.minor)
+ if self.version == (8, 0) or self.version == (0, 9):
+ self.channel.channel_open()
+ else:
+ self.channel.session_open()
+
+ def tearDown(self):
+ try:
+ for ch, q in self.queues:
+ ch.queue_delete(queue=q)
+ for ch, ex in self.exchanges:
+ ch.exchange_delete(exchange=ex)
+ except:
+ print "Error on tearDown:"
+ print traceback.print_exc()
+
+ self.client.close()
+
+ def connect(self, host=None, port=None, user=None, password=None, tune_params=None, client_properties=None, channel_options=None):
+ """Create a new connction, return the Client object"""
+ host = host or self.config.broker.host
+ port = port or self.config.broker.port or 5672
+ user = user or self.config.broker.user or "guest"
+ password = password or self.config.broker.password or "guest"
+ client = qpid.client.Client(host, port)
+ try:
+ client.start(username = user, password=password, tune_params=tune_params, client_properties=client_properties, channel_options=channel_options)
+ except qpid.client.Closed, e:
+ if isinstance(e.args[0], VersionError):
+ raise Skipped(e.args[0])
+ else:
+ raise e
+ except socket.error, e:
+ raise Skipped(e)
+ return client
+
+ def queue_declare(self, channel=None, *args, **keys):
+ channel = channel or self.channel
+ reply = channel.queue_declare(*args, **keys)
+ self.queues.append((channel, keys["queue"]))
+ return reply
+
+ def exchange_declare(self, channel=None, ticket=0, exchange='',
+ type='', passive=False, durable=False,
+ auto_delete=False,
+ arguments={}):
+ channel = channel or self.channel
+ reply = channel.exchange_declare(ticket=ticket, exchange=exchange, type=type, passive=passive,durable=durable, auto_delete=auto_delete, arguments=arguments)
+ self.exchanges.append((channel,exchange))
+ return reply
+
+ def uniqueString(self):
+ """Generate a unique string, unique for this TestBase instance"""
+ if not "uniqueCounter" in dir(self): self.uniqueCounter = 1;
+ return "Test Message " + str(self.uniqueCounter)
+
+ def randomLongString(self, length=65535):
+ body = ''.join(random.choice(string.ascii_uppercase) for _ in range(length))
+ return body
+
+ def consume(self, queueName, no_ack=True):
+ """Consume from named queue returns the Queue object."""
+
+ reply = self.channel.basic_consume(queue=queueName, no_ack=no_ack)
+ return self.client.queue(reply.consumer_tag)
+
+ def subscribe(self, channel=None, **keys):
+ channel = channel or self.channel
+ consumer_tag = keys["destination"]
+ channel.message_subscribe(**keys)
+ channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL)
+ channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL)
+
+ def assertEmpty(self, queue):
+ """Assert that the queue is empty"""
+ try:
+ queue.get(timeout=1)
+ self.fail("Queue is not empty.")
+ except Queue.Empty: None # Ignore
+
+ def assertPublishGet(self, queue, exchange="", routing_key="", properties=None):
+ """
+ Publish to exchange and assert queue.get() returns the same message.
+ """
+ body = self.uniqueString()
+ self.channel.basic_publish(
+ exchange=exchange,
+ content=Content(body, properties=properties),
+ routing_key=routing_key)
+ msg = queue.get(timeout=1)
+ self.assertEqual(body, msg.content.body)
+ if (properties):
+ self.assertEqual(properties, msg.content.properties)
+
+ def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None):
+ """
+ Publish a message and consume it, assert it comes back intact.
+ Return the Queue object used to consume.
+ """
+ self.assertPublishGet(self.consume(queue), exchange, routing_key, properties)
+
+ def assertChannelException(self, expectedCode, message):
+ if self.version == (8, 0) or self.version == (0, 9):
+ if not isinstance(message, Message): self.fail("expected channel_close method, got %s" % (message))
+ self.assertEqual("channel", message.method.klass.name)
+ self.assertEqual("close", message.method.name)
+ else:
+ if not isinstance(message, Message): self.fail("expected session_closed method, got %s" % (message))
+ self.assertEqual("session", message.method.klass.name)
+ self.assertEqual("closed", message.method.name)
+ self.assertEqual(expectedCode, message.reply_code)
+
+
+ def assertConnectionException(self, expectedCode, message):
+ if not isinstance(message, Message): self.fail("expected connection_close method, got %s" % (message))
+ self.assertEqual("connection", message.method.klass.name)
+ self.assertEqual("close", message.method.name)
+ self.assertEqual(expectedCode, message.reply_code)
+
+#0-10 support
+from qpid.connection import Connection
+from qpid.util import connect, ssl, URL
+
+class TestBase010(unittest.TestCase):
+ """
+ Base class for Qpid test cases. using the final 0-10 spec
+ """
+
+ def configure(self, config):
+ self.config = config
+ self.broker = config.broker
+ self.defines = self.config.defines
+
+ def setUp(self):
+ self.conn = self.connect()
+ self.session = self.conn.session("test-session", timeout=10)
+ self.qmf = None
+ self.test_queue_name = self.id()
+
+ def startQmf(self, handler=None):
+ self.qmf = qmf.console.Session(handler)
+ self.qmf_broker = self.qmf.addBroker(str(self.broker))
+
+ def startBrokerAccess(self):
+ """
+ New-style management access to the broker. Can be used in lieu of startQmf.
+ """
+ if 'broker_conn' not in self.__dict__:
+ self.broker_conn = qpid.messaging.Connection(str(self.broker))
+ self.broker_conn.open()
+ self.broker_access = BrokerAgent(self.broker_conn)
+
+ def connect(self, host=None, port=None):
+ url = self.broker
+ if url.scheme == URL.AMQPS:
+ default_port = 5671
+ else:
+ default_port = 5672
+ try:
+ sock = connect(host or url.host, port or url.port or default_port)
+ except socket.error, e:
+ raise Skipped(e)
+ if url.scheme == URL.AMQPS:
+ sock = ssl(sock)
+ conn = Connection(sock, username=url.user or "guest",
+ password=url.password or "guest")
+ try:
+ conn.start(timeout=10)
+ except VersionError, e:
+ raise Skipped(e)
+ return conn
+
+ def tearDown(self):
+ if not self.session.error(): self.session.close(timeout=10)
+ self.conn.close(timeout=10)
+ if self.qmf:
+ self.qmf.delBroker(self.qmf_broker)
+
+ def subscribe(self, session=None, **keys):
+ session = session or self.session
+ consumer_tag = keys["destination"]
+ session.message_subscribe(**keys)
+ session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL)
+ session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL)