summaryrefslogtreecommitdiff
path: root/python/qpid/tests/framing.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/tests/framing.py')
-rw-r--r--python/qpid/tests/framing.py116
1 files changed, 116 insertions, 0 deletions
diff --git a/python/qpid/tests/framing.py b/python/qpid/tests/framing.py
index 4cd596b583..0b33df8b9a 100644
--- a/python/qpid/tests/framing.py
+++ b/python/qpid/tests/framing.py
@@ -40,6 +40,50 @@ class Base(Test):
assert seg1.channel == seg2.channel, "expected: %r, got %r" % (seg1, seg2)
assert seg1.payload == seg2.payload, "expected: %r, got %r" % (seg1, seg2)
+ def cmp_list(self, l1, l2):
+ if l1 is None:
+ assert l2 is None
+ return
+
+ assert len(l1) == len(l2)
+ for v1, v2 in zip(l1, l2):
+ if isinstance(v1, Compound):
+ self.cmp_ops(v1, v2)
+ else:
+ assert v1 == v2
+
+ def cmp_ops(self, op1, op2):
+ if op1 is None:
+ assert op2 is None
+ return
+
+ assert op1.__class__ == op2.__class__
+ cls = op1.__class__
+ assert op1.NAME == op2.NAME
+ assert op1.CODE == op2.CODE
+ assert op1.FIELDS == op2.FIELDS
+ for f in cls.FIELDS:
+ v1 = getattr(op1, f.name)
+ v2 = getattr(op2, f.name)
+ if COMPOUND.has_key(f.type) or f.type == "struct32":
+ self.cmp_ops(v1, v2)
+ elif f.type in ("list", "array"):
+ self.cmp_list(v1, v2)
+ else:
+ assert v1 == v2, "expected: %r, got %r" % (v1, v2)
+
+ if issubclass(cls, Command) or issubclass(cls, Control):
+ assert op1.channel == op2.channel
+
+ if issubclass(cls, Command):
+ assert op1.sync == op2.sync, "expected: %r, got %r" % (op1.sync, op2.sync)
+ assert (op1.headers is None and op2.headers is None) or \
+ (op1.headers is not None and op2.headers is not None)
+ if op1.headers is not None:
+ assert len(op1.headers) == len(op2.headers)
+ for h1, h2 in zip(op1.headers, op2.headers):
+ self.cmp_ops(h1, h2)
+
class FrameTest(Base):
def enc_dec(self, frames, encoded=None):
@@ -171,3 +215,75 @@ class SegmentTest(Base):
for i in range(0, 8, 2)]
self.enc_dec([Segment(False, False, 0, 0, 0, "abcdefgh")], frames, ilvd, max_payload=2)
+
+from qpid.ops import *
+
+class OpTest(Base):
+
+ def enc_dec(self, ops):
+ enc = OpEncoder()
+ dec = OpDecoder()
+ enc.write(*ops)
+ segs = enc.read()
+ dec.write(*segs)
+ dops = dec.read()
+ assert len(ops) == len(dops)
+ for op1, op2 in zip(ops, dops):
+ self.cmp_ops(op1, op2)
+
+ def testEmtpyMT(self):
+ self.enc_dec([MessageTransfer()])
+
+ def testEmptyMTSync(self):
+ self.enc_dec([MessageTransfer(sync=True)])
+
+ def testMT(self):
+ self.enc_dec([MessageTransfer(destination="asdf")])
+
+ def testSyncMT(self):
+ self.enc_dec([MessageTransfer(destination="asdf", sync=True)])
+
+ def testEmptyPayloadMT(self):
+ self.enc_dec([MessageTransfer(payload="")])
+
+ def testPayloadMT(self):
+ self.enc_dec([MessageTransfer(payload="test payload")])
+
+ def testHeadersEmptyPayloadMT(self):
+ self.enc_dec([MessageTransfer(headers=[DeliveryProperties()])])
+
+ def testHeadersPayloadMT(self):
+ self.enc_dec([MessageTransfer(headers=[DeliveryProperties()], payload="test payload")])
+
+ def testMultiHeadersEmptyPayloadMT(self):
+ self.enc_dec([MessageTransfer(headers=[DeliveryProperties(), MessageProperties()])])
+
+ def testMultiHeadersPayloadMT(self):
+ self.enc_dec([MessageTransfer(headers=[MessageProperties(), DeliveryProperties()], payload="test payload")])
+
+ def testContentTypeHeadersPayloadMT(self):
+ self.enc_dec([MessageTransfer(headers=[MessageProperties(content_type="text/plain")], payload="test payload")])
+
+ def testMulti(self):
+ self.enc_dec([MessageTransfer(),
+ MessageTransfer(sync=True),
+ MessageTransfer(destination="one"),
+ MessageTransfer(destination="two", sync=True),
+ MessageTransfer(destination="three", payload="test payload")])
+
+ def testControl(self):
+ self.enc_dec([SessionAttach(name="asdf")])
+
+ def testMixed(self):
+ self.enc_dec([SessionAttach(name="fdsa"), MessageTransfer(destination="test")])
+
+ def testChannel(self):
+ self.enc_dec([SessionAttach(name="asdf", channel=3), MessageTransfer(destination="test", channel=1)])
+
+ def testCompound(self):
+ self.enc_dec([MessageTransfer(headers=[MessageProperties(reply_to=ReplyTo(exchange="exch", routing_key="rk"))])])
+
+ def testListCompound(self):
+ self.enc_dec([ExecutionResult(value=RecoverResult(in_doubt=[Xid(global_id="one"),
+ Xid(global_id="two"),
+ Xid(global_id="three")]))])