summaryrefslogtreecommitdiff
path: root/python/qpid/connection.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2006-09-19 22:06:50 +0000
committerRafael H. Schloming <rhs@apache.org>2006-09-19 22:06:50 +0000
commit913489deb2ee9dbf44455de5f407ddaf4bd8c540 (patch)
tree7ea442d6867d0076f1c9ea4f4265664059e7aff5 /python/qpid/connection.py
downloadqpid-python-913489deb2ee9dbf44455de5f407ddaf4bd8c540.tar.gz
Import of qpid from etp:
URL: https://etp.108.redhat.com/svn/etp/trunk/blaze Repository Root: https://etp.108.redhat.com/svn/etp Repository UUID: 06e15bec-b515-0410-bef0-cc27a458cf48 Revision: 608 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@447994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/connection.py')
-rw-r--r--python/qpid/connection.py265
1 files changed, 265 insertions, 0 deletions
diff --git a/python/qpid/connection.py b/python/qpid/connection.py
new file mode 100644
index 0000000000..f4d0817e60
--- /dev/null
+++ b/python/qpid/connection.py
@@ -0,0 +1,265 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A Connection class containing socket code that uses the spec metadata
+to read and write Frame objects. This could be used by a client,
+server, or even a proxy implementation.
+"""
+
+import socket, codec
+from cStringIO import StringIO
+from spec import load, pythonize
+from codec import EOF
+
+class SockIO:
+
+ def __init__(self, sock):
+ self.sock = sock
+
+ def write(self, buf):
+# print "OUT: %r" % buf
+ self.sock.sendall(buf)
+
+ def read(self, n):
+ data = ""
+ while len(data) < n:
+ try:
+ s = self.sock.recv(n - len(data))
+ except socket.error:
+ break
+ if len(s) == 0:
+ break
+# print "IN: %r" % s
+ data += s
+ return data
+
+ def flush(self):
+ pass
+
+class Connection:
+
+ def __init__(self, host, port, spec):
+ self.host = host
+ self.port = port
+ self.spec = spec
+ self.FRAME_END = self.spec.constants.byname["frame end"].id
+
+ def connect(self):
+ sock = socket.socket()
+ sock.connect((self.host, self.port))
+ sock.setblocking(1)
+ self.codec = codec.Codec(SockIO(sock))
+
+ def flush(self):
+ self.codec.flush()
+
+ INIT="!4s4B"
+
+ def init(self):
+ self.codec.pack(Connection.INIT, "AMQP", 1, 1, self.spec.major,
+ self.spec.minor)
+
+ def write(self, frame):
+ c = self.codec
+ c.encode_octet(self.spec.constants.byname[frame.payload.type].id)
+ c.encode_short(frame.channel)
+ frame.payload.encode(c)
+ c.encode_octet(self.FRAME_END)
+
+ def read(self):
+ c = self.codec
+ type = self.spec.constants.byid[c.decode_octet()].name
+ channel = c.decode_short()
+ payload = Frame.DECODERS[type].decode(self.spec, c)
+ end = c.decode_octet()
+ if end != self.FRAME_END:
+ raise "frame error: expected %r, got %r" % (self.FRAME_END, end)
+ frame = Frame(channel, payload)
+ return frame
+
+class Frame:
+
+ METHOD = "frame method"
+ HEADER = "frame header"
+ BODY = "frame body"
+ OOB_METHOD = "frame oob method"
+ OOB_HEADER = "frame oob header"
+ OOB_BODY = "frame oob body"
+ TRACE = "frame trace"
+ HEARTBEAT = "frame heartbeat"
+
+ DECODERS = {}
+
+ def __init__(self, channel, payload):
+ self.channel = channel
+ self.payload = payload
+
+ def __str__(self):
+ return "[%d] %s" % (self.channel, self.payload)
+
+class Payload:
+
+ class __metaclass__(type):
+
+ def __new__(cls, name, bases, dict):
+ for req in ("encode", "decode", "type"):
+ if not dict.has_key(req):
+ raise TypeError("%s must define %s" % (name, req))
+ dict["decode"] = staticmethod(dict["decode"])
+ t = type.__new__(cls, name, bases, dict)
+ if t.type != None:
+ Frame.DECODERS[t.type] = t
+ return t
+
+ type = None
+
+ def encode(self, enc): abstract
+
+ def decode(spec, dec): abstract
+
+class Method(Payload):
+
+ type = Frame.METHOD
+
+ def __init__(self, method, *args):
+ if len(args) != len(method.fields):
+ argspec = ["%s: %s" % (pythonize(f.name), f.type)
+ for f in method.fields]
+ raise TypeError("%s.%s expecting (%s), got %s" %
+ (pythonize(method.klass.name),
+ pythonize(method.name), ", ".join(argspec), args))
+ self.method = method
+ self.args = args
+
+ def encode(self, enc):
+ buf = StringIO()
+ c = codec.Codec(buf)
+ c.encode_short(self.method.klass.id)
+ c.encode_short(self.method.id)
+ for field, arg in zip(self.method.fields, self.args):
+ c.encode(field.type, arg)
+ c.flush()
+ enc.encode_longstr(buf.getvalue())
+
+ def decode(spec, dec):
+ enc = dec.decode_longstr()
+ c = codec.Codec(StringIO(enc))
+ klass = spec.classes.byid[c.decode_short()]
+ meth = klass.methods.byid[c.decode_short()]
+ args = tuple([c.decode(f.type) for f in meth.fields])
+ return Method(meth, *args)
+
+ def __str__(self):
+ return "%s %s" % (self.method, ", ".join([str(a) for a in self.args]))
+
+class Header(Payload):
+
+ type = Frame.HEADER
+
+ def __init__(self, klass, weight, size, **properties):
+ self.klass = klass
+ self.weight = weight
+ self.size = size
+ self.properties = properties
+
+ def __getitem__(self, name):
+ return self.properties[name]
+
+ def __setitem__(self, name, value):
+ self.properties[name] = value
+
+ def __delitem__(self, name):
+ del self.properties[name]
+
+ def encode(self, enc):
+ buf = StringIO()
+ c = codec.Codec(buf)
+ c.encode_short(self.klass.id)
+ c.encode_short(self.weight)
+ c.encode_longlong(self.size)
+
+ # property flags
+ nprops = len(self.klass.fields)
+ flags = 0
+ for i in range(nprops):
+ f = self.klass.fields.items[i]
+ flags <<= 1
+ if self.properties.get(f.name) != None:
+ flags |= 1
+ # the last bit indicates more flags
+ if i > 0 and (i % 15) == 0:
+ flags <<= 1
+ if nprops > (i + 1):
+ flags |= 1
+ c.encode_short(flags)
+ flags = 0
+ flags <<= ((16 - (nprops % 15)) % 16)
+ c.encode_short(flags)
+
+ # properties
+ for f in self.klass.fields:
+ v = self.properties.get(f.name)
+ if v != None:
+ c.encode(f.type, v)
+ c.flush()
+ enc.encode_longstr(buf.getvalue())
+
+ def decode(spec, dec):
+ c = codec.Codec(StringIO(dec.decode_longstr()))
+ klass = spec.classes.byid[c.decode_short()]
+ weight = c.decode_short()
+ size = c.decode_longlong()
+
+ # property flags
+ bits = []
+ while True:
+ flags = c.decode_short()
+ for i in range(15, 0, -1):
+ if flags >> i & 0x1 != 0:
+ bits.append(True)
+ else:
+ bits.append(False)
+ if flags & 0x1 == 0:
+ break
+
+ # properties
+ properties = {}
+ for b, f in zip(bits, klass.fields):
+ if b:
+ properties[f.name] = c.decode(f.type)
+
+ return Header(klass, weight, size, **properties)
+
+ def __str__(self):
+ return "%s %s %s %s" % (self.klass, self.weight, self.size,
+ self.properties)
+
+class Body(Payload):
+
+ type = Frame.BODY
+
+ def __init__(self, content):
+ self.content = content
+
+ def encode(self, enc):
+ enc.encode_longstr(self.content)
+
+ def decode(spec, dec):
+ return Body(dec.decode_longstr())
+
+ def __str__(self):
+ return "Body(%r)" % self.content