summaryrefslogtreecommitdiff
path: root/python/qpid/framing.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-08-11 15:40:19 +0000
committerRafael H. Schloming <rhs@apache.org>2009-08-11 15:40:19 +0000
commitd22ac4bbbd52fc8cbf80f864c49c904b0b24a529 (patch)
tree1f140400747e507d88d69695046e13a5efcf0e52 /python/qpid/framing.py
parent0fc88ad654ed1dabf14c489ed5920b440a7fc6a2 (diff)
downloadqpid-python-d22ac4bbbd52fc8cbf80f864c49c904b0b24a529.tar.gz
- removed old and redundent tests
- removed old test harness in favor of qpid-python-test - modified qpid-python-test to support "skipped" tests, these are tests that failed due to an anticipated environmental reason such as the broker is not running or it is the wrong version - modified the qpid-python-test harness to exit with appropriate error codes based on the test results - modified the python clients to report version mismatches rather than framing errors - made qpid_config provide variables for 0-8, 0-9, and 0-10 versions of the spec - modified the 0-10 client to directly codegen classes - added new 0-10 framing layer based on push parsing rather than pull parsing - added numerous framing tests git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@803168 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/framing.py')
-rw-r--r--python/qpid/framing.py172
1 files changed, 170 insertions, 2 deletions
diff --git a/python/qpid/framing.py b/python/qpid/framing.py
index 7c5f68fbcc..0a8f26272c 100644
--- a/python/qpid/framing.py
+++ b/python/qpid/framing.py
@@ -18,8 +18,64 @@
#
import struct
-from qpid.framer import Frame, FIRST_SEG, LAST_SEG, FIRST_FRM, LAST_FRM
-from qpid.assembler import Segment
+
+FIRST_SEG = 0x08
+LAST_SEG = 0x04
+FIRST_FRM = 0x02
+LAST_FRM = 0x01
+
+class Frame:
+
+ HEADER = "!2BHxBH4x"
+ HEADER_SIZE = struct.calcsize(HEADER)
+ MAX_PAYLOAD = 65535 - struct.calcsize(HEADER)
+
+ def __init__(self, flags, type, track, channel, payload):
+ if len(payload) > Frame.MAX_PAYLOAD:
+ raise ValueError("max payload size exceeded: %s" % len(payload))
+ self.flags = flags
+ self.type = type
+ self.track = track
+ self.channel = channel
+ self.payload = payload
+
+ def isFirstSegment(self):
+ return bool(FIRST_SEG & self.flags)
+
+ def isLastSegment(self):
+ return bool(LAST_SEG & self.flags)
+
+ def isFirstFrame(self):
+ return bool(FIRST_FRM & self.flags)
+
+ def isLastFrame(self):
+ return bool(LAST_FRM & self.flags)
+
+ def __repr__(self):
+ return "%s%s%s%s %s %s %s %r" % (int(self.isFirstSegment()),
+ int(self.isLastSegment()),
+ int(self.isFirstFrame()),
+ int(self.isLastFrame()),
+ self.type,
+ self.track,
+ self.channel,
+ self.payload)
+
+class Segment:
+
+ def __init__(self, first, last, type, track, channel, payload):
+ self.id = None
+ self.offset = None
+ self.first = first
+ self.last = last
+ self.type = type
+ self.track = track
+ self.channel = channel
+ self.payload = payload
+
+ def __repr__(self):
+ return "%s%s %s %s %s %r" % (int(self.first), int(self.last), self.type,
+ self.track, self.channel, self.payload)
class FrameDecoder:
@@ -140,3 +196,115 @@ class SegmentEncoder:
result = self.frames
self.frames = []
return result
+
+from ops import COMMANDS, CONTROLS, COMPOUND, Header, segment_type, track
+from spec import SPEC
+
+from codec010 import StringCodec
+
+class OpEncoder:
+
+ def __init__(self):
+ self.segments = []
+
+ def write(self, *ops):
+ for op in ops:
+ if COMMANDS.has_key(op.NAME):
+ seg_type = segment_type.command
+ seg_track = track.command
+ enc = self.encode_command(op)
+ elif CONTROLS.has_key(op.NAME):
+ seg_type = segment_type.control
+ seg_track = track.control
+ enc = self.encode_compound(op)
+ else:
+ raise ValueError(op)
+ seg = Segment(True, False, seg_type, seg_track, op.channel, enc)
+ self.segments.append(seg)
+ if hasattr(op, "headers") and op.headers is not None:
+ hdrs = ""
+ for h in op.headers:
+ hdrs += self.encode_compound(h)
+ seg = Segment(False, False, segment_type.header, seg_track, op.channel,
+ hdrs)
+ self.segments.append(seg)
+ if hasattr(op, "payload") and op.payload is not None:
+ self.segments.append(Segment(False, False, segment_type.body, seg_track,
+ op.channel, op.payload))
+ self.segments[-1].last = True
+
+ def encode_command(self, cmd):
+ sc = StringCodec()
+ sc.write_uint16(cmd.CODE)
+ sc.write_compound(Header(sync=cmd.sync))
+ sc.write_fields(cmd)
+ return sc.encoded
+
+ def encode_compound(self, op):
+ sc = StringCodec()
+ sc.write_compound(op)
+ return sc.encoded
+
+ def read(self):
+ result = self.segments
+ self.segments = []
+ return result
+
+class OpDecoder:
+
+ def __init__(self):
+ self.op = None
+ self.ops = []
+
+ def write(self, *segments):
+ for seg in segments:
+ if seg.first:
+ if seg.type == segment_type.command:
+ self.op = self.decode_command(seg.payload)
+ elif seg.type == segment_type.control:
+ self.op = self.decode_control(seg.payload)
+ else:
+ raise ValueError(seg)
+ self.op.channel = seg.channel
+ elif seg.type == segment_type.header:
+ if self.op.headers is None:
+ self.op.headers = []
+ self.op.headers.extend(self.decode_headers(seg.payload))
+ elif seg.type == segment_type.body:
+ if self.op.payload is None:
+ self.op.payload = seg.payload
+ else:
+ self.op.payload += seg.payload
+ if seg.last:
+ self.ops.append(self.op)
+ self.op = None
+
+ def decode_command(self, encoded):
+ sc = StringCodec(encoded)
+ code = sc.read_uint16()
+ cls = COMMANDS[code]
+ hdr = sc.read_compound(Header)
+ cmd = cls()
+ sc.read_fields(cmd)
+ cmd.sync = hdr.sync
+ return cmd
+
+ def decode_control(self, encoded):
+ sc = StringCodec(encoded)
+ code = sc.read_uint16()
+ cls = CONTROLS[code]
+ ctl = cls()
+ sc.read_fields(ctl)
+ return ctl
+
+ def decode_headers(self, encoded):
+ sc = StringCodec(encoded)
+ result = []
+ while sc.encoded:
+ result.append(sc.read_struct32())
+ return result
+
+ def read(self):
+ result = self.ops
+ self.ops = []
+ return result