diff options
Diffstat (limited to 'qpid/python/qpid/codec.py')
-rw-r--r-- | qpid/python/qpid/codec.py | 701 |
1 files changed, 701 insertions, 0 deletions
#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

"""
Utility code to translate between python objects and AMQP encoded data
fields.

The unit test for this module is located in tests/codec.py
"""

import re, qpid, spec08, os
from cStringIO import StringIO
from struct import *
from reference import ReferenceId
from logging import getLogger

log = getLogger("qpid.codec")

class EOF(Exception):
  """
  raised by Codec.read when data was requested but the stream returned none
  """
  pass

# This code appears to be dead
TYPE_ALIASES = {
  "long_string": "longstr",
  "unsigned_int": "long"
  }

class Codec:

  """
  class that handles encoding/decoding of AMQP primitives

  A Codec wraps a file-like 'stream' and keeps running totals of the
  bytes written (nwrote) and read (nread).  Bits are buffered: encoded
  bits are packed into octets when the next non-bit write (or flush)
  happens, and any undecoded bits left over from the last bit octet are
  discarded by the next non-bit read.
  """

  def __init__(self, stream, spec):
    """
    initializing the stream/fields used

    stream -- file-like object providing read(n), write(s) and flush()
    spec   -- protocol spec; 'constants' is read here, 'major'/'minor'
              and 'structs' are read by other methods
    """
    self.stream = stream
    self.spec = spec
    self.nwrote = 0
    self.nread = 0
    self.incoming_bits = []
    self.outgoing_bits = []

    # Before 0-91, the AMQP's set of types did not include the boolean type. However,
    # the 0-8 and 0-9 Java client uses this type so we encode/decode it too. However, this
    # can be turned off by setting the following environment value.
    self.understand_boolean = "QPID_CODEC_DISABLE_0_91_BOOLEAN" not in os.environ

    log.debug("AMQP 0-91 boolean supported : %r", self.understand_boolean)

    # type code -> type name (used when decoding field tables/arrays)
    self.types = {}
    # type name -> type code (used when encoding field tables)
    self.codes = {}
    self.integertypes = [int, long]
    # python class -> type name, consulted by resolve()
    self.encodings = {
      float: "double", # python uses 64bit floats, send them as doubles
      basestring: "longstr",
      None.__class__:"void",
      list: "sequence",
      tuple: "sequence",
      dict: "table"
      }

    if self.understand_boolean:
      self.encodings[bool] = "boolean"

    for constant in self.spec.constants:
      # This code appears to be dead
      if constant.klass == "field-table-type":
        name = constant.name.replace("field_table_", "")
        self.typecode(constant.id, TYPE_ALIASES.get(name, name))

    # Fall back to the built-in type codes when the spec supplied none.
    if not self.types:
      # long-string 'S'
      self.typecode(ord('S'), "longstr")
      # void 'V'
      self.typecode(ord('V'), "void")
      # long-int 'I' (32bit signed)
      self.typecode(ord('I'), "signed_int")
      # long-long-int 'l' (64bit signed)
      # This is a long standing pre-0-91-spec type used by the Java
      # client, 0-9-1 says it should be unsigned or use 'L')
      self.typecode(ord('l'), "signed_long")
      # double 'd'
      self.typecode(ord('d'), "double")
      # float 'f'
      self.typecode(ord('f'), "float")

      if self.understand_boolean:
        self.typecode(ord('t'), "boolean")

      ## The following are supported for decoding only ##

      # short-short-uint 'b' (8bit signed)
      self.types[ord('b')] = "signed_octet"
      # short-int 's' (16bit signed)
      # This is a long standing pre-0-91-spec type code used by the Java
      # client to send shorts, it should really be a short-string, or for 0-9-1 use 'U'
      self.types[ord('s')] = "signed_short"

  def typecode(self, code, type):
    """
    registers 'code' <-> 'type' in both directions (decoding and encoding)
    """
    self.types[code] = type
    self.codes[type] = code

  def resolve(self, klass, value):
    """
    returns the type name used to encode 'value' (an instance of 'klass')

    Integers map to "signed_int" or "signed_long" depending on their
    magnitude.  Other classes are looked up in self.encodings, walking up
    the base classes when there is no direct entry.  Returns None when no
    encoding is known; raises ValueError for integers that do not fit in
    64 signed bits.
    """
    if klass in self.integertypes:
      if -2147483648 <= value <= 2147483647:
        return "signed_int"
      elif -9223372036854775808 <= value <= 9223372036854775807:
        return "signed_long"
      else:
        raise ValueError('Integer value is outwith the supported 64bit signed range')
    if klass in self.encodings:
      return self.encodings[klass]
    for base in klass.__bases__:
      result = self.resolve(base, value)
      if result is not None:
        return result
    return None

  def read(self, n):
    """
    reads in 'n' bytes from the stream. Can raise EOF exception

    Any buffered incoming bits are discarded first.  EOF is only raised
    when data was requested and nothing at all came back; a short
    (non-empty) read is returned as-is.
    """
    self.clearbits()
    data = self.stream.read(n)
    if n > 0 and len(data) == 0:
      raise EOF()
    self.nread += len(data)
    return data

  def write(self, s):
    """
    writes data 's' to the stream (after flushing any pending bits)
    """
    self.flushbits()
    self.stream.write(s)
    self.nwrote += len(s)

  def flush(self):
    """
    flushes the bits and data present in the stream
    """
    self.flushbits()
    self.stream.flush()

  def flushbits(self):
    """
    flushes the bits(compressed into octets) onto the stream

    Bits are packed least-significant-bit first, eight to an octet.
    """
    if len(self.outgoing_bits) > 0:
      octets = []
      index = 0
      for b in self.outgoing_bits:
        if index == 0:
          octets.append(0)
        if b:
          octets[-1] |= 1 << index
        index = (index + 1) % 8
      # Empty the buffer BEFORE writing: encode_octet ends up in write(),
      # which calls flushbits() again.
      del self.outgoing_bits[:]
      for octet in octets:
        self.encode_octet(octet)

  def clearbits(self):
    """
    discards any incoming bits that have been read but not yet decoded
    """
    if self.incoming_bits:
      self.incoming_bits = []

  def pack(self, fmt, *args):
    """
    packs the data 'args' as per the format 'fmt' and writes it to the stream
    """
    self.write(pack(fmt, *args))

  def unpack(self, fmt):
    """
    reads data from the stream and unpacks it as per the format 'fmt'

    Returns the bare value when 'fmt' describes a single item, otherwise
    the tuple of values.
    """
    size = calcsize(fmt)
    data = self.read(size)
    values = unpack(fmt, data)
    if len(values) == 1:
      return values[0]
    else:
      return values

  def encode(self, type, value):
    """
    calls the appropriate encode function e.g. encode_octet, encode_short etc.

    'type' is either a spec08.Struct or the name of a type, in which case
    the method "encode_<type>" is invoked.
    """
    if isinstance(type, spec08.Struct):
      self.encode_struct(type, value)
    else:
      getattr(self, "encode_" + type)(value)

  def decode(self, type):
    """
    calls the appropriate decode function e.g. decode_octet, decode_short etc.

    'type' is either a spec08.Struct or the name of a type, in which case
    the method "decode_<type>" is invoked.
    """
    if isinstance(type, spec08.Struct):
      return self.decode_struct(type)
    else:
      log.debug("Decoding using method: decode_%s", type)
      return getattr(self, "decode_" + type)()

  def encode_bit(self, o):
    """
    encodes a bit (buffered until the next write/flush)
    """
    if o:
      self.outgoing_bits.append(True)
    else:
      self.outgoing_bits.append(False)

  def decode_bit(self):
    """
    decodes a bit

    When no bits are buffered, one octet is read and split into eight
    bits, least significant first.
    """
    if len(self.incoming_bits) == 0:
      bits = self.decode_octet()
      for i in range(8):
        self.incoming_bits.append(bits >> i & 1 != 0)
    return self.incoming_bits.pop(0)

  def encode_octet(self, o):
    """
    encodes an UNSIGNED octet (8 bits) data 'o' in network byte order
    """

    # octet's valid range is [0,255]
    if (o < 0 or o > 255):
      raise ValueError('Valid range of octet is [0,255]')

    self.pack("!B", int(o))

  def decode_octet(self):
    """
    decodes an UNSIGNED octet (8 bits) encoded in network byte order
    """
    return self.unpack("!B")

  def decode_signed_octet(self):
    """
    decodes a signed octet (8 bits) encoded in network byte order
    """
    return self.unpack("!b")

  def encode_short(self, o):
    """
    encodes an UNSIGNED short (16 bits) data 'o' in network byte order
    AMQP 0-9-1 type: short-uint
    """

    # short int's valid range is [0,65535]
    if (o < 0 or o > 65535):
      raise ValueError('Valid range of short int is [0,65535]: %s' % o)

    self.pack("!H", int(o))

  def decode_short(self):
    """
    decodes an UNSIGNED short (16 bits) in network byte order
    AMQP 0-9-1 type: short-uint
    """
    return self.unpack("!H")

  def decode_signed_short(self):
    """
    decodes a signed short (16 bits) in network byte order
    AMQP 0-9-1 type: short-int
    """
    return self.unpack("!h")

  def encode_long(self, o):
    """
    encodes an UNSIGNED long (32 bits) data 'o' in network byte order
    AMQP 0-9-1 type: long-uint
    """

    # we need to check both bounds because on 64 bit platforms
    # struct.pack won't raise an error if o is too large
    if (o < 0 or o > 4294967295):
      raise ValueError('Valid range of long int is [0,4294967295]')

    self.pack("!L", int(o))

  def decode_long(self):
    """
    decodes an UNSIGNED long (32 bits) in network byte order
    AMQP 0-9-1 type: long-uint
    """
    return self.unpack("!L")

  def encode_signed_long(self, o):
    """
    encodes a signed long (64 bits) in network byte order
    AMQP 0-9-1 type: long-long-int
    """
    self.pack("!q", o)

  def decode_signed_long(self):
    """
    decodes a signed long (64 bits) in network byte order
    AMQP 0-9-1 type: long-long-int
    """
    return self.unpack("!q")

  def encode_signed_int(self, o):
    """
    encodes a signed int (32 bits) in network byte order
    AMQP 0-9-1 type: long-int
    """
    self.pack("!l", o)

  def decode_signed_int(self):
    """
    decodes a signed int (32 bits) in network byte order
    AMQP 0-9-1 type: long-int
    """
    return self.unpack("!l")

  def encode_longlong(self, o):
    """
    encodes an UNSIGNED long long (64 bits) data 'o' in network byte order
    AMQP 0-9-1 type: long-long-uint
    """
    self.pack("!Q", o)

  def decode_longlong(self):
    """
    decodes an UNSIGNED long long (64 bits) in network byte order
    AMQP 0-9-1 type: long-long-uint
    """
    return self.unpack("!Q")

  def encode_float(self, o):
    """
    encodes 'o' as a 32 bit float in network byte order
    """
    self.pack("!f", o)

  def decode_float(self):
    """
    decodes a 32 bit float in network byte order
    """
    return self.unpack("!f")

  def encode_double(self, o):
    """
    encodes 'o' as a 64 bit float in network byte order
    """
    self.pack("!d", o)

  def decode_double(self):
    """
    decodes a 64 bit float in network byte order
    """
    return self.unpack("!d")

  def encode_bin128(self, b):
    """
    encodes the first 16 characters of string 'b', one octet each
    """
    for idx in range(0, 16):
      self.pack("!B", ord(b[idx]))

  def decode_bin128(self):
    """
    decodes 16 octets and returns them as a 16 character string
    """
    # join instead of repeated '+' to avoid quadratic string building
    return "".join([chr(self.unpack("!B")) for idx in range(0, 16)])

  def encode_raw(self, len, b):
    """
    encodes the first 'len' items of 'b', one octet each

    'b' may be a string (as returned by decode_raw) or a sequence of
    integers in the range [0,255].
    """
    for idx in range(0, len):
      item = b[idx]
      # Indexing a string yields a 1-character string, which "!B" cannot
      # pack; convert it to its ordinal like encode_bin128 does.
      if isinstance(item, basestring):
        item = ord(item)
      self.pack("!B", item)

  def decode_raw(self, len):
    """
    decodes 'len' octets and returns them as a string
    """
    # join instead of repeated '+' to avoid quadratic string building
    return "".join([chr(self.unpack("!B")) for idx in range(0, len)])

  def enc_str(self, fmt, s):
    """
    encodes a string 's' in network byte order as per format 'fmt'

    'fmt' is the struct format of the length prefix written before 's'.
    """
    size = len(s)
    self.pack(fmt, size)
    self.write(s)

  def dec_str(self, fmt):
    """
    decodes a string in network byte order as per format 'fmt'

    'fmt' is the struct format of the length prefix preceding the data.
    """
    size = self.unpack(fmt)
    return self.read(size)

  def encode_shortstr(self, s):
    """
    encodes a short string 's' in network byte order
    """

    # short strings are limited to 255 octets
    if len(s) > 255:
      raise ValueError('Short strings are limited to 255 octets')

    self.enc_str("!B", s)

  def decode_shortstr(self):
    """
    decodes a short string in network byte order
    """
    return self.dec_str("!B")

  def encode_longstr(self, s):
    """
    encodes a long string 's' in network byte order

    A dict passed here is encoded as a table instead.
    """
    if isinstance(s, dict):
      self.encode_table(s)
    else:
      self.enc_str("!L", s)

  def decode_longstr(self):
    """
    decodes a long string 's' in network byte order
    """
    return self.dec_str("!L")

  def encode_table(self, tbl):
    """
    encodes a table data structure in network byte order

    The entries are encoded into a temporary buffer first so that the
    total size can be written ahead of them.  Raises ValueError when a key
    is too long (0-8 only), when a value has no known encoding, or when
    its encoding has no registered type code.
    """
    enc = StringIO()
    codec = Codec(enc, self.spec)
    if tbl:
      for key, value in tbl.items():
        if self.spec.major == 8 and self.spec.minor == 0 and len(key) > 128:
          raise ValueError("field table key too long: '%s'" % key)
        type = self.resolve(value.__class__, value)
        if type is None:
          raise ValueError("no encoding for: " + str(value.__class__))
        # resolve() can return names (e.g. "sequence") that have no type
        # code registered; report that as a ValueError rather than letting
        # a bare KeyError escape from the lookup below.
        if type not in self.codes:
          raise ValueError("no type code for: " + str(type))
        codec.encode_shortstr(key)
        codec.encode_octet(self.codes[type])
        codec.encode(type, value)
    s = enc.getvalue()
    self.encode_long(len(s))
    self.write(s)

  def decode_table(self):
    """
    decodes a table data structure in network byte order

    Values with an unregistered type code are returned as raw strings
    whose length is derived from the code via width()/fixed().
    """
    size = self.decode_long()
    start = self.nread
    result = {}
    while self.nread - start < size:
      key = self.decode_shortstr()
      log.debug("Field table entry key: %r", key)
      code = self.decode_octet()
      log.debug("Field table entry type code: %r", code)
      if code in self.types:
        value = self.decode(self.types[code])
      else:
        w = width(code)
        if fixed(code):
          value = self.read(w)
        else:
          value = self.read(self.dec_num(w))
      result[key] = value
      log.debug("Field table entry value: %r", value)
    return result

  def encode_timestamp(self, t):
    """
    encodes a timestamp data structure in network byte order
    """
    self.encode_longlong(t)

  def decode_timestamp(self):
    """
    decodes a timestamp data structure in network byte order
    """
    return self.decode_longlong()

  def encode_content(self, s):
    """
    encodes a content data structure in network byte order

    content can be passed as a string in which case it is assumed to
    be inline data, or as an instance of ReferenceId indicating it is
    a reference id
    """
    if isinstance(s, ReferenceId):
      self.encode_octet(1)
      self.encode_longstr(s.id)
    else:
      self.encode_octet(0)
      self.encode_longstr(s)

  def decode_content(self):
    """
    decodes a content data structure in network byte order

    return a string for inline data and a ReferenceId instance for
    references
    """
    type = self.decode_octet()
    if type == 0:
      return self.decode_longstr()
    else:
      return ReferenceId(self.decode_longstr())

  # new domains for 0-10:

  def encode_rfc1982_long(self, s):
    """
    encodes 's' as an unsigned long (32 bits)
    """
    self.encode_long(s)

  def decode_rfc1982_long(self):
    """
    decodes an unsigned long (32 bits)
    """
    return self.decode_long()

  def encode_rfc1982_long_set(self, s):
    """
    encodes the sequence 's' as a 16 bit size in octets (4 per entry)
    followed by each entry as an unsigned long
    """
    self.encode_short(len(s) * 4)
    for i in s:
      self.encode_long(i)

  def decode_rfc1982_long_set(self):
    """
    decodes a set written by encode_rfc1982_long_set and returns it as a list
    """
    # The prefix is a size in octets; each entry is 4 octets wide.
    count = self.decode_short() // 4
    result = []
    for i in range(0, count):
      result.append(self.decode_long())
    return result

  def encode_uuid(self, s):
    """
    encodes 's' as 16 raw octets
    """
    self.pack("16s", s)

  def decode_uuid(self):
    """
    decodes 16 raw octets
    """
    return self.unpack("16s")

  def encode_void(self,o):
    """
    encodes nothing
    """
    #NO-OP, value is implicit in the type.
    return

  def decode_void(self):
    """
    decodes nothing and returns None
    """
    return None

  def enc_num(self, width, n):
    """
    encodes 'n' as an unsigned number that is 'width' octets wide

    Supported widths are 1, 2 and 4, mirroring dec_num; any other width
    raises ValueError.
    """
    if width == 1:
      self.encode_octet(n)
    elif width == 2:
      self.encode_short(n)
    elif width == 4:
      # encode_long writes 4 octets, and dec_num reads a long for width 4.
      self.encode_long(n)
    else:
      raise ValueError("invalid width: %s" % width)

  def dec_num(self, width):
    """
    decodes an unsigned number that is 'width' octets wide

    Supported widths are 1, 2 and 4; any other width raises ValueError.
    """
    if width == 1:
      return self.decode_octet()
    elif width == 2:
      return self.decode_short()
    elif width == 4:
      return self.decode_long()
    else:
      raise ValueError("invalid width: %s" % width)

  def encode_struct(self, type, s):
    """
    encodes struct 's' of the given struct 'type'

    When type.size is set, the body is encoded into a temporary buffer and
    written with a length prefix that is type.size octets wide; otherwise
    the body is written directly.
    """
    if type.size:
      enc = StringIO()
      codec = Codec(enc, self.spec)
      codec.encode_struct_body(type, s)
      codec.flush()
      body = enc.getvalue()
      self.enc_num(type.size, len(body))
      self.write(body)
    else:
      self.encode_struct_body(type, s)

  def decode_struct(self, type):
    """
    decodes a struct of the given struct 'type'

    When type.size is set, a length prefix is read first and None is
    returned if that length is zero.
    """
    if type.size:
      size = self.dec_num(type.size)
      if size == 0:
        return None
    return self.decode_struct_body(type)

  def encode_struct_body(self, type, s):
    """
    encodes the body of struct 's': one flag bit per field, padded with
    reserved bits up to type.pack octets, followed by the values of the
    non-bit fields that are present

    's' may be None, in which case every flag is encoded as False and no
    values are written.
    """
    reserved = 8*type.pack - len(type.fields)
    assert reserved >= 0

    for f in type.fields:
      if s is None:
        self.encode_bit(False)
      elif f.type == "bit":
        # bit fields carry their value in the flag itself
        self.encode_bit(s.get(f.name))
      else:
        self.encode_bit(s.has(f.name))

    for i in range(reserved):
      self.encode_bit(False)

    for f in type.fields:
      if f.type != "bit" and s is not None and s.has(f.name):
        self.encode(f.type, s.get(f.name))

    self.flush()

  def decode_struct_body(self, type):
    """
    decodes a struct body written by encode_struct_body and returns it as
    a qpid.Struct

    Raises ValueError when any of the reserved flag bits is set.
    """
    reserved = 8*type.pack - len(type.fields)
    assert reserved >= 0

    s = qpid.Struct(type)

    for f in type.fields:
      if f.type == "bit":
        s.set(f.name, self.decode_bit())
      elif self.decode_bit():
        # mark the field as present; its value is filled in below
        s.set(f.name, None)

    for i in range(reserved):
      if self.decode_bit():
        raise ValueError("expecting reserved flag")

    for f in type.fields:
      if f.type != "bit" and s.has(f.name):
        s.set(f.name, self.decode(f.type))

    self.clearbits()

    return s

  def encode_long_struct(self, s):
    """
    encodes struct 's' as a long string holding its 16 bit type code
    followed by its body
    """
    enc = StringIO()
    codec = Codec(enc, self.spec)
    type = s.type
    codec.encode_short(type.type)
    codec.encode_struct_body(type, s)
    self.encode_longstr(enc.getvalue())

  def decode_long_struct(self):
    """
    decodes a struct written by encode_long_struct; the struct type is
    looked up in self.spec.structs by its 16 bit type code
    """
    codec = Codec(StringIO(self.decode_longstr()), self.spec)
    type = self.spec.structs[codec.decode_short()]
    return codec.decode_struct_body(type)

  def decode_array(self):
    """
    decodes an array: a 32 bit size, an element type code, a 32 bit
    element count and then the elements; returns them as a list

    Elements with an unregistered type code are returned as raw strings
    whose length is derived from the code via width()/fixed().
    """
    # The size must be consumed from the stream but is not otherwise used;
    # the element count drives the loop.
    size = self.decode_long()
    code = self.decode_octet()
    count = self.decode_long()
    result = []
    for i in range(0, count):
      if code in self.types:
        value = self.decode(self.types[code])
      else:
        w = width(code)
        if fixed(code):
          value = self.read(w)
        else:
          value = self.read(self.dec_num(w))
      result.append(value)
    return result

  def encode_boolean(self, s):
    """
    encodes the truth value of 's' as a single octet (1 or 0)
    """
    if (s):
      self.pack("!c", "\x01")
    else:
      self.pack("!c", "\x00")

  def decode_boolean(self):
    """
    decodes a single octet as a boolean; only a zero octet is False
    """
    b = self.unpack("!c")
    if b == "\x00":
      return False
    else:
      # AMQP spec says anything else is True
      return True



def fixed(code):
  """
  returns False when the top two bits of type code 'code' are binary 10
  (codes 128-191, the variable width range in width()), True otherwise
  """
  return (code >> 6) != 2

def width(code):
  """
  returns the width in octets associated with type code 'code'

  For variable width codes (128-191) this is the width of the length
  prefix, not of the data.  Raises ValueError for codes that have no
  width defined.
  """
  # decimal
  if code >= 192:
    decsel = (code >> 4) & 3
    if decsel == 0:
      return 5
    elif decsel == 1:
      return 9
    elif decsel == 3:
      return 0
    else:
      raise ValueError(code)
  # variable width
  elif code >= 128:
    lenlen = (code >> 4) & 3
    if lenlen == 3: raise ValueError(code)
    return 2 ** lenlen
  # fixed width
  else:
    return (code >> 4) & 7