summaryrefslogtreecommitdiff
path: root/M4-RCs/qpid/python/qpid/assembler.py
blob: 92bb0aa0f802b2152d6dbb8dffe670f3c89da721 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

from codec010 import StringCodec
from framer import *
from logging import getLogger

log = getLogger("qpid.io.seg")

class Segment:

  def __init__(self, first, last, type, track, channel, payload):
    self.id = None
    self.offset = None
    self.first = first
    self.last = last
    self.type = type
    self.track = track
    self.channel = channel
    self.payload = payload

  def decode(self, spec):
    segs = spec["segment_type"]
    choice = segs.choices[self.type]
    return getattr(self, "decode_%s" % choice.name)(spec)

  def decode_control(self, spec):
    sc = StringCodec(spec, self.payload)
    return sc.read_control()

  def decode_command(self, spec):
    sc = StringCodec(spec, self.payload)
    hdr, cmd = sc.read_command()
    cmd.id = self.id
    return hdr, cmd

  def decode_header(self, spec):
    sc = StringCodec(spec, self.payload)
    values = []
    while len(sc.encoded) > 0:
      values.append(sc.read_struct32())
    return values

  def decode_body(self, spec):
    return self.payload

  def __str__(self):
    return "%s%s %s %s %s %r" % (int(self.first), int(self.last), self.type,
                                 self.track, self.channel, self.payload)

  def __repr__(self):
    return str(self)

class Assembler(Framer):

  def __init__(self, sock, max_payload = Frame.MAX_PAYLOAD):
    Framer.__init__(self, sock)
    self.max_payload = max_payload
    self.fragments = {}

  def read_segment(self):
    while True:
      frame = self.read_frame()

      key = (frame.channel, frame.track)
      seg = self.fragments.get(key)
      if seg == None:
        seg = Segment(frame.isFirstSegment(), frame.isLastSegment(),
                      frame.type, frame.track, frame.channel, "")
        self.fragments[key] = seg

      seg.payload += frame.payload

      if frame.isLastFrame():
        self.fragments.pop(key)
        log.debug("RECV %s", seg)
        return seg

  def write_segment(self, segment):
    remaining = segment.payload

    first = True
    while first or remaining:
      payload = remaining[:self.max_payload]
      remaining = remaining[self.max_payload:]

      flags = 0
      if first:
        flags |= FIRST_FRM
        first = False
      if not remaining:
        flags |= LAST_FRM
      if segment.first:
        flags |= FIRST_SEG
      if segment.last:
        flags |= LAST_SEG

      frame = Frame(flags, segment.type, segment.track, segment.channel,
                    payload)
      self.write_frame(frame)

    log.debug("SENT %s", segment)