diff options
author | Rafael H. Schloming <rhs@apache.org> | 2006-09-19 22:06:50 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2006-09-19 22:06:50 +0000 |
commit | 913489deb2ee9dbf44455de5f407ddaf4bd8c540 (patch) | |
tree | 7ea442d6867d0076f1c9ea4f4265664059e7aff5 /python/qpid/connection.py | |
download | qpid-python-913489deb2ee9dbf44455de5f407ddaf4bd8c540.tar.gz |
Import of qpid from etp:
URL: https://etp.108.redhat.com/svn/etp/trunk/blaze
Repository Root: https://etp.108.redhat.com/svn/etp
Repository UUID: 06e15bec-b515-0410-bef0-cc27a458cf48
Revision: 608
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@447994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/connection.py')
-rw-r--r-- | python/qpid/connection.py | 265 |
1 files changed, 265 insertions, 0 deletions
diff --git a/python/qpid/connection.py b/python/qpid/connection.py new file mode 100644 index 0000000000..f4d0817e60 --- /dev/null +++ b/python/qpid/connection.py @@ -0,0 +1,265 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +A Connection class containing socket code that uses the spec metadata +to read and write Frame objects. This could be used by a client, +server, or even a proxy implementation. +""" + +import socket, codec +from cStringIO import StringIO +from spec import load, pythonize +from codec import EOF + +class SockIO: + + def __init__(self, sock): + self.sock = sock + + def write(self, buf): +# print "OUT: %r" % buf + self.sock.sendall(buf) + + def read(self, n): + data = "" + while len(data) < n: + try: + s = self.sock.recv(n - len(data)) + except socket.error: + break + if len(s) == 0: + break +# print "IN: %r" % s + data += s + return data + + def flush(self): + pass + +class Connection: + + def __init__(self, host, port, spec): + self.host = host + self.port = port + self.spec = spec + self.FRAME_END = self.spec.constants.byname["frame end"].id + + def connect(self): + sock = socket.socket() + sock.connect((self.host, self.port)) + sock.setblocking(1) + self.codec = codec.Codec(SockIO(sock)) + + def flush(self): + self.codec.flush() + + INIT="!4s4B" + + def init(self): + self.codec.pack(Connection.INIT, "AMQP", 1, 1, self.spec.major, + self.spec.minor) + + def write(self, frame): + c = self.codec + c.encode_octet(self.spec.constants.byname[frame.payload.type].id) + c.encode_short(frame.channel) + frame.payload.encode(c) + c.encode_octet(self.FRAME_END) + + def read(self): + c = self.codec + type = self.spec.constants.byid[c.decode_octet()].name + channel = c.decode_short() + payload = Frame.DECODERS[type].decode(self.spec, c) + end = c.decode_octet() + if end != self.FRAME_END: + raise "frame error: expected %r, got %r" % (self.FRAME_END, end) + frame = Frame(channel, payload) + return frame + +class Frame: + + METHOD = "frame method" + HEADER = "frame header" + BODY = "frame body" + OOB_METHOD = "frame oob method" + OOB_HEADER = "frame oob header" + OOB_BODY = "frame oob body" + TRACE = "frame trace" + HEARTBEAT = "frame heartbeat" + + DECODERS = {} + + def __init__(self, channel, payload): + self.channel = channel + self.payload = payload + + def __str__(self): + return "[%d] %s" % (self.channel, self.payload) + +class Payload: + + class __metaclass__(type): + + def __new__(cls, name, bases, dict): + for req in ("encode", "decode", "type"): + if not dict.has_key(req): + raise TypeError("%s must define %s" % (name, req)) + dict["decode"] = staticmethod(dict["decode"]) + t = type.__new__(cls, name, bases, dict) + if t.type != None: + Frame.DECODERS[t.type] = t + return t + + type = None + + def encode(self, enc): abstract + + def decode(spec, dec): abstract + +class Method(Payload): + + type = Frame.METHOD + + def __init__(self, method, *args): + if len(args) != len(method.fields): + argspec = ["%s: %s" % (pythonize(f.name), f.type) + for f in method.fields] + raise TypeError("%s.%s expecting (%s), got %s" % + (pythonize(method.klass.name), + pythonize(method.name), ", ".join(argspec), args)) + self.method = method + self.args = args + + def encode(self, enc): + buf = StringIO() + c = codec.Codec(buf) + c.encode_short(self.method.klass.id) + c.encode_short(self.method.id) + for field, arg in zip(self.method.fields, self.args): + c.encode(field.type, arg) + c.flush() + enc.encode_longstr(buf.getvalue()) + + def decode(spec, dec): + enc = dec.decode_longstr() + c = codec.Codec(StringIO(enc)) + klass = spec.classes.byid[c.decode_short()] + meth = klass.methods.byid[c.decode_short()] + args = tuple([c.decode(f.type) for f in meth.fields]) + return Method(meth, *args) + + def __str__(self): + return "%s %s" % (self.method, ", ".join([str(a) for a in self.args])) + +class Header(Payload): + + type = Frame.HEADER + + def __init__(self, klass, weight, size, **properties): + self.klass = klass + self.weight = weight + self.size = size + self.properties = properties + + def __getitem__(self, name): + return self.properties[name] + + def __setitem__(self, name, value): + self.properties[name] = value + + def __delitem__(self, name): + del self.properties[name] + + def encode(self, enc): + buf = StringIO() + c = codec.Codec(buf) + c.encode_short(self.klass.id) + c.encode_short(self.weight) + c.encode_longlong(self.size) + + # property flags + nprops = len(self.klass.fields) + flags = 0 + for i in range(nprops): + f = self.klass.fields.items[i] + flags <<= 1 + if self.properties.get(f.name) != None: + flags |= 1 + # the last bit indicates more flags + if i > 0 and (i % 15) == 0: + flags <<= 1 + if nprops > (i + 1): + flags |= 1 + c.encode_short(flags) + flags = 0 + flags <<= ((16 - (nprops % 15)) % 16) + c.encode_short(flags) + + # properties + for f in self.klass.fields: + v = self.properties.get(f.name) + if v != None: + c.encode(f.type, v) + c.flush() + enc.encode_longstr(buf.getvalue()) + + def decode(spec, dec): + c = codec.Codec(StringIO(dec.decode_longstr())) + klass = spec.classes.byid[c.decode_short()] + weight = c.decode_short() + size = c.decode_longlong() + + # property flags + bits = [] + while True: + flags = c.decode_short() + for i in range(15, 0, -1): + if flags >> i & 0x1 != 0: + bits.append(True) + else: + bits.append(False) + if flags & 0x1 == 0: + break + + # properties + properties = {} + for b, f in zip(bits, klass.fields): + if b: + properties[f.name] = c.decode(f.type) + + return Header(klass, weight, size, **properties) + + def __str__(self): + return "%s %s %s %s" % (self.klass, self.weight, self.size, + self.properties) + +class Body(Payload): + + type = Frame.BODY + + def __init__(self, content): + self.content = content + + def encode(self, enc): + enc.encode_longstr(self.content) + + def decode(spec, dec): + return Body(dec.decode_longstr()) + + def __str__(self): + return "Body(%r)" % self.content |