summaryrefslogtreecommitdiff
path: root/python/tests
diff options
context:
space:
mode:
Diffstat (limited to 'python/tests')
-rw-r--r--python/tests/__init__.py10
-rw-r--r--python/tests/assembler.py78
-rw-r--r--python/tests/codec.py14
-rw-r--r--python/tests/codec010.py14
-rw-r--r--python/tests/connection.py24
-rw-r--r--python/tests/datatypes.py12
-rw-r--r--python/tests/framer.py95
-rw-r--r--python/tests/spec.py74
-rw-r--r--python/tests/spec010.py70
9 files changed, 57 insertions, 334 deletions
diff --git a/python/tests/__init__.py b/python/tests/__init__.py
index 8ad514fc2f..1e495f3af3 100644
--- a/python/tests/__init__.py
+++ b/python/tests/__init__.py
@@ -19,12 +19,4 @@
# under the License.
#
-from codec import *
-from queue import *
-from spec import *
-from framer import *
-from assembler import *
-from datatypes import *
-from connection import *
-from spec010 import *
-from codec010 import *
+import codec, queue, datatypes, connection, spec010, codec010
diff --git a/python/tests/assembler.py b/python/tests/assembler.py
deleted file mode 100644
index f4e37084b6..0000000000
--- a/python/tests/assembler.py
+++ /dev/null
@@ -1,78 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from threading import *
-from unittest import TestCase
-from qpid.util import connect, listen
-from qpid.assembler import *
-
-PORT = 1234
-
-class AssemblerTest(TestCase):
-
- def setUp(self):
- started = Event()
- self.running = True
-
- def run():
- running = True
- for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()):
- asm = Assembler(s)
- try:
- asm.write_header(*asm.read_header()[-2:])
- while True:
- seg = asm.read_segment()
- asm.write_segment(seg)
- except Closed:
- pass
-
- self.server = Thread(target=run)
- self.server.setDaemon(True)
- self.server.start()
-
- started.wait(3)
- assert started.isSet()
-
- def tearDown(self):
- self.running = False
- self.server.join()
-
- def test(self):
- asm = Assembler(connect("0.0.0.0", PORT), max_payload = 1)
- asm.write_header(0, 10)
- asm.write_segment(Segment(True, False, 1, 2, 3, "TEST"))
- asm.write_segment(Segment(False, True, 1, 2, 3, "ING"))
-
- assert asm.read_header() == ("AMQP", 1, 1, 0, 10)
-
- seg = asm.read_segment()
- assert seg.first == True
- assert seg.last == False
- assert seg.type == 1
- assert seg.track == 2
- assert seg.channel == 3
- assert seg.payload == "TEST"
-
- seg = asm.read_segment()
- assert seg.first == False
- assert seg.last == True
- assert seg.type == 1
- assert seg.track == 2
- assert seg.channel == 3
- assert seg.payload == "ING"
diff --git a/python/tests/codec.py b/python/tests/codec.py
index 4bd3675af8..9b51b4713c 100644
--- a/python/tests/codec.py
+++ b/python/tests/codec.py
@@ -23,7 +23,6 @@ from qpid.codec import Codec
from qpid.spec import load
from cStringIO import StringIO
from qpid.reference import ReferenceId
-from qpid.testlib import testrunner
__doc__ = """
@@ -54,13 +53,8 @@ __doc__ = """
"""
-SPEC = None
-
-def spec():
- global SPEC
- if SPEC == None:
- SPEC = load(testrunner.get_spec_file("amqp.0-8.xml"))
- return SPEC
+from qpid_config import amqp_spec_0_8
+SPEC = load(amqp_spec_0_8)
# --------------------------------------
# --------------------------------------
@@ -76,7 +70,7 @@ class BaseDataTypes(unittest.TestCase):
"""
standard setUp for unitetest (refer unittest documentation for details)
"""
- self.codec = Codec(StringIO(), spec())
+ self.codec = Codec(StringIO(), SPEC)
# ------------------
def tearDown(self):
@@ -507,7 +501,7 @@ def test(type, value):
else:
values = [value]
stream = StringIO()
- codec = Codec(stream, spec())
+ codec = Codec(stream, SPEC)
for v in values:
codec.encode(type, v)
codec.flush()
diff --git a/python/tests/codec010.py b/python/tests/codec010.py
index a1f89dc3f4..787ebc146f 100644
--- a/python/tests/codec010.py
+++ b/python/tests/codec010.py
@@ -20,21 +20,17 @@
import time
from unittest import TestCase
-from qpid.spec010 import load
from qpid.codec010 import StringCodec
-from qpid.testlib import testrunner
from qpid.datatypes import timestamp, uuid4
+from qpid.ops import PRIMITIVE
class CodecTest(TestCase):
- def setUp(self):
- self.spec = load(testrunner.get_spec_file("amqp.0-10.xml"))
-
def check(self, type, value, compare=True):
- t = self.spec[type]
- sc = StringCodec(self.spec)
- t.encode(sc, value)
- decoded = t.decode(sc)
+ t = PRIMITIVE[type]
+ sc = StringCodec()
+ sc.write_primitive(t, value)
+ decoded = sc.read_primitive(t)
if compare:
assert decoded == value, "%s, %s" % (decoded, value)
return decoded
diff --git a/python/tests/connection.py b/python/tests/connection.py
index 19cdad9f97..d340e4e9c1 100644
--- a/python/tests/connection.py
+++ b/python/tests/connection.py
@@ -22,10 +22,10 @@ from unittest import TestCase
from qpid.util import connect, listen
from qpid.connection import *
from qpid.datatypes import Message
-from qpid.testlib import testrunner
from qpid.delegates import Server
from qpid.queue import Queue
from qpid.session import Delegate
+from qpid.ops import QueueQueryResult
PORT = 1234
@@ -51,12 +51,12 @@ class TestSession(Delegate):
pass
def queue_query(self, qq):
- return qq._type.result.type.new((qq.queue,), {})
+ return QueueQueryResult(qq.queue)
- def message_transfer(self, cmd, headers, body):
+ def message_transfer(self, cmd):
if cmd.destination == "echo":
- m = Message(body)
- m.headers = headers
+ m = Message(cmd.payload)
+ m.headers = cmd.headers
self.session.message_transfer(cmd.destination, cmd.accept_mode,
cmd.acquire_mode, m)
elif cmd.destination == "abort":
@@ -64,7 +64,7 @@ class TestSession(Delegate):
elif cmd.destination == "heartbeat":
self.session.channel.connection_heartbeat()
else:
- self.queue.put((cmd, headers, body))
+ self.queue.put(cmd)
class ConnectionTest(TestCase):
@@ -134,17 +134,17 @@ class ConnectionTest(TestCase):
ssn.message_transfer(d)
for d in destinations:
- cmd, header, body = self.queue.get(10)
+ cmd = self.queue.get(10)
assert cmd.destination == d
- assert header == None
- assert body == None
+ assert cmd.headers == None
+ assert cmd.payload == None
msg = Message("this is a test")
ssn.message_transfer("four", message=msg)
- cmd, header, body = self.queue.get(10)
+ cmd = self.queue.get(10)
assert cmd.destination == "four"
- assert header == None
- assert body == msg.body
+ assert cmd.headers == None
+ assert cmd.payload == msg.body
qq = ssn.queue_query("asdf")
assert qq.queue == "asdf"
diff --git a/python/tests/datatypes.py b/python/tests/datatypes.py
index e9e09094fa..1a60bb4107 100644
--- a/python/tests/datatypes.py
+++ b/python/tests/datatypes.py
@@ -18,9 +18,8 @@
#
from unittest import TestCase
-from qpid.testlib import testrunner
-from qpid.spec010 import load
from qpid.datatypes import *
+from qpid.ops import DeliveryProperties, FragmentProperties, MessageProperties
class SerialTest(TestCase):
@@ -176,10 +175,9 @@ class UUIDTest(TestCase):
class MessageTest(TestCase):
def setUp(self):
- self.spec = load(testrunner.get_spec_file("amqp.0-10-qpid-errata.xml"))
- self.mp = Struct(self.spec["message.message_properties"])
- self.dp = Struct(self.spec["message.delivery_properties"])
- self.fp = Struct(self.spec["message.fragment_properties"])
+ self.mp = MessageProperties()
+ self.dp = DeliveryProperties()
+ self.fp = FragmentProperties()
def testHas(self):
m = Message(self.mp, self.dp, self.fp, "body")
@@ -207,7 +205,7 @@ class MessageTest(TestCase):
def testSetReplace(self):
m = Message(self.mp, self.dp, self.fp, "body")
- dp = Struct(self.spec["message.delivery_properties"])
+ dp = DeliveryProperties()
assert m.get("delivery_properties") == self.dp
assert m.get("delivery_properties") != dp
m.set(dp)
diff --git a/python/tests/framer.py b/python/tests/framer.py
deleted file mode 100644
index e99166721c..0000000000
--- a/python/tests/framer.py
+++ /dev/null
@@ -1,95 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from threading import *
-from unittest import TestCase
-from qpid.util import connect, listen
-from qpid.framer import *
-
-PORT = 1234
-
-class FramerTest(TestCase):
-
- def setUp(self):
- self.running = True
- started = Event()
- def run():
- for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()):
- conn = Framer(s)
- try:
- conn.write_header(*conn.read_header()[-2:])
- while True:
- frame = conn.read_frame()
- conn.write_frame(frame)
- conn.flush()
- except Closed:
- pass
-
- self.server = Thread(target=run)
- self.server.setDaemon(True)
- self.server.start()
-
- started.wait(3)
- assert started.isSet()
-
- def tearDown(self):
- self.running = False
- self.server.join(3)
-
- def test(self):
- c = Framer(connect("0.0.0.0", PORT))
-
- c.write_header(0, 10)
- assert c.read_header() == ("AMQP", 1, 1, 0, 10)
-
- c.write_frame(Frame(FIRST_FRM, 1, 2, 3, "THIS"))
- c.write_frame(Frame(0, 1, 2, 3, "IS"))
- c.write_frame(Frame(0, 1, 2, 3, "A"))
- c.write_frame(Frame(LAST_FRM, 1, 2, 3, "TEST"))
- c.flush()
-
- f = c.read_frame()
- assert f.flags & FIRST_FRM
- assert not (f.flags & LAST_FRM)
- assert f.type == 1
- assert f.track == 2
- assert f.channel == 3
- assert f.payload == "THIS"
-
- f = c.read_frame()
- assert f.flags == 0
- assert f.type == 1
- assert f.track == 2
- assert f.channel == 3
- assert f.payload == "IS"
-
- f = c.read_frame()
- assert f.flags == 0
- assert f.type == 1
- assert f.track == 2
- assert f.channel == 3
- assert f.payload == "A"
-
- f = c.read_frame()
- assert f.flags & LAST_FRM
- assert not (f.flags & FIRST_FRM)
- assert f.type == 1
- assert f.track == 2
- assert f.channel == 3
- assert f.payload == "TEST"
diff --git a/python/tests/spec.py b/python/tests/spec.py
deleted file mode 100644
index d5ea1d682a..0000000000
--- a/python/tests/spec.py
+++ /dev/null
@@ -1,74 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from unittest import TestCase
-from qpid.spec import load
-from qpid.testlib import testrunner
-
-class SpecTest(TestCase):
-
- def check_load(self, *urls):
- spec = load(*map(testrunner.get_spec_file, urls))
- qdecl = spec.method("queue_declare")
- assert qdecl != None
- assert not qdecl.content
-
- queue = qdecl.fields.byname["queue"]
- assert queue != None
- assert queue.domain.name == "queue_name"
- assert queue.type == "shortstr"
-
- qdecl_ok = spec.method("queue_declare_ok")
-
- # 0-8 is actually 8-0
- if (spec.major == 8 and spec.minor == 0 or
- spec.major == 0 and spec.minor == 9):
- assert qdecl_ok != None
-
- assert len(qdecl.responses) == 1
- assert qdecl_ok in qdecl.responses
-
- publish = spec.method("basic_publish")
- assert publish != None
- assert publish.content
-
- if (spec.major == 0 and spec.minor == 10):
- assert qdecl_ok == None
- reply_to = spec.domains.byname["reply_to"]
- assert reply_to.type.size == 2
- assert reply_to.type.pack == 2
- assert len(reply_to.type.fields) == 2
-
- qq = spec.method("queue_query")
- assert qq != None
- assert qq.result.size == 4
- assert qq.result.type != None
- args = qq.result.fields.byname["arguments"]
- assert args.type == "table"
-
- def test_load_0_8(self):
- self.check_load("amqp.0-8.xml")
-
- def test_load_0_9(self):
- self.check_load("amqp.0-9.xml")
-
- def test_load_0_9_errata(self):
- self.check_load("amqp.0-9.xml", "amqp-errata.0-9.xml")
-
- def test_load_0_10(self):
- self.check_load("amqp.0-10-preview.xml")
diff --git a/python/tests/spec010.py b/python/tests/spec010.py
index df9cb9590a..ac04e1ee02 100644
--- a/python/tests/spec010.py
+++ b/python/tests/spec010.py
@@ -19,66 +19,56 @@
import os, tempfile, shutil, stat
from unittest import TestCase
-from qpid.spec010 import load
from qpid.codec010 import Codec, StringCodec
-from qpid.testlib import testrunner
-from qpid.datatypes import Struct
+from qpid.ops import *
class SpecTest(TestCase):
- def setUp(self):
- self.spec = load(testrunner.get_spec_file("amqp.0-10-qpid-errata.xml"))
-
def testSessionHeader(self):
- hdr = self.spec["session.header"]
- sc = StringCodec(self.spec)
- hdr.encode(sc, Struct(hdr, sync=True))
+ sc = StringCodec()
+ sc.write_compound(Header(sync=True))
assert sc.encoded == "\x01\x01"
- sc = StringCodec(self.spec)
- hdr.encode(sc, Struct(hdr, sync=False))
+ sc = StringCodec()
+ sc.write_compound(Header(sync=False))
assert sc.encoded == "\x01\x00"
- def encdec(self, type, value):
- sc = StringCodec(self.spec)
- type.encode(sc, value)
- decoded = type.decode(sc)
+ def encdec(self, value):
+ sc = StringCodec()
+ sc.write_compound(value)
+ decoded = sc.read_compound(value.__class__)
return decoded
def testMessageProperties(self):
- mp = self.spec["message.message_properties"]
- rt = self.spec["message.reply_to"]
-
- props = Struct(mp, content_length=3735928559L,
- reply_to=Struct(rt, exchange="the exchange name",
- routing_key="the routing key"))
- dec = self.encdec(mp, props)
+ props = MessageProperties(content_length=3735928559L,
+ reply_to=ReplyTo(exchange="the exchange name",
+ routing_key="the routing key"))
+ dec = self.encdec(props)
assert props.content_length == dec.content_length
assert props.reply_to.exchange == dec.reply_to.exchange
assert props.reply_to.routing_key == dec.reply_to.routing_key
def testMessageSubscribe(self):
- ms = self.spec["message.subscribe"]
- cmd = Struct(ms, exclusive=True, destination="this is a test")
- dec = self.encdec(self.spec["message.subscribe"], cmd)
+ cmd = MessageSubscribe(exclusive=True, destination="this is a test")
+ dec = self.encdec(cmd)
assert cmd.exclusive == dec.exclusive
assert cmd.destination == dec.destination
def testXid(self):
- xid = self.spec["dtx.xid"]
- sc = StringCodec(self.spec)
- st = Struct(xid, format=0, global_id="gid", branch_id="bid")
- xid.encode(sc, st)
+ sc = StringCodec()
+ xid = Xid(format=0, global_id="gid", branch_id="bid")
+ sc.write_compound(xid)
assert sc.encoded == '\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid'
- assert xid.decode(sc).__dict__ == st.__dict__
+ dec = sc.read_compound(Xid)
+ assert xid.__dict__ == dec.__dict__
- def testLoadReadOnly(self):
- spec = "amqp.0-10-qpid-errata.xml"
- f = testrunner.get_spec_file(spec)
- dest = tempfile.mkdtemp()
- shutil.copy(f, dest)
- shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest)
- os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR)
- fname = os.path.join(dest, spec)
- load(fname)
- assert not os.path.exists("%s.pcl" % fname)
+# def testLoadReadOnly(self):
+# spec = "amqp.0-10-qpid-errata.xml"
+# f = testrunner.get_spec_file(spec)
+# dest = tempfile.mkdtemp()
+# shutil.copy(f, dest)
+# shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest)
+# os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR)
+# fname = os.path.join(dest, spec)
+# load(fname)
+# assert not os.path.exists("%s.pcl" % fname)