diff options
Diffstat (limited to 'qpid/python/qpid/connection08.py')
-rw-r--r-- | qpid/python/qpid/connection08.py | 505 |
1 files changed, 505 insertions, 0 deletions
diff --git a/qpid/python/qpid/connection08.py b/qpid/python/qpid/connection08.py new file mode 100644 index 0000000000..654148dad2 --- /dev/null +++ b/qpid/python/qpid/connection08.py @@ -0,0 +1,505 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +A Connection class containing socket code that uses the spec metadata +to read and write Frame objects. This could be used by a client, +server, or even a proxy implementation. +""" + +import socket, codec, logging, qpid +from cStringIO import StringIO +from codec import EOF +from compat import SHUT_RDWR +from exceptions import VersionError + +class SockIO: + + def __init__(self, sock): + self.sock = sock + + def write(self, buf): +# print "OUT: %r" % buf + self.sock.sendall(buf) + + def read(self, n): + data = "" + while len(data) < n: + try: + s = self.sock.recv(n - len(data)) + except socket.error: + break + if len(s) == 0: + break +# print "IN: %r" % s + data += s + return data + + def flush(self): + pass + + def close(self): + self.sock.shutdown(SHUT_RDWR) + self.sock.close() + +def connect(host, port): + sock = socket.socket() + sock.connect((host, port)) + sock.setblocking(1) + return SockIO(sock) + +def listen(host, port, predicate = lambda: True): + sock = socket.socket() + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((host, port)) + sock.listen(5) + while predicate(): + s, a = sock.accept() + yield SockIO(s) + +class FramingError(Exception): + pass + +class Connection: + + def __init__(self, io, spec): + self.codec = codec.Codec(io, spec) + self.spec = spec + self.FRAME_END = self.spec.constants.byname["frame_end"].id + self.write = getattr(self, "write_%s_%s" % (self.spec.major, self.spec.minor)) + self.read = getattr(self, "read_%s_%s" % (self.spec.major, self.spec.minor)) + + def flush(self): + self.codec.flush() + + INIT="!4s4B" + + def init(self): + self.codec.pack(Connection.INIT, "AMQP", 1, 1, self.spec.major, + self.spec.minor) + + def tini(self): + self.codec.unpack(Connection.INIT) + + def write_8_0(self, frame): + c = self.codec + c.encode_octet(self.spec.constants.byname[frame.type].id) + c.encode_short(frame.channel) + body = StringIO() + enc = codec.Codec(body, self.spec) + frame.encode(enc) + enc.flush() + c.encode_longstr(body.getvalue()) + c.encode_octet(self.FRAME_END) + + def read_8_0(self): + c = self.codec + tid = c.decode_octet() + try: + type = self.spec.constants.byid[tid].name + except KeyError: + if tid == ord('A') and c.unpack("!3s") == "MQP": + _, _, major, minor = c.unpack("4B") + raise VersionError("client: %s-%s, server: %s-%s" % + (self.spec.major, self.spec.minor, major, minor)) + else: + raise FramingError("unknown frame type: %s" % tid) + channel = c.decode_short() + body = c.decode_longstr() + dec = codec.Codec(StringIO(body), self.spec) + frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) + frame.channel = channel + end = c.decode_octet() + if end != self.FRAME_END: + garbage = "" + while end != self.FRAME_END: + garbage += chr(end) + end = c.decode_octet() + raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage) + return frame + + def write_0_9(self, frame): + self.write_8_0(frame) + + def read_0_9(self): + return self.read_8_0() + + def write_0_10(self, frame): + c = self.codec + flags = 0 + if frame.bof: flags |= 0x08 + if frame.eof: flags |= 0x04 + if frame.bos: flags |= 0x02 + if frame.eos: flags |= 0x01 + + c.encode_octet(flags) # TODO: currently fixed at ver=0, B=E=b=e=1 + c.encode_octet(self.spec.constants.byname[frame.type].id) + body = StringIO() + enc = codec.Codec(body, self.spec) + frame.encode(enc) + enc.flush() + frame_size = len(body.getvalue()) + 12 # TODO: Magic number (frame header size) + c.encode_short(frame_size) + c.encode_octet(0) # Reserved + c.encode_octet(frame.subchannel & 0x0f) + c.encode_short(frame.channel) + c.encode_long(0) # Reserved + c.write(body.getvalue()) + c.encode_octet(self.FRAME_END) + + def read_0_10(self): + c = self.codec + flags = c.decode_octet() # TODO: currently ignoring flags + framing_version = (flags & 0xc0) >> 6 + if framing_version != 0: + raise "frame error: unknown framing version" + type = self.spec.constants.byid[c.decode_octet()].name + frame_size = c.decode_short() + if frame_size < 12: # TODO: Magic number (frame header size) + raise "frame error: frame size too small" + reserved1 = c.decode_octet() + field = c.decode_octet() + subchannel = field & 0x0f + channel = c.decode_short() + reserved2 = c.decode_long() # TODO: reserved maybe need to ensure 0 + if (flags & 0x30) != 0 or reserved1 != 0 or (field & 0xf0) != 0: + raise "frame error: reserved bits not all zero" + body_size = frame_size - 12 # TODO: Magic number (frame header size) + body = c.read(body_size) + dec = codec.Codec(StringIO(body), self.spec) + try: + frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) + except EOF: + raise "truncated frame body: %r" % body + frame.channel = channel + frame.subchannel = subchannel + end = c.decode_octet() + if end != self.FRAME_END: + garbage = "" + while end != self.FRAME_END: + garbage += chr(end) + end = c.decode_octet() + raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage) + return frame + + def write_99_0(self, frame): + self.write_0_10(frame) + + def read_99_0(self): + return self.read_0_10() + +class Frame: + + DECODERS = {} + + class __metaclass__(type): + + def __new__(cls, name, bases, dict): + for attr in ("encode", "decode", "type"): + if not dict.has_key(attr): + raise TypeError("%s must define %s" % (name, attr)) + dict["decode"] = staticmethod(dict["decode"]) + if dict.has_key("__init__"): + __init__ = dict["__init__"] + def init(self, *args, **kwargs): + args = list(args) + self.init(args, kwargs) + __init__(self, *args, **kwargs) + dict["__init__"] = init + t = type.__new__(cls, name, bases, dict) + if t.type != None: + Frame.DECODERS[t.type] = t + return t + + type = None + + def init(self, args, kwargs): + self.channel = kwargs.pop("channel", 0) + self.subchannel = kwargs.pop("subchannel", 0) + self.bos = True + self.eos = True + self.bof = True + self.eof = True + + def encode(self, enc): abstract + + def decode(spec, dec, size): abstract + +class Method(Frame): + + type = "frame_method" + + def __init__(self, method, args): + if len(args) != len(method.fields): + argspec = ["%s: %s" % (f.name, f.type) + for f in method.fields] + raise TypeError("%s.%s expecting (%s), got %s" % + (method.klass.name, method.name, ", ".join(argspec), + args)) + self.method = method + self.method_type = method + self.args = args + self.eof = not method.content + + def encode(self, c): + version = (c.spec.major, c.spec.minor) + if version == (0, 10) or version == (99, 0): + c.encode_octet(self.method.klass.id) + c.encode_octet(self.method.id) + else: + c.encode_short(self.method.klass.id) + c.encode_short(self.method.id) + for field, arg in zip(self.method.fields, self.args): + c.encode(field.type, arg) + + def decode(spec, c, size): + version = (c.spec.major, c.spec.minor) + if version == (0, 10) or version == (99, 0): + klass = spec.classes.byid[c.decode_octet()] + meth = klass.methods.byid[c.decode_octet()] + else: + klass = spec.classes.byid[c.decode_short()] + meth = klass.methods.byid[c.decode_short()] + args = tuple([c.decode(f.type) for f in meth.fields]) + return Method(meth, args) + + def __str__(self): + return "[%s] %s %s" % (self.channel, self.method, + ", ".join([str(a) for a in self.args])) + +class Request(Frame): + + type = "frame_request" + + def __init__(self, id, response_mark, method): + self.id = id + self.response_mark = response_mark + self.method = method + self.method_type = method.method_type + self.args = method.args + + def encode(self, enc): + enc.encode_longlong(self.id) + enc.encode_longlong(self.response_mark) + # reserved + enc.encode_long(0) + self.method.encode(enc) + + def decode(spec, dec, size): + id = dec.decode_longlong() + mark = dec.decode_longlong() + # reserved + dec.decode_long() + method = Method.decode(spec, dec, size - 20) + return Request(id, mark, method) + + def __str__(self): + return "[%s] Request(%s) %s" % (self.channel, self.id, self.method) + +class Response(Frame): + + type = "frame_response" + + def __init__(self, id, request_id, batch_offset, method): + self.id = id + self.request_id = request_id + self.batch_offset = batch_offset + self.method = method + self.method_type = method.method_type + self.args = method.args + + def encode(self, enc): + enc.encode_longlong(self.id) + enc.encode_longlong(self.request_id) + enc.encode_long(self.batch_offset) + self.method.encode(enc) + + def decode(spec, dec, size): + id = dec.decode_longlong() + request_id = dec.decode_longlong() + batch_offset = dec.decode_long() + method = Method.decode(spec, dec, size - 20) + return Response(id, request_id, batch_offset, method) + + def __str__(self): + return "[%s] Response(%s,%s,%s) %s" % (self.channel, self.id, self.request_id, self.batch_offset, self.method) + +def uses_struct_encoding(spec): + return (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0) + +class Header(Frame): + + type = "frame_header" + + def __init__(self, klass, weight, size, properties): + self.klass = klass + self.weight = weight + self.size = size + self.properties = properties + self.eof = size == 0 + self.bof = False + + def __getitem__(self, name): + return self.properties[name] + + def __setitem__(self, name, value): + self.properties[name] = value + + def __delitem__(self, name): + del self.properties[name] + + def encode(self, c): + if uses_struct_encoding(c.spec): + self.encode_structs(c) + else: + self.encode_legacy(c) + + def encode_structs(self, c): + # XXX + structs = [qpid.Struct(c.spec.domains.byname["delivery_properties"].type), + qpid.Struct(c.spec.domains.byname["message_properties"].type)] + + # XXX + props = self.properties.copy() + for k in self.properties: + for s in structs: + if s.exists(k): + s.set(k, props.pop(k)) + if props: + raise TypeError("no such property: %s" % (", ".join(props))) + + # message properties store the content-length now, and weight is + # deprecated + if self.size != None: + structs[1].content_length = self.size + + for s in structs: + c.encode_long_struct(s) + + def encode_legacy(self, c): + c.encode_short(self.klass.id) + c.encode_short(self.weight) + c.encode_longlong(self.size) + + # property flags + nprops = len(self.klass.fields) + flags = 0 + for i in range(nprops): + f = self.klass.fields.items[i] + flags <<= 1 + if self.properties.get(f.name) != None: + flags |= 1 + # the last bit indicates more flags + if i > 0 and (i % 15) == 0: + flags <<= 1 + if nprops > (i + 1): + flags |= 1 + c.encode_short(flags) + flags = 0 + flags <<= ((16 - (nprops % 15)) % 16) + c.encode_short(flags) + + # properties + for f in self.klass.fields: + v = self.properties.get(f.name) + if v != None: + c.encode(f.type, v) + + def decode(spec, c, size): + if uses_struct_encoding(spec): + return Header.decode_structs(spec, c, size) + else: + return Header.decode_legacy(spec, c, size) + + def decode_structs(spec, c, size): + structs = [] + start = c.nread + while c.nread - start < size: + structs.append(c.decode_long_struct()) + + # XXX + props = {} + length = None + for s in structs: + for f in s.type.fields: + if s.has(f.name): + props[f.name] = s.get(f.name) + if f.name == "content_length": + length = s.get(f.name) + return Header(None, 0, length, props) + + decode_structs = staticmethod(decode_structs) + + def decode_legacy(spec, c, size): + klass = spec.classes.byid[c.decode_short()] + weight = c.decode_short() + size = c.decode_longlong() + + # property flags + bits = [] + while True: + flags = c.decode_short() + for i in range(15, 0, -1): + if flags >> i & 0x1 != 0: + bits.append(True) + else: + bits.append(False) + if flags & 0x1 == 0: + break + + # properties + properties = {} + for b, f in zip(bits, klass.fields): + if b: + # Note: decode returns a unicode u'' string but only + # plain '' strings can be used as keywords so we need to + # stringify the names. + properties[str(f.name)] = c.decode(f.type) + return Header(klass, weight, size, properties) + + decode_legacy = staticmethod(decode_legacy) + + def __str__(self): + return "%s %s %s %s" % (self.klass, self.weight, self.size, + self.properties) + +class Body(Frame): + + type = "frame_body" + + def __init__(self, content): + self.content = content + self.eof = True + self.bof = False + + def encode(self, enc): + enc.write(self.content) + + def decode(spec, dec, size): + return Body(dec.read(size)) + + def __str__(self): + return "Body(%r)" % self.content + +# TODO: +# OOB_METHOD = "frame_oob_method" +# OOB_HEADER = "frame_oob_header" +# OOB_BODY = "frame_oob_body" +# TRACE = "frame_trace" +# HEARTBEAT = "frame_heartbeat" |