diff options
Diffstat (limited to 'python/qpid/tests')
-rw-r--r-- | python/qpid/tests/__init__.py | 32 | ||||
-rw-r--r-- | python/qpid/tests/messaging/__init__.py | 106 | ||||
-rw-r--r-- | python/qpid/tests/messaging/address.py (renamed from python/qpid/tests/address.py) | 4 | ||||
-rw-r--r-- | python/qpid/tests/messaging/endpoints.py (renamed from python/qpid/tests/messaging.py) | 205 | ||||
-rw-r--r-- | python/qpid/tests/messaging/message.py | 116 |
5 files changed, 256 insertions, 207 deletions
diff --git a/python/qpid/tests/__init__.py b/python/qpid/tests/__init__.py index 039214ca42..101a0c3759 100644 --- a/python/qpid/tests/__init__.py +++ b/python/qpid/tests/__init__.py @@ -26,7 +26,35 @@ class Test: self.config = config # API Tests -import address, framing, mimetype, messaging +import qpid.tests.framing +import qpid.tests.mimetype +import qpid.tests.messaging # Legacy Tests -import codec, queue, datatypes, connection, spec010, codec010 +import qpid.tests.codec +import qpid.tests.queue +import qpid.tests.datatypes +import qpid.tests.connection +import qpid.tests.spec010 +import qpid.tests.codec010 + +class TestTestsXXX(Test): + + def testFoo(self): + print "this test has output" + + def testBar(self): + print "this test "*8 + print "has"*10 + print "a"*75 + print "lot of"*10 + print "output"*10 + + def testQux(self): + import sys + sys.stdout.write("this test has output with no newline") + + def testQuxFail(self): + import sys + sys.stdout.write("this test has output with no newline") + fdsa diff --git a/python/qpid/tests/messaging/__init__.py b/python/qpid/tests/messaging/__init__.py new file mode 100644 index 0000000000..6785614f41 --- /dev/null +++ b/python/qpid/tests/messaging/__init__.py @@ -0,0 +1,106 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import time +from qpid.messaging import * +from qpid.tests import Test + +class Base(Test): + + def setup_connection(self): + return None + + def setup_session(self): + return None + + def setup_sender(self): + return None + + def setup_receiver(self): + return None + + def setup(self): + self.test_id = uuid4() + self.broker = self.config.broker + try: + self.conn = self.setup_connection() + except ConnectError, e: + raise Skipped(e) + self.ssn = self.setup_session() + self.snd = self.setup_sender() + if self.snd is not None: + self.snd.durable = self.durable() + self.rcv = self.setup_receiver() + + def teardown(self): + if self.conn is not None and self.conn.connected(): + self.conn.close() + + def content(self, base, count = None): + if count is None: + return "%s[%s]" % (base, self.test_id) + else: + return "%s[%s, %s]" % (base, count, self.test_id) + + def ping(self, ssn): + PING_Q = 'ping-queue; {create: always, delete: always}' + # send a message + sender = ssn.sender(PING_Q, durable=self.durable()) + content = self.content("ping") + sender.send(content) + receiver = ssn.receiver(PING_Q) + msg = receiver.fetch(0) + ssn.acknowledge() + assert msg.content == content, "expected %r, got %r" % (content, msg.content) + + def drain(self, rcv, limit=None, timeout=0, expected=None): + contents = [] + try: + while limit is None or len(contents) < limit: + contents.append(rcv.fetch(timeout=timeout).content) + except Empty: + pass + if expected is not None: + assert expected == contents, "expected %s, got %s" % (expected, contents) + return contents + + def assertEmpty(self, rcv): + contents = self.drain(rcv) + assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents) + + def assertPending(self, rcv, expected): + p = rcv.pending() + assert p == expected, "expected %s, got %s" % (expected, p) + + def sleep(self): + time.sleep(self.delay()) + + def delay(self): + return float(self.config.defines.get("delay", "2")) + + def get_bool(self, name): + return self.config.defines.get(name, "false").lower() in ("true", "yes", "1") + + def durable(self): + return self.get_bool("durable") + + def reconnect(self): + return self.get_bool("reconnect") + +import address, endpoints, message diff --git a/python/qpid/tests/address.py b/python/qpid/tests/messaging/address.py index 7e6c6a5ee5..7adbc0c6f7 100644 --- a/python/qpid/tests/address.py +++ b/python/qpid/tests/messaging/address.py @@ -19,11 +19,11 @@ from qpid.tests import Test -from qpid.address import lex, parse, ParseError, EOF, ID, NUMBER, SYM, WSPACE, \ +from qpid.messaging.address import lex, parse, ParseError, EOF, ID, NUMBER, SYM, WSPACE, \ LEXER from qpid.lexer import Token from qpid.harness import Skipped -from parser import ParserBase +from qpid.tests.parser import ParserBase def indent(st): return " " + st.replace("\n", "\n ") diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging/endpoints.py index 125f1b7157..2e70f13f3a 100644 --- a/python/qpid/tests/messaging.py +++ b/python/qpid/tests/messaging/endpoints.py @@ -22,94 +22,9 @@ import time from qpid import compat -from qpid.tests import Test from qpid.harness import Skipped -from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \ - InsufficientCapacity, Message, ReceiveError, SendError, SessionError, \ - UNLIMITED, uuid4 -from Queue import Queue, Empty as QueueEmpty - -class Base(Test): - - def setup_connection(self): - return None - - def setup_session(self): - return None - - def setup_sender(self): - return None - - def setup_receiver(self): - return None - - def setup(self): - self.test_id = uuid4() - self.broker = self.config.broker - try: - self.conn = self.setup_connection() - except ConnectError, e: - raise Skipped(e) - self.ssn = self.setup_session() - self.snd = self.setup_sender() - if self.snd is not None: - self.snd.durable = self.durable() - self.rcv = self.setup_receiver() - - def teardown(self): - if self.conn is not None and self.conn.connected(): - self.conn.close() - - def content(self, base, count = None): - if count is None: - return "%s[%s]" % (base, self.test_id) - else: - return "%s[%s, %s]" % (base, count, self.test_id) - - def ping(self, ssn): - PING_Q = 'ping-queue; {create: always, delete: always}' - # send a message - sender = ssn.sender(PING_Q, durable=self.durable()) - content = self.content("ping") - sender.send(content) - receiver = ssn.receiver(PING_Q) - msg = receiver.fetch(0) - ssn.acknowledge() - assert msg.content == content, "expected %r, got %r" % (content, msg.content) - - def drain(self, rcv, limit=None, timeout=0, expected=None): - contents = [] - try: - while limit is None or len(contents) < limit: - contents.append(rcv.fetch(timeout=timeout).content) - except Empty: - pass - if expected is not None: - assert expected == contents, "expected %s, got %s" % (expected, contents) - return contents - - def assertEmpty(self, rcv): - contents = self.drain(rcv) - assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents) - - def assertPending(self, rcv, expected): - p = rcv.pending() - assert p == expected, "expected %s, got %s" % (expected, p) - - def sleep(self): - time.sleep(self.delay()) - - def delay(self): - return float(self.config.defines.get("delay", "2")) - - def get_bool(self, name): - return self.config.defines.get(name, "false").lower() in ("true", "yes", "1") - - def durable(self): - return self.get_bool("durable") - - def reconnect(self): - return self.get_bool("reconnect") +from qpid.messaging import * +from qpid.tests.messaging import Base class SetupTests(Base): @@ -961,119 +876,3 @@ class SenderTests(Base): self.drain(self.rcv, expected=msgs) self.ssn.acknowledge() assert caught, "did not exceed capacity" - -class MessageTests(Base): - - def testCreateString(self): - m = Message("string") - assert m.content == "string" - assert m.content_type is None - - def testCreateUnicode(self): - m = Message(u"unicode") - assert m.content == u"unicode" - assert m.content_type == "text/plain" - - def testCreateMap(self): - m = Message({}) - assert m.content == {} - assert m.content_type == "amqp/map" - - def testCreateList(self): - m = Message([]) - assert m.content == [] - assert m.content_type == "amqp/list" - - def testContentTypeOverride(self): - m = Message() - m.content_type = "text/html; charset=utf8" - m.content = u"<html/>" - assert m.content_type == "text/html; charset=utf8" - -ECHO_Q = 'test-message-echo-queue; {create: always, delete: always}' - -class MessageEchoTests(Base): - - def setup_connection(self): - return Connection.open(self.broker.host, self.broker.port, - reconnect=self.reconnect()) - - def setup_session(self): - return self.conn.session() - - def setup_sender(self): - return self.ssn.sender(ECHO_Q) - - def setup_receiver(self): - return self.ssn.receiver(ECHO_Q) - - def check(self, msg): - self.snd.send(msg) - echo = self.rcv.fetch(0) - - assert msg.id == echo.id - assert msg.subject == echo.subject - assert msg.user_id == echo.user_id - assert msg.to == echo.to - assert msg.reply_to == echo.reply_to - assert msg.correlation_id == echo.correlation_id - assert msg.properties == echo.properties - assert msg.content_type == echo.content_type - assert msg.content == echo.content, "%s, %s" % (msg, echo) - - self.ssn.acknowledge(echo) - - def testStringContent(self): - self.check(Message("string")) - - def testUnicodeContent(self): - self.check(Message(u"unicode")) - - - TEST_MAP = {"key1": "string", - "key2": u"unicode", - "key3": 3, - "key4": -3, - "key5": 3.14, - "key6": -3.14, - "key7": ["one", 2, 3.14], - "key8": [], - "key9": {"sub-key0": 3}} - - def testMapContent(self): - self.check(Message(MessageEchoTests.TEST_MAP)) - - def testListContent(self): - self.check(Message([])) - self.check(Message([1, 2, 3])) - self.check(Message(["one", 2, 3.14, {"four": 4}])) - - def testProperties(self): - msg = Message() - msg.to = "to-address" - msg.subject = "subject" - msg.correlation_id = str(self.test_id) - msg.properties = MessageEchoTests.TEST_MAP - msg.reply_to = "reply-address" - self.check(msg) - -class TestTestsXXX(Test): - - def testFoo(self): - print "this test has output" - - def testBar(self): - print "this test "*8 - print "has"*10 - print "a"*75 - print "lot of"*10 - print "output"*10 - - def testQux(self): - import sys - sys.stdout.write("this test has output with no newline") - - def testQuxFail(self): - import sys - sys.stdout.write("this test has output with no newline") - fdsa diff --git a/python/qpid/tests/messaging/message.py b/python/qpid/tests/messaging/message.py new file mode 100644 index 0000000000..ef2ec1aac4 --- /dev/null +++ b/python/qpid/tests/messaging/message.py @@ -0,0 +1,116 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.messaging import * +from qpid.tests.messaging import Base + +class MessageTests(Base): + + def testCreateString(self): + m = Message("string") + assert m.content == "string" + assert m.content_type is None + + def testCreateUnicode(self): + m = Message(u"unicode") + assert m.content == u"unicode" + assert m.content_type == "text/plain" + + def testCreateMap(self): + m = Message({}) + assert m.content == {} + assert m.content_type == "amqp/map" + + def testCreateList(self): + m = Message([]) + assert m.content == [] + assert m.content_type == "amqp/list" + + def testContentTypeOverride(self): + m = Message() + m.content_type = "text/html; charset=utf8" + m.content = u"<html/>" + assert m.content_type == "text/html; charset=utf8" + +ECHO_Q = 'test-message-echo-queue; {create: always, delete: always}' + +class MessageEchoTests(Base): + + def setup_connection(self): + return Connection.open(self.broker.host, self.broker.port, + reconnect=self.reconnect()) + + def setup_session(self): + return self.conn.session() + + def setup_sender(self): + return self.ssn.sender(ECHO_Q) + + def setup_receiver(self): + return self.ssn.receiver(ECHO_Q) + + def check(self, msg): + self.snd.send(msg) + echo = self.rcv.fetch(0) + + assert msg.id == echo.id + assert msg.subject == echo.subject + assert msg.user_id == echo.user_id + assert msg.to == echo.to + assert msg.reply_to == echo.reply_to + assert msg.correlation_id == echo.correlation_id + assert msg.properties == echo.properties + assert msg.content_type == echo.content_type + assert msg.content == echo.content, "%s, %s" % (msg, echo) + + self.ssn.acknowledge(echo) + + def testStringContent(self): + self.check(Message("string")) + + def testUnicodeContent(self): + self.check(Message(u"unicode")) + + + TEST_MAP = {"key1": "string", + "key2": u"unicode", + "key3": 3, + "key4": -3, + "key5": 3.14, + "key6": -3.14, + "key7": ["one", 2, 3.14], + "key8": [], + "key9": {"sub-key0": 3}} + + def testMapContent(self): + self.check(Message(MessageEchoTests.TEST_MAP)) + + def testListContent(self): + self.check(Message([])) + self.check(Message([1, 2, 3])) + self.check(Message(["one", 2, 3.14, {"four": 4}])) + + def testProperties(self): + msg = Message() + msg.to = "to-address" + msg.subject = "subject" + msg.correlation_id = str(self.test_id) + msg.properties = MessageEchoTests.TEST_MAP + msg.reply_to = "reply-address" + self.check(msg) |