summaryrefslogtreecommitdiff
path: root/python/qpid/tests
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/tests')
-rw-r--r--python/qpid/tests/__init__.py32
-rw-r--r--python/qpid/tests/messaging/__init__.py106
-rw-r--r--python/qpid/tests/messaging/address.py (renamed from python/qpid/tests/address.py)4
-rw-r--r--python/qpid/tests/messaging/endpoints.py (renamed from python/qpid/tests/messaging.py)205
-rw-r--r--python/qpid/tests/messaging/message.py116
5 files changed, 256 insertions, 207 deletions
diff --git a/python/qpid/tests/__init__.py b/python/qpid/tests/__init__.py
index 039214ca42..101a0c3759 100644
--- a/python/qpid/tests/__init__.py
+++ b/python/qpid/tests/__init__.py
@@ -26,7 +26,35 @@ class Test:
self.config = config
# API Tests
-import address, framing, mimetype, messaging
+import qpid.tests.framing
+import qpid.tests.mimetype
+import qpid.tests.messaging
# Legacy Tests
-import codec, queue, datatypes, connection, spec010, codec010
+import qpid.tests.codec
+import qpid.tests.queue
+import qpid.tests.datatypes
+import qpid.tests.connection
+import qpid.tests.spec010
+import qpid.tests.codec010
+
+class TestTestsXXX(Test):
+
+ def testFoo(self):
+ print "this test has output"
+
+ def testBar(self):
+ print "this test "*8
+ print "has"*10
+ print "a"*75
+ print "lot of"*10
+ print "output"*10
+
+ def testQux(self):
+ import sys
+ sys.stdout.write("this test has output with no newline")
+
+ def testQuxFail(self):
+ import sys
+ sys.stdout.write("this test has output with no newline")
+ fdsa
diff --git a/python/qpid/tests/messaging/__init__.py b/python/qpid/tests/messaging/__init__.py
new file mode 100644
index 0000000000..6785614f41
--- /dev/null
+++ b/python/qpid/tests/messaging/__init__.py
@@ -0,0 +1,106 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time
+from qpid.messaging import *
+from qpid.tests import Test
+
+class Base(Test):
+
+ def setup_connection(self):
+ return None
+
+ def setup_session(self):
+ return None
+
+ def setup_sender(self):
+ return None
+
+ def setup_receiver(self):
+ return None
+
+ def setup(self):
+ self.test_id = uuid4()
+ self.broker = self.config.broker
+ try:
+ self.conn = self.setup_connection()
+ except ConnectError, e:
+ raise Skipped(e)
+ self.ssn = self.setup_session()
+ self.snd = self.setup_sender()
+ if self.snd is not None:
+ self.snd.durable = self.durable()
+ self.rcv = self.setup_receiver()
+
+ def teardown(self):
+ if self.conn is not None and self.conn.connected():
+ self.conn.close()
+
+ def content(self, base, count = None):
+ if count is None:
+ return "%s[%s]" % (base, self.test_id)
+ else:
+ return "%s[%s, %s]" % (base, count, self.test_id)
+
+ def ping(self, ssn):
+ PING_Q = 'ping-queue; {create: always, delete: always}'
+ # send a message
+ sender = ssn.sender(PING_Q, durable=self.durable())
+ content = self.content("ping")
+ sender.send(content)
+ receiver = ssn.receiver(PING_Q)
+ msg = receiver.fetch(0)
+ ssn.acknowledge()
+ assert msg.content == content, "expected %r, got %r" % (content, msg.content)
+
+ def drain(self, rcv, limit=None, timeout=0, expected=None):
+ contents = []
+ try:
+ while limit is None or len(contents) < limit:
+ contents.append(rcv.fetch(timeout=timeout).content)
+ except Empty:
+ pass
+ if expected is not None:
+ assert expected == contents, "expected %s, got %s" % (expected, contents)
+ return contents
+
+ def assertEmpty(self, rcv):
+ contents = self.drain(rcv)
+ assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents)
+
+ def assertPending(self, rcv, expected):
+ p = rcv.pending()
+ assert p == expected, "expected %s, got %s" % (expected, p)
+
+ def sleep(self):
+ time.sleep(self.delay())
+
+ def delay(self):
+ return float(self.config.defines.get("delay", "2"))
+
+ def get_bool(self, name):
+ return self.config.defines.get(name, "false").lower() in ("true", "yes", "1")
+
+ def durable(self):
+ return self.get_bool("durable")
+
+ def reconnect(self):
+ return self.get_bool("reconnect")
+
+import address, endpoints, message
diff --git a/python/qpid/tests/address.py b/python/qpid/tests/messaging/address.py
index 7e6c6a5ee5..7adbc0c6f7 100644
--- a/python/qpid/tests/address.py
+++ b/python/qpid/tests/messaging/address.py
@@ -19,11 +19,11 @@
from qpid.tests import Test
-from qpid.address import lex, parse, ParseError, EOF, ID, NUMBER, SYM, WSPACE, \
+from qpid.messaging.address import lex, parse, ParseError, EOF, ID, NUMBER, SYM, WSPACE, \
LEXER
from qpid.lexer import Token
from qpid.harness import Skipped
-from parser import ParserBase
+from qpid.tests.parser import ParserBase
def indent(st):
return " " + st.replace("\n", "\n ")
diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging/endpoints.py
index 125f1b7157..2e70f13f3a 100644
--- a/python/qpid/tests/messaging.py
+++ b/python/qpid/tests/messaging/endpoints.py
@@ -22,94 +22,9 @@
import time
from qpid import compat
-from qpid.tests import Test
from qpid.harness import Skipped
-from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \
- InsufficientCapacity, Message, ReceiveError, SendError, SessionError, \
- UNLIMITED, uuid4
-from Queue import Queue, Empty as QueueEmpty
-
-class Base(Test):
-
- def setup_connection(self):
- return None
-
- def setup_session(self):
- return None
-
- def setup_sender(self):
- return None
-
- def setup_receiver(self):
- return None
-
- def setup(self):
- self.test_id = uuid4()
- self.broker = self.config.broker
- try:
- self.conn = self.setup_connection()
- except ConnectError, e:
- raise Skipped(e)
- self.ssn = self.setup_session()
- self.snd = self.setup_sender()
- if self.snd is not None:
- self.snd.durable = self.durable()
- self.rcv = self.setup_receiver()
-
- def teardown(self):
- if self.conn is not None and self.conn.connected():
- self.conn.close()
-
- def content(self, base, count = None):
- if count is None:
- return "%s[%s]" % (base, self.test_id)
- else:
- return "%s[%s, %s]" % (base, count, self.test_id)
-
- def ping(self, ssn):
- PING_Q = 'ping-queue; {create: always, delete: always}'
- # send a message
- sender = ssn.sender(PING_Q, durable=self.durable())
- content = self.content("ping")
- sender.send(content)
- receiver = ssn.receiver(PING_Q)
- msg = receiver.fetch(0)
- ssn.acknowledge()
- assert msg.content == content, "expected %r, got %r" % (content, msg.content)
-
- def drain(self, rcv, limit=None, timeout=0, expected=None):
- contents = []
- try:
- while limit is None or len(contents) < limit:
- contents.append(rcv.fetch(timeout=timeout).content)
- except Empty:
- pass
- if expected is not None:
- assert expected == contents, "expected %s, got %s" % (expected, contents)
- return contents
-
- def assertEmpty(self, rcv):
- contents = self.drain(rcv)
- assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents)
-
- def assertPending(self, rcv, expected):
- p = rcv.pending()
- assert p == expected, "expected %s, got %s" % (expected, p)
-
- def sleep(self):
- time.sleep(self.delay())
-
- def delay(self):
- return float(self.config.defines.get("delay", "2"))
-
- def get_bool(self, name):
- return self.config.defines.get(name, "false").lower() in ("true", "yes", "1")
-
- def durable(self):
- return self.get_bool("durable")
-
- def reconnect(self):
- return self.get_bool("reconnect")
+from qpid.messaging import *
+from qpid.tests.messaging import Base
class SetupTests(Base):
@@ -961,119 +876,3 @@ class SenderTests(Base):
self.drain(self.rcv, expected=msgs)
self.ssn.acknowledge()
assert caught, "did not exceed capacity"
-
-class MessageTests(Base):
-
- def testCreateString(self):
- m = Message("string")
- assert m.content == "string"
- assert m.content_type is None
-
- def testCreateUnicode(self):
- m = Message(u"unicode")
- assert m.content == u"unicode"
- assert m.content_type == "text/plain"
-
- def testCreateMap(self):
- m = Message({})
- assert m.content == {}
- assert m.content_type == "amqp/map"
-
- def testCreateList(self):
- m = Message([])
- assert m.content == []
- assert m.content_type == "amqp/list"
-
- def testContentTypeOverride(self):
- m = Message()
- m.content_type = "text/html; charset=utf8"
- m.content = u"<html/>"
- assert m.content_type == "text/html; charset=utf8"
-
-ECHO_Q = 'test-message-echo-queue; {create: always, delete: always}'
-
-class MessageEchoTests(Base):
-
- def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port,
- reconnect=self.reconnect())
-
- def setup_session(self):
- return self.conn.session()
-
- def setup_sender(self):
- return self.ssn.sender(ECHO_Q)
-
- def setup_receiver(self):
- return self.ssn.receiver(ECHO_Q)
-
- def check(self, msg):
- self.snd.send(msg)
- echo = self.rcv.fetch(0)
-
- assert msg.id == echo.id
- assert msg.subject == echo.subject
- assert msg.user_id == echo.user_id
- assert msg.to == echo.to
- assert msg.reply_to == echo.reply_to
- assert msg.correlation_id == echo.correlation_id
- assert msg.properties == echo.properties
- assert msg.content_type == echo.content_type
- assert msg.content == echo.content, "%s, %s" % (msg, echo)
-
- self.ssn.acknowledge(echo)
-
- def testStringContent(self):
- self.check(Message("string"))
-
- def testUnicodeContent(self):
- self.check(Message(u"unicode"))
-
-
- TEST_MAP = {"key1": "string",
- "key2": u"unicode",
- "key3": 3,
- "key4": -3,
- "key5": 3.14,
- "key6": -3.14,
- "key7": ["one", 2, 3.14],
- "key8": [],
- "key9": {"sub-key0": 3}}
-
- def testMapContent(self):
- self.check(Message(MessageEchoTests.TEST_MAP))
-
- def testListContent(self):
- self.check(Message([]))
- self.check(Message([1, 2, 3]))
- self.check(Message(["one", 2, 3.14, {"four": 4}]))
-
- def testProperties(self):
- msg = Message()
- msg.to = "to-address"
- msg.subject = "subject"
- msg.correlation_id = str(self.test_id)
- msg.properties = MessageEchoTests.TEST_MAP
- msg.reply_to = "reply-address"
- self.check(msg)
-
-class TestTestsXXX(Test):
-
- def testFoo(self):
- print "this test has output"
-
- def testBar(self):
- print "this test "*8
- print "has"*10
- print "a"*75
- print "lot of"*10
- print "output"*10
-
- def testQux(self):
- import sys
- sys.stdout.write("this test has output with no newline")
-
- def testQuxFail(self):
- import sys
- sys.stdout.write("this test has output with no newline")
- fdsa
diff --git a/python/qpid/tests/messaging/message.py b/python/qpid/tests/messaging/message.py
new file mode 100644
index 0000000000..ef2ec1aac4
--- /dev/null
+++ b/python/qpid/tests/messaging/message.py
@@ -0,0 +1,116 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.messaging import *
+from qpid.tests.messaging import Base
+
+class MessageTests(Base):
+
+ def testCreateString(self):
+ m = Message("string")
+ assert m.content == "string"
+ assert m.content_type is None
+
+ def testCreateUnicode(self):
+ m = Message(u"unicode")
+ assert m.content == u"unicode"
+ assert m.content_type == "text/plain"
+
+ def testCreateMap(self):
+ m = Message({})
+ assert m.content == {}
+ assert m.content_type == "amqp/map"
+
+ def testCreateList(self):
+ m = Message([])
+ assert m.content == []
+ assert m.content_type == "amqp/list"
+
+ def testContentTypeOverride(self):
+ m = Message()
+ m.content_type = "text/html; charset=utf8"
+ m.content = u"<html/>"
+ assert m.content_type == "text/html; charset=utf8"
+
+ECHO_Q = 'test-message-echo-queue; {create: always, delete: always}'
+
+class MessageEchoTests(Base):
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def setup_sender(self):
+ return self.ssn.sender(ECHO_Q)
+
+ def setup_receiver(self):
+ return self.ssn.receiver(ECHO_Q)
+
+ def check(self, msg):
+ self.snd.send(msg)
+ echo = self.rcv.fetch(0)
+
+ assert msg.id == echo.id
+ assert msg.subject == echo.subject
+ assert msg.user_id == echo.user_id
+ assert msg.to == echo.to
+ assert msg.reply_to == echo.reply_to
+ assert msg.correlation_id == echo.correlation_id
+ assert msg.properties == echo.properties
+ assert msg.content_type == echo.content_type
+ assert msg.content == echo.content, "%s, %s" % (msg, echo)
+
+ self.ssn.acknowledge(echo)
+
+ def testStringContent(self):
+ self.check(Message("string"))
+
+ def testUnicodeContent(self):
+ self.check(Message(u"unicode"))
+
+
+ TEST_MAP = {"key1": "string",
+ "key2": u"unicode",
+ "key3": 3,
+ "key4": -3,
+ "key5": 3.14,
+ "key6": -3.14,
+ "key7": ["one", 2, 3.14],
+ "key8": [],
+ "key9": {"sub-key0": 3}}
+
+ def testMapContent(self):
+ self.check(Message(MessageEchoTests.TEST_MAP))
+
+ def testListContent(self):
+ self.check(Message([]))
+ self.check(Message([1, 2, 3]))
+ self.check(Message(["one", 2, 3.14, {"four": 4}]))
+
+ def testProperties(self):
+ msg = Message()
+ msg.to = "to-address"
+ msg.subject = "subject"
+ msg.correlation_id = str(self.test_id)
+ msg.properties = MessageEchoTests.TEST_MAP
+ msg.reply_to = "reply-address"
+ self.check(msg)