diff options
Diffstat (limited to 'python/qpid/framing.py')
-rw-r--r-- | python/qpid/framing.py | 142 |
1 files changed, 142 insertions, 0 deletions
diff --git a/python/qpid/framing.py b/python/qpid/framing.py new file mode 100644 index 0000000000..7c5f68fbcc --- /dev/null +++ b/python/qpid/framing.py @@ -0,0 +1,142 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import struct +from qpid.framer import Frame, FIRST_SEG, LAST_SEG, FIRST_FRM, LAST_FRM +from qpid.assembler import Segment + +class FrameDecoder: + + def __init__(self): + self.input = "" + self.output = [] + self.parse = self.__frame_header + + def write(self, bytes): + self.input += bytes + while True: + next = self.parse() + if next is None: + break + else: + self.parse = next + + def __consume(self, n): + result = self.input[:n] + self.input = self.input[n:] + return result + + def __frame_header(self): + if len(self.input) >= Frame.HEADER_SIZE: + st = self.__consume(Frame.HEADER_SIZE) + self.flags, self.type, self.size, self.track, self.channel = \ + struct.unpack(Frame.HEADER, st) + return self.__frame_body + + def __frame_body(self): + size = self.size - Frame.HEADER_SIZE + if len(self.input) >= size: + payload = self.__consume(size) + frame = Frame(self.flags, self.type, self.track, self.channel, payload) + self.output.append(frame) + return self.__frame_header + + def read(self): + result = self.output + self.output = [] + return result + +class FrameEncoder: + + def __init__(self): + self.output = "" + + def write(self, *frames): + for frame in frames: + size = len(frame.payload) + Frame.HEADER_SIZE + track = frame.track & 0x0F + self.output += struct.pack(Frame.HEADER, frame.flags, frame.type, size, + track, frame.channel) + self.output += frame.payload + + def read(self): + result = self.output + self.output = "" + return result + +class SegmentDecoder: + + def __init__(self): + self.fragments = {} + self.segments = [] + + def write(self, *frames): + for frm in frames: + key = (frm.channel, frm.track) + seg = self.fragments.get(key) + + if seg == None: + seg = Segment(frm.isFirstSegment(), frm.isLastSegment(), + frm.type, frm.track, frm.channel, "") + self.fragments[key] = seg + + seg.payload += frm.payload + + if frm.isLastFrame(): + self.fragments.pop(key) + self.segments.append(seg) + + def read(self): + result = self.segments + self.segments = [] + return result + +class SegmentEncoder: + + def __init__(self, max_payload=Frame.MAX_PAYLOAD): + self.max_payload = max_payload + self.frames = [] + + def write(self, *segments): + for seg in segments: + remaining = seg.payload + + first = True + while first or remaining: + payload = remaining[:self.max_payload] + remaining = remaining[self.max_payload:] + + flags = 0 + if first: + flags |= FIRST_FRM + first = False + if not remaining: + flags |= LAST_FRM + if seg.first: + flags |= FIRST_SEG + if seg.last: + flags |= LAST_SEG + + frm = Frame(flags, seg.type, seg.track, seg.channel, payload) + self.frames.append(frm) + + def read(self): + result = self.frames + self.frames = [] + return result |