diff options
Diffstat (limited to 'qpid/python/qpid/testlib.py')
-rw-r--r-- | qpid/python/qpid/testlib.py | 241 |
1 files changed, 241 insertions, 0 deletions
diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py new file mode 100644 index 0000000000..256aa7b5e6 --- /dev/null +++ b/qpid/python/qpid/testlib.py @@ -0,0 +1,241 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# Support library for qpid python tests. +# + +import string +import random + +import unittest, traceback, socket +import qpid.client, qmf.console +import Queue +from qpid.content import Content +from qpid.message import Message +from qpid.harness import Skipped +from qpid.exceptions import VersionError + +import qpid.messaging +from qpidtoollibs import BrokerAgent + +class TestBase(unittest.TestCase): + """Base class for Qpid test cases. + + self.client is automatically connected with channel 1 open before + the test methods are run. + + Deletes queues and exchanges after. Tests call + self.queue_declare(channel, ...) and self.exchange_declare(chanel, + ...) which are wrappers for the Channel functions that note + resources to clean up later. + """ + + def configure(self, config): + self.config = config + + def setUp(self): + self.queues = [] + self.exchanges = [] + self.client = self.connect() + self.channel = self.client.channel(1) + self.version = (self.client.spec.major, self.client.spec.minor) + if self.version == (8, 0) or self.version == (0, 9): + self.channel.channel_open() + else: + self.channel.session_open() + + def tearDown(self): + try: + for ch, q in self.queues: + ch.queue_delete(queue=q) + for ch, ex in self.exchanges: + ch.exchange_delete(exchange=ex) + except: + print "Error on tearDown:" + print traceback.print_exc() + + self.client.close() + + def connect(self, host=None, port=None, user=None, password=None, tune_params=None, client_properties=None, channel_options=None): + """Create a new connction, return the Client object""" + host = host or self.config.broker.host + port = port or self.config.broker.port or 5672 + user = user or self.config.broker.user or "guest" + password = password or self.config.broker.password or "guest" + client = qpid.client.Client(host, port) + try: + client.start(username = user, password=password, tune_params=tune_params, client_properties=client_properties, channel_options=channel_options) + except qpid.client.Closed, e: + if isinstance(e.args[0], VersionError): + raise Skipped(e.args[0]) + else: + raise e + except socket.error, e: + raise Skipped(e) + return client + + def queue_declare(self, channel=None, *args, **keys): + channel = channel or self.channel + reply = channel.queue_declare(*args, **keys) + self.queues.append((channel, keys["queue"])) + return reply + + def exchange_declare(self, channel=None, ticket=0, exchange='', + type='', passive=False, durable=False, + auto_delete=False, + arguments={}): + channel = channel or self.channel + reply = channel.exchange_declare(ticket=ticket, exchange=exchange, type=type, passive=passive,durable=durable, auto_delete=auto_delete, arguments=arguments) + self.exchanges.append((channel,exchange)) + return reply + + def uniqueString(self): + """Generate a unique string, unique for this TestBase instance""" + if not "uniqueCounter" in dir(self): self.uniqueCounter = 1; + return "Test Message " + str(self.uniqueCounter) + + def randomLongString(self, length=65535): + body = ''.join(random.choice(string.ascii_uppercase) for _ in range(length)) + return body + + def consume(self, queueName, no_ack=True): + """Consume from named queue returns the Queue object.""" + + reply = self.channel.basic_consume(queue=queueName, no_ack=no_ack) + return self.client.queue(reply.consumer_tag) + + def subscribe(self, channel=None, **keys): + channel = channel or self.channel + consumer_tag = keys["destination"] + channel.message_subscribe(**keys) + channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL) + channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL) + + def assertEmpty(self, queue): + """Assert that the queue is empty""" + try: + queue.get(timeout=1) + self.fail("Queue is not empty.") + except Queue.Empty: None # Ignore + + def assertPublishGet(self, queue, exchange="", routing_key="", properties=None): + """ + Publish to exchange and assert queue.get() returns the same message. + """ + body = self.uniqueString() + self.channel.basic_publish( + exchange=exchange, + content=Content(body, properties=properties), + routing_key=routing_key) + msg = queue.get(timeout=1) + self.assertEqual(body, msg.content.body) + if (properties): + self.assertEqual(properties, msg.content.properties) + + def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None): + """ + Publish a message and consume it, assert it comes back intact. + Return the Queue object used to consume. + """ + self.assertPublishGet(self.consume(queue), exchange, routing_key, properties) + + def assertChannelException(self, expectedCode, message): + if self.version == (8, 0) or self.version == (0, 9): + if not isinstance(message, Message): self.fail("expected channel_close method, got %s" % (message)) + self.assertEqual("channel", message.method.klass.name) + self.assertEqual("close", message.method.name) + else: + if not isinstance(message, Message): self.fail("expected session_closed method, got %s" % (message)) + self.assertEqual("session", message.method.klass.name) + self.assertEqual("closed", message.method.name) + self.assertEqual(expectedCode, message.reply_code) + + + def assertConnectionException(self, expectedCode, message): + if not isinstance(message, Message): self.fail("expected connection_close method, got %s" % (message)) + self.assertEqual("connection", message.method.klass.name) + self.assertEqual("close", message.method.name) + self.assertEqual(expectedCode, message.reply_code) + +#0-10 support +from qpid.connection import Connection +from qpid.util import connect, ssl, URL + +class TestBase010(unittest.TestCase): + """ + Base class for Qpid test cases. using the final 0-10 spec + """ + + def configure(self, config): + self.config = config + self.broker = config.broker + self.defines = self.config.defines + + def setUp(self): + self.conn = self.connect() + self.session = self.conn.session("test-session", timeout=10) + self.qmf = None + self.test_queue_name = self.id() + + def startQmf(self, handler=None): + self.qmf = qmf.console.Session(handler) + self.qmf_broker = self.qmf.addBroker(str(self.broker)) + + def startBrokerAccess(self): + """ + New-style management access to the broker. Can be used in lieu of startQmf. + """ + if 'broker_conn' not in self.__dict__: + self.broker_conn = qpid.messaging.Connection(str(self.broker)) + self.broker_conn.open() + self.broker_access = BrokerAgent(self.broker_conn) + + def connect(self, host=None, port=None): + url = self.broker + if url.scheme == URL.AMQPS: + default_port = 5671 + else: + default_port = 5672 + try: + sock = connect(host or url.host, port or url.port or default_port) + except socket.error, e: + raise Skipped(e) + if url.scheme == URL.AMQPS: + sock = ssl(sock) + conn = Connection(sock, username=url.user or "guest", + password=url.password or "guest") + try: + conn.start(timeout=10) + except VersionError, e: + raise Skipped(e) + return conn + + def tearDown(self): + if not self.session.error(): self.session.close(timeout=10) + self.conn.close(timeout=10) + if self.qmf: + self.qmf.delBroker(self.qmf_broker) + + def subscribe(self, session=None, **keys): + session = session or self.session + consumer_tag = keys["destination"] + session.message_subscribe(**keys) + session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL) + session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL) |