summaryrefslogtreecommitdiff
path: root/qpid/python/qpid/codec.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python/qpid/codec.py')
-rw-r--r--qpid/python/qpid/codec.py701
1 files changed, 701 insertions, 0 deletions
diff --git a/qpid/python/qpid/codec.py b/qpid/python/qpid/codec.py
new file mode 100644
index 0000000000..a4c542415c
--- /dev/null
+++ b/qpid/python/qpid/codec.py
@@ -0,0 +1,701 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Utility code to translate between python objects and AMQP encoded data
+fields.
+
+The unit test for this module is located in tests/codec.py
+"""
+
+import re, qpid, spec08, os
+from cStringIO import StringIO
+from struct import *
+from reference import ReferenceId
+from logging import getLogger
+
+log = getLogger("qpid.codec")
+
+class EOF(Exception):
+ pass
+
+# This code appears to be dead
+TYPE_ALIASES = {
+ "long_string": "longstr",
+ "unsigned_int": "long"
+ }
+
+class Codec:
+
+ """
+ class that handles encoding/decoding of AMQP primitives
+ """
+
+ def __init__(self, stream, spec):
+ """
+ initializing the stream/fields used
+ """
+ self.stream = stream
+ self.spec = spec
+ self.nwrote = 0
+ self.nread = 0
+ self.incoming_bits = []
+ self.outgoing_bits = []
+
+ # Before 0-91, the AMQP's set of types did not include the boolean type. However,
+ # the 0-8 and 0-9 Java client uses this type so we encode/decode it too. However, this
+ # can be turned off by setting the followng environment value.
+ if "QPID_CODEC_DISABLE_0_91_BOOLEAN" in os.environ:
+ self.understand_boolean = False
+ else:
+ self.understand_boolean = True
+
+ log.debug("AMQP 0-91 boolean supported : %r", self.understand_boolean)
+
+ self.types = {}
+ self.codes = {}
+ self.integertypes = [int, long]
+ self.encodings = {
+ float: "double", # python uses 64bit floats, send them as doubles
+ basestring: "longstr",
+ None.__class__:"void",
+ list: "sequence",
+ tuple: "sequence",
+ dict: "table"
+ }
+
+ if self.understand_boolean:
+ self.encodings[bool] = "boolean"
+
+ for constant in self.spec.constants:
+ # This code appears to be dead
+ if constant.klass == "field-table-type":
+ type = constant.name.replace("field_table_", "")
+ self.typecode(constant.id, TYPE_ALIASES.get(type, type))
+
+ if not self.types:
+ # long-string 'S'
+ self.typecode(ord('S'), "longstr")
+ # void 'V'
+ self.typecode(ord('V'), "void")
+ # long-int 'I' (32bit signed)
+ self.typecode(ord('I'), "signed_int")
+ # long-long-int 'l' (64bit signed)
+ # This is a long standing pre-0-91-spec type used by the Java
+ # client, 0-9-1 says it should be unsigned or use 'L')
+ self.typecode(ord('l'), "signed_long")
+ # double 'd'
+ self.typecode(ord('d'), "double")
+ # float 'f'
+ self.typecode(ord('f'), "float")
+
+ if self.understand_boolean:
+ self.typecode(ord('t'), "boolean")
+
+ ## The following are supported for decoding only ##
+
+ # short-short-uint 'b' (8bit signed)
+ self.types[ord('b')] = "signed_octet"
+ # short-int 's' (16bit signed)
+ # This is a long standing pre-0-91-spec type code used by the Java
+ # client to send shorts, it should really be a short-string, or for 0-9-1 use 'U'
+ self.types[ord('s')] = "signed_short"
+
+ def typecode(self, code, type):
+ self.types[code] = type
+ self.codes[type] = code
+
+ def resolve(self, klass, value):
+ if(klass in self.integertypes):
+ if (value >= -2147483648 and value <= 2147483647):
+ return "signed_int"
+ elif (value >= -9223372036854775808 and value <= 9223372036854775807):
+ return "signed_long"
+ else:
+ raise ValueError('Integer value is outwith the supported 64bit signed range')
+ if self.encodings.has_key(klass):
+ return self.encodings[klass]
+ for base in klass.__bases__:
+ result = self.resolve(base, value)
+ if result != None:
+ return result
+
+ def read(self, n):
+ """
+ reads in 'n' bytes from the stream. Can raise EOF exception
+ """
+ self.clearbits()
+ data = self.stream.read(n)
+ if n > 0 and len(data) == 0:
+ raise EOF()
+ self.nread += len(data)
+ return data
+
+ def write(self, s):
+ """
+ writes data 's' to the stream
+ """
+ self.flushbits()
+ self.stream.write(s)
+ self.nwrote += len(s)
+
+ def flush(self):
+ """
+ flushes the bits and data present in the stream
+ """
+ self.flushbits()
+ self.stream.flush()
+
+ def flushbits(self):
+ """
+ flushes the bits(compressed into octets) onto the stream
+ """
+ if len(self.outgoing_bits) > 0:
+ bytes = []
+ index = 0
+ for b in self.outgoing_bits:
+ if index == 0: bytes.append(0)
+ if b: bytes[-1] |= 1 << index
+ index = (index + 1) % 8
+ del self.outgoing_bits[:]
+ for byte in bytes:
+ self.encode_octet(byte)
+
+ def clearbits(self):
+ if self.incoming_bits:
+ self.incoming_bits = []
+
+ def pack(self, fmt, *args):
+ """
+ packs the data 'args' as per the format 'fmt' and writes it to the stream
+ """
+ self.write(pack(fmt, *args))
+
+ def unpack(self, fmt):
+ """
+ reads data from the stream and unpacks it as per the format 'fmt'
+ """
+ size = calcsize(fmt)
+ data = self.read(size)
+ values = unpack(fmt, data)
+ if len(values) == 1:
+ return values[0]
+ else:
+ return values
+
+ def encode(self, type, value):
+ """
+ calls the appropriate encode function e.g. encode_octet, encode_short etc.
+ """
+ if isinstance(type, spec08.Struct):
+ self.encode_struct(type, value)
+ else:
+ getattr(self, "encode_" + type)(value)
+
+ def decode(self, type):
+ """
+ calls the appropriate decode function e.g. decode_octet, decode_short etc.
+ """
+ if isinstance(type, spec08.Struct):
+ return self.decode_struct(type)
+ else:
+ log.debug("Decoding using method: decode_" + type)
+ return getattr(self, "decode_" + type)()
+
+ def encode_bit(self, o):
+ """
+ encodes a bit
+ """
+ if o:
+ self.outgoing_bits.append(True)
+ else:
+ self.outgoing_bits.append(False)
+
+ def decode_bit(self):
+ """
+ decodes a bit
+ """
+ if len(self.incoming_bits) == 0:
+ bits = self.decode_octet()
+ for i in range(8):
+ self.incoming_bits.append(bits >> i & 1 != 0)
+ return self.incoming_bits.pop(0)
+
+ def encode_octet(self, o):
+ """
+ encodes an UNSIGNED octet (8 bits) data 'o' in network byte order
+ """
+
+ # octet's valid range is [0,255]
+ if (o < 0 or o > 255):
+ raise ValueError('Valid range of octet is [0,255]')
+
+ self.pack("!B", int(o))
+
+ def decode_octet(self):
+ """
+ decodes an UNSIGNED octet (8 bits) encoded in network byte order
+ """
+ return self.unpack("!B")
+
+ def decode_signed_octet(self):
+ """
+ decodes a signed octet (8 bits) encoded in network byte order
+ """
+ return self.unpack("!b")
+
+ def encode_short(self, o):
+ """
+ encodes an UNSIGNED short (16 bits) data 'o' in network byte order
+ AMQP 0-9-1 type: short-uint
+ """
+
+ # short int's valid range is [0,65535]
+ if (o < 0 or o > 65535):
+ raise ValueError('Valid range of short int is [0,65535]: %s' % o)
+
+ self.pack("!H", int(o))
+
+ def decode_short(self):
+ """
+ decodes an UNSIGNED short (16 bits) in network byte order
+ AMQP 0-9-1 type: short-uint
+ """
+ return self.unpack("!H")
+
+ def decode_signed_short(self):
+ """
+ decodes a signed short (16 bits) in network byte order
+ AMQP 0-9-1 type: short-int
+ """
+ return self.unpack("!h")
+
+ def encode_long(self, o):
+ """
+ encodes an UNSIGNED long (32 bits) data 'o' in network byte order
+ AMQP 0-9-1 type: long-uint
+ """
+
+ # we need to check both bounds because on 64 bit platforms
+ # struct.pack won't raise an error if o is too large
+ if (o < 0 or o > 4294967295):
+ raise ValueError('Valid range of long int is [0,4294967295]')
+
+ self.pack("!L", int(o))
+
+ def decode_long(self):
+ """
+ decodes an UNSIGNED long (32 bits) in network byte order
+ AMQP 0-9-1 type: long-uint
+ """
+ return self.unpack("!L")
+
+ def encode_signed_long(self, o):
+ """
+ encodes a signed long (64 bits) in network byte order
+ AMQP 0-9-1 type: long-long-int
+ """
+ self.pack("!q", o)
+
+ def decode_signed_long(self):
+ """
+ decodes a signed long (64 bits) in network byte order
+ AMQP 0-9-1 type: long-long-int
+ """
+ return self.unpack("!q")
+
+ def encode_signed_int(self, o):
+ """
+ encodes a signed int (32 bits) in network byte order
+ AMQP 0-9-1 type: long-int
+ """
+ self.pack("!l", o)
+
+ def decode_signed_int(self):
+ """
+ decodes a signed int (32 bits) in network byte order
+ AMQP 0-9-1 type: long-int
+ """
+ return self.unpack("!l")
+
+ def encode_longlong(self, o):
+ """
+ encodes an UNSIGNED long long (64 bits) data 'o' in network byte order
+ AMQP 0-9-1 type: long-long-uint
+ """
+ self.pack("!Q", o)
+
+ def decode_longlong(self):
+ """
+ decodes an UNSIGNED long long (64 bits) in network byte order
+ AMQP 0-9-1 type: long-long-uint
+ """
+ return self.unpack("!Q")
+
+ def encode_float(self, o):
+ self.pack("!f", o)
+
+ def decode_float(self):
+ return self.unpack("!f")
+
+ def encode_double(self, o):
+ self.pack("!d", o)
+
+ def decode_double(self):
+ return self.unpack("!d")
+
+ def encode_bin128(self, b):
+ for idx in range (0,16):
+ self.pack("!B", ord (b[idx]))
+
+ def decode_bin128(self):
+ result = ""
+ for idx in range (0,16):
+ result = result + chr (self.unpack("!B"))
+ return result
+
+ def encode_raw(self, len, b):
+ for idx in range (0,len):
+ self.pack("!B", b[idx])
+
+ def decode_raw(self, len):
+ result = ""
+ for idx in range (0,len):
+ result = result + chr (self.unpack("!B"))
+ return result
+
+ def enc_str(self, fmt, s):
+ """
+ encodes a string 's' in network byte order as per format 'fmt'
+ """
+ size = len(s)
+ self.pack(fmt, size)
+ self.write(s)
+
+ def dec_str(self, fmt):
+ """
+ decodes a string in network byte order as per format 'fmt'
+ """
+ size = self.unpack(fmt)
+ return self.read(size)
+
+ def encode_shortstr(self, s):
+ """
+ encodes a short string 's' in network byte order
+ """
+
+ # short strings are limited to 255 octets
+ if len(s) > 255:
+ raise ValueError('Short strings are limited to 255 octets')
+
+ self.enc_str("!B", s)
+
+ def decode_shortstr(self):
+ """
+ decodes a short string in network byte order
+ """
+ return self.dec_str("!B")
+
+ def encode_longstr(self, s):
+ """
+ encodes a long string 's' in network byte order
+ """
+ if isinstance(s, dict):
+ self.encode_table(s)
+ else:
+ self.enc_str("!L", s)
+
+ def decode_longstr(self):
+ """
+ decodes a long string 's' in network byte order
+ """
+ return self.dec_str("!L")
+
+ def encode_table(self, tbl):
+ """
+ encodes a table data structure in network byte order
+ """
+ enc = StringIO()
+ codec = Codec(enc, self.spec)
+ if tbl:
+ for key, value in tbl.items():
+ if self.spec.major == 8 and self.spec.minor == 0 and len(key) > 128:
+ raise ValueError("field table key too long: '%s'" % key)
+ type = self.resolve(value.__class__, value)
+ if type == None:
+ raise ValueError("no encoding for: " + str(value.__class__))
+ codec.encode_shortstr(key)
+ codec.encode_octet(self.codes[type])
+ codec.encode(type, value)
+ s = enc.getvalue()
+ self.encode_long(len(s))
+ self.write(s)
+
+ def decode_table(self):
+ """
+ decodes a table data structure in network byte order
+ """
+ size = self.decode_long()
+ start = self.nread
+ result = {}
+ while self.nread - start < size:
+ key = self.decode_shortstr()
+ log.debug("Field table entry key: %r", key)
+ code = self.decode_octet()
+ log.debug("Field table entry type code: %r", code)
+ if self.types.has_key(code):
+ value = self.decode(self.types[code])
+ else:
+ w = width(code)
+ if fixed(code):
+ value = self.read(w)
+ else:
+ value = self.read(self.dec_num(w))
+ result[key] = value
+ log.debug("Field table entry value: %r", value)
+ return result
+
+ def encode_timestamp(self, t):
+ """
+ encodes a timestamp data structure in network byte order
+ """
+ self.encode_longlong(t)
+
+ def decode_timestamp(self):
+ """
+ decodes a timestamp data structure in network byte order
+ """
+ return self.decode_longlong()
+
+ def encode_content(self, s):
+ """
+ encodes a content data structure in network byte order
+
+ content can be passed as a string in which case it is assumed to
+ be inline data, or as an instance of ReferenceId indicating it is
+ a reference id
+ """
+ if isinstance(s, ReferenceId):
+ self.encode_octet(1)
+ self.encode_longstr(s.id)
+ else:
+ self.encode_octet(0)
+ self.encode_longstr(s)
+
+ def decode_content(self):
+ """
+ decodes a content data structure in network byte order
+
+ return a string for inline data and a ReferenceId instance for
+ references
+ """
+ type = self.decode_octet()
+ if type == 0:
+ return self.decode_longstr()
+ else:
+ return ReferenceId(self.decode_longstr())
+
+ # new domains for 0-10:
+
+ def encode_rfc1982_long(self, s):
+ self.encode_long(s)
+
+ def decode_rfc1982_long(self):
+ return self.decode_long()
+
+ def encode_rfc1982_long_set(self, s):
+ self.encode_short(len(s) * 4)
+ for i in s:
+ self.encode_long(i)
+
+ def decode_rfc1982_long_set(self):
+ count = self.decode_short() / 4
+ set = []
+ for i in range(0, count):
+ set.append(self.decode_long())
+ return set;
+
+ def encode_uuid(self, s):
+ self.pack("16s", s)
+
+ def decode_uuid(self):
+ return self.unpack("16s")
+
+ def encode_void(self,o):
+ #NO-OP, value is implicit in the type.
+ return
+
+ def decode_void(self):
+ return None
+
+ def enc_num(self, width, n):
+ if width == 1:
+ self.encode_octet(n)
+ elif width == 2:
+ self.encode_short(n)
+ elif width == 3:
+ self.encode_long(n)
+ else:
+ raise ValueError("invalid width: %s" % width)
+
+ def dec_num(self, width):
+ if width == 1:
+ return self.decode_octet()
+ elif width == 2:
+ return self.decode_short()
+ elif width == 4:
+ return self.decode_long()
+ else:
+ raise ValueError("invalid width: %s" % width)
+
+ def encode_struct(self, type, s):
+ if type.size:
+ enc = StringIO()
+ codec = Codec(enc, self.spec)
+ codec.encode_struct_body(type, s)
+ codec.flush()
+ body = enc.getvalue()
+ self.enc_num(type.size, len(body))
+ self.write(body)
+ else:
+ self.encode_struct_body(type, s)
+
+ def decode_struct(self, type):
+ if type.size:
+ size = self.dec_num(type.size)
+ if size == 0:
+ return None
+ return self.decode_struct_body(type)
+
+ def encode_struct_body(self, type, s):
+ reserved = 8*type.pack - len(type.fields)
+ assert reserved >= 0
+
+ for f in type.fields:
+ if s == None:
+ self.encode_bit(False)
+ elif f.type == "bit":
+ self.encode_bit(s.get(f.name))
+ else:
+ self.encode_bit(s.has(f.name))
+
+ for i in range(reserved):
+ self.encode_bit(False)
+
+ for f in type.fields:
+ if f.type != "bit" and s != None and s.has(f.name):
+ self.encode(f.type, s.get(f.name))
+
+ self.flush()
+
+ def decode_struct_body(self, type):
+ reserved = 8*type.pack - len(type.fields)
+ assert reserved >= 0
+
+ s = qpid.Struct(type)
+
+ for f in type.fields:
+ if f.type == "bit":
+ s.set(f.name, self.decode_bit())
+ elif self.decode_bit():
+ s.set(f.name, None)
+
+ for i in range(reserved):
+ if self.decode_bit():
+ raise ValueError("expecting reserved flag")
+
+ for f in type.fields:
+ if f.type != "bit" and s.has(f.name):
+ s.set(f.name, self.decode(f.type))
+
+ self.clearbits()
+
+ return s
+
+ def encode_long_struct(self, s):
+ enc = StringIO()
+ codec = Codec(enc, self.spec)
+ type = s.type
+ codec.encode_short(type.type)
+ codec.encode_struct_body(type, s)
+ self.encode_longstr(enc.getvalue())
+
+ def decode_long_struct(self):
+ codec = Codec(StringIO(self.decode_longstr()), self.spec)
+ type = self.spec.structs[codec.decode_short()]
+ return codec.decode_struct_body(type)
+
+ def decode_array(self):
+ size = self.decode_long()
+ code = self.decode_octet()
+ count = self.decode_long()
+ result = []
+ for i in range(0, count):
+ if self.types.has_key(code):
+ value = self.decode(self.types[code])
+ else:
+ w = width(code)
+ if fixed(code):
+ value = self.read(w)
+ else:
+ value = self.read(self.dec_num(w))
+ result.append(value)
+ return result
+
+ def encode_boolean(self, s):
+ if (s):
+ self.pack("!c", "\x01")
+ else:
+ self.pack("!c", "\x00")
+
+ def decode_boolean(self):
+ b = self.unpack("!c")
+ if b == "\x00":
+ return False
+ else:
+ # AMQP spec says anything else is True
+ return True
+
+
+
+def fixed(code):
+ return (code >> 6) != 2
+
+def width(code):
+ # decimal
+ if code >= 192:
+ decsel = (code >> 4) & 3
+ if decsel == 0:
+ return 5
+ elif decsel == 1:
+ return 9
+ elif decsel == 3:
+ return 0
+ else:
+ raise ValueError(code)
+ # variable width
+ elif code < 192 and code >= 128:
+ lenlen = (code >> 4) & 3
+ if lenlen == 3: raise ValueError(code)
+ return 2 ** lenlen
+ # fixed width
+ else:
+ return (code >> 4) & 7