summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-07-17 12:03:29 +0000
committerRafael H. Schloming <rhs@apache.org>2009-07-17 12:03:29 +0000
commit123e4b8bc5b0e5822bffba177de8773e83ac571b (patch)
treede48246d78bc0087626d87fede52a54aec20bb43
parent5f7116bbba781af8910a2bed9e842aa2839b3d96 (diff)
downloadqpid-python-123e4b8bc5b0e5822bffba177de8773e83ac571b.tar.gz
added non blocking framing code
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@795059 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qpid/framing.py142
-rw-r--r--qpid/python/qpid/tests/framing.py173
2 files changed, 315 insertions, 0 deletions
diff --git a/qpid/python/qpid/framing.py b/qpid/python/qpid/framing.py
new file mode 100644
index 0000000000..7c5f68fbcc
--- /dev/null
+++ b/qpid/python/qpid/framing.py
@@ -0,0 +1,142 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import struct
+from qpid.framer import Frame, FIRST_SEG, LAST_SEG, FIRST_FRM, LAST_FRM
+from qpid.assembler import Segment
+
+class FrameDecoder:
+
+ def __init__(self):
+ self.input = ""
+ self.output = []
+ self.parse = self.__frame_header
+
+ def write(self, bytes):
+ self.input += bytes
+ while True:
+ next = self.parse()
+ if next is None:
+ break
+ else:
+ self.parse = next
+
+ def __consume(self, n):
+ result = self.input[:n]
+ self.input = self.input[n:]
+ return result
+
+ def __frame_header(self):
+ if len(self.input) >= Frame.HEADER_SIZE:
+ st = self.__consume(Frame.HEADER_SIZE)
+ self.flags, self.type, self.size, self.track, self.channel = \
+ struct.unpack(Frame.HEADER, st)
+ return self.__frame_body
+
+ def __frame_body(self):
+ size = self.size - Frame.HEADER_SIZE
+ if len(self.input) >= size:
+ payload = self.__consume(size)
+ frame = Frame(self.flags, self.type, self.track, self.channel, payload)
+ self.output.append(frame)
+ return self.__frame_header
+
+ def read(self):
+ result = self.output
+ self.output = []
+ return result
+
+class FrameEncoder:
+
+ def __init__(self):
+ self.output = ""
+
+ def write(self, *frames):
+ for frame in frames:
+ size = len(frame.payload) + Frame.HEADER_SIZE
+ track = frame.track & 0x0F
+ self.output += struct.pack(Frame.HEADER, frame.flags, frame.type, size,
+ track, frame.channel)
+ self.output += frame.payload
+
+ def read(self):
+ result = self.output
+ self.output = ""
+ return result
+
+class SegmentDecoder:
+
+ def __init__(self):
+ self.fragments = {}
+ self.segments = []
+
+ def write(self, *frames):
+ for frm in frames:
+ key = (frm.channel, frm.track)
+ seg = self.fragments.get(key)
+
+ if seg == None:
+ seg = Segment(frm.isFirstSegment(), frm.isLastSegment(),
+ frm.type, frm.track, frm.channel, "")
+ self.fragments[key] = seg
+
+ seg.payload += frm.payload
+
+ if frm.isLastFrame():
+ self.fragments.pop(key)
+ self.segments.append(seg)
+
+ def read(self):
+ result = self.segments
+ self.segments = []
+ return result
+
+class SegmentEncoder:
+
+ def __init__(self, max_payload=Frame.MAX_PAYLOAD):
+ self.max_payload = max_payload
+ self.frames = []
+
+ def write(self, *segments):
+ for seg in segments:
+ remaining = seg.payload
+
+ first = True
+ while first or remaining:
+ payload = remaining[:self.max_payload]
+ remaining = remaining[self.max_payload:]
+
+ flags = 0
+ if first:
+ flags |= FIRST_FRM
+ first = False
+ if not remaining:
+ flags |= LAST_FRM
+ if seg.first:
+ flags |= FIRST_SEG
+ if seg.last:
+ flags |= LAST_SEG
+
+ frm = Frame(flags, seg.type, seg.track, seg.channel, payload)
+ self.frames.append(frm)
+
+ def read(self):
+ result = self.frames
+ self.frames = []
+ return result
diff --git a/qpid/python/qpid/tests/framing.py b/qpid/python/qpid/tests/framing.py
new file mode 100644
index 0000000000..4cd596b583
--- /dev/null
+++ b/qpid/python/qpid/tests/framing.py
@@ -0,0 +1,173 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# setup, usage, teardown, errors(sync), errors(async), stress, soak,
+# boundary-conditions, config
+
+from qpid.tests import Test
+from qpid.framing import *
+
+class Base(Test):
+
+ def cmp_frames(self, frm1, frm2):
+ assert frm1.flags == frm2.flags, "expected: %r, got %r" % (frm1, frm2)
+ assert frm1.type == frm2.type, "expected: %r, got %r" % (frm1, frm2)
+ assert frm1.track == frm2.track, "expected: %r, got %r" % (frm1, frm2)
+ assert frm1.channel == frm2.channel, "expected: %r, got %r" % (frm1, frm2)
+ assert frm1.payload == frm2.payload, "expected: %r, got %r" % (frm1, frm2)
+
+ def cmp_segments(self, seg1, seg2):
+ assert seg1.first == seg2.first, "expected: %r, got %r" % (seg1, seg2)
+ assert seg1.last == seg2.last, "expected: %r, got %r" % (seg1, seg2)
+ assert seg1.type == seg2.type, "expected: %r, got %r" % (seg1, seg2)
+ assert seg1.track == seg2.track, "expected: %r, got %r" % (seg1, seg2)
+ assert seg1.channel == seg2.channel, "expected: %r, got %r" % (seg1, seg2)
+ assert seg1.payload == seg2.payload, "expected: %r, got %r" % (seg1, seg2)
+
+class FrameTest(Base):
+
+ def enc_dec(self, frames, encoded=None):
+ enc = FrameEncoder()
+ dec = FrameDecoder()
+
+ enc.write(*frames)
+ bytes = enc.read()
+ if encoded is not None:
+ assert bytes == encoded, "expected %r, got %r" % (encoded, bytes)
+ dec.write(bytes)
+ dframes = dec.read()
+
+ assert len(frames) == len(dframes)
+ for f, df, in zip(frames, dframes):
+ self.cmp_frames(f, df)
+
+ def testEmpty(self):
+ self.enc_dec([Frame(0, 0, 0, 0, "")],
+ "\x00\x00\x00\x0c\x00\x00\x00\x00\x00\x00\x00\x00")
+
+ def testSingle(self):
+ self.enc_dec([Frame(0, 0, 0, 1, "payload")],
+ "\x00\x00\x00\x13\x00\x00\x00\x01\x00\x00\x00\x00payload")
+
+ def testMaxChannel(self):
+ self.enc_dec([Frame(0, 0, 0, 65535, "max-channel")],
+ "\x00\x00\x00\x17\x00\x00\xff\xff\x00\x00\x00\x00max-channel")
+
+ def testMaxType(self):
+ self.enc_dec([Frame(0, 255, 0, 0, "max-type")],
+ "\x00\xff\x00\x14\x00\x00\x00\x00\x00\x00\x00\x00max-type")
+
+ def testMaxTrack(self):
+ self.enc_dec([Frame(0, 0, 15, 0, "max-track")],
+ "\x00\x00\x00\x15\x00\x0f\x00\x00\x00\x00\x00\x00max-track")
+
+ def testSequence(self):
+ self.enc_dec([Frame(0, 0, 0, 0, "zero"),
+ Frame(0, 0, 0, 1, "one"),
+ Frame(0, 0, 1, 0, "two"),
+ Frame(0, 0, 1, 1, "three"),
+ Frame(0, 1, 0, 0, "four"),
+ Frame(0, 1, 0, 1, "five"),
+ Frame(0, 1, 1, 0, "six"),
+ Frame(0, 1, 1, 1, "seven"),
+ Frame(1, 0, 0, 0, "eight"),
+ Frame(1, 0, 0, 1, "nine"),
+ Frame(1, 0, 1, 0, "ten"),
+ Frame(1, 0, 1, 1, "eleven"),
+ Frame(1, 1, 0, 0, "twelve"),
+ Frame(1, 1, 0, 1, "thirteen"),
+ Frame(1, 1, 1, 0, "fourteen"),
+ Frame(1, 1, 1, 1, "fifteen")])
+
+class SegmentTest(Base):
+
+ def enc_dec(self, segments, frames=None, interleave=None, max_payload=Frame.MAX_PAYLOAD):
+ enc = SegmentEncoder(max_payload)
+ dec = SegmentDecoder()
+
+ enc.write(*segments)
+ frms = enc.read()
+ if frames is not None:
+ assert len(frames) == len(frms), "expected %s, got %s" % (frames, frms)
+ for f1, f2 in zip(frames, frms):
+ self.cmp_frames(f1, f2)
+ if interleave is not None:
+ ilvd = []
+ for f in frms:
+ ilvd.append(f)
+ if interleave:
+ ilvd.append(interleave.pop(0))
+ ilvd.extend(interleave)
+ dec.write(*ilvd)
+ else:
+ dec.write(*frms)
+ segs = dec.read()
+ assert len(segments) == len(segs)
+ for s1, s2 in zip(segments, segs):
+ self.cmp_segments(s1, s2)
+
+ def testEmpty(self):
+ self.enc_dec([Segment(True, True, 0, 0, 0, "")],
+ [Frame(FIRST_FRM | LAST_FRM | FIRST_SEG | LAST_SEG, 0, 0, 0,
+ "")])
+
+ def testSingle(self):
+ self.enc_dec([Segment(True, True, 0, 0, 0, "payload")],
+ [Frame(FIRST_FRM | LAST_FRM | FIRST_SEG | LAST_SEG, 0, 0, 0,
+ "payload")])
+
+ def testMaxChannel(self):
+ self.enc_dec([Segment(False, False, 0, 0, 65535, "max-channel")],
+ [Frame(FIRST_FRM | LAST_FRM, 0, 0, 65535, "max-channel")])
+
+ def testMaxType(self):
+ self.enc_dec([Segment(False, False, 255, 0, 0, "max-type")],
+ [Frame(FIRST_FRM | LAST_FRM, 255, 0, 0, "max-type")])
+
+ def testMaxTrack(self):
+ self.enc_dec([Segment(False, False, 0, 15, 0, "max-track")],
+ [Frame(FIRST_FRM | LAST_FRM, 0, 15, 0, "max-track")])
+
+ def testSequence(self):
+ self.enc_dec([Segment(True, False, 0, 0, 0, "one"),
+ Segment(False, False, 0, 0, 0, "two"),
+ Segment(False, True, 0, 0, 0, "three")],
+ [Frame(FIRST_FRM | LAST_FRM | FIRST_SEG, 0, 0, 0, "one"),
+ Frame(FIRST_FRM | LAST_FRM, 0, 0, 0, "two"),
+ Frame(FIRST_FRM | LAST_FRM | LAST_SEG, 0, 0, 0, "three")])
+
+ def testInterleaveChannel(self):
+ frames = [Frame(0, 0, 0, 0, chr(ord("a") + i)) for i in range(7)]
+ frames[0].flags |= FIRST_FRM
+ frames[-1].flags |= LAST_FRM
+
+ ilvd = [Frame(0, 0, 0, 1, chr(ord("a") + i)) for i in range(7)]
+
+ self.enc_dec([Segment(False, False, 0, 0, 0, "abcdefg")], frames, ilvd, max_payload=1)
+
+ def testInterleaveTrack(self):
+ frames = [Frame(0, 0, 0, 0, "%c%c" % (ord("a") + i, ord("a") + i + 1))
+ for i in range(0, 8, 2)]
+ frames[0].flags |= FIRST_FRM
+ frames[-1].flags |= LAST_FRM
+
+ ilvd = [Frame(0, 0, 1, 0, "%c%c" % (ord("a") + i, ord("a") + i + 1))
+ for i in range(0, 8, 2)]
+
+ self.enc_dec([Segment(False, False, 0, 0, 0, "abcdefgh")], frames, ilvd, max_payload=2)