summaryrefslogtreecommitdiff
path: root/Final/python/qpid/connection.py
diff options
context:
space:
mode:
Diffstat (limited to 'Final/python/qpid/connection.py')
-rw-r--r--Final/python/qpid/connection.py270
1 files changed, 270 insertions, 0 deletions
diff --git a/Final/python/qpid/connection.py b/Final/python/qpid/connection.py
new file mode 100644
index 0000000000..0b788e091b
--- /dev/null
+++ b/Final/python/qpid/connection.py
@@ -0,0 +1,270 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+A Connection class containing socket code that uses the spec metadata
+to read and write Frame objects. This could be used by a client,
+server, or even a proxy implementation.
+"""
+
+import socket, codec,logging
+from cStringIO import StringIO
+from spec import load, pythonize
+from codec import EOF
+
+class SockIO:
+
+ def __init__(self, sock):
+ self.sock = sock
+
+ def write(self, buf):
+# print "OUT: %r" % buf
+ self.sock.sendall(buf)
+
+ def read(self, n):
+ data = ""
+ while len(data) < n:
+ try:
+ s = self.sock.recv(n - len(data))
+ except socket.error:
+ break
+ if len(s) == 0:
+ break
+# print "IN: %r" % s
+ data += s
+ return data
+
+ def flush(self):
+ pass
+
+class Connection:
+
+ def __init__(self, host, port, spec):
+ self.host = host
+ self.port = port
+ self.spec = spec
+ self.FRAME_END = self.spec.constants.bypyname["frame_end"].id
+
+ def connect(self):
+ sock = socket.socket()
+ sock.connect((self.host, self.port))
+ sock.setblocking(1)
+ self.codec = codec.Codec(SockIO(sock))
+
+ def flush(self):
+ self.codec.flush()
+
+ INIT="!4s4B"
+
+ def init(self):
+ self.codec.pack(Connection.INIT, "AMQP", 1, 1, self.spec.major,
+ self.spec.minor)
+
+ def write(self, frame):
+ c = self.codec
+ c.encode_octet(self.spec.constants.bypyname[frame.payload.type].id)
+ c.encode_short(frame.channel)
+ frame.payload.encode(c)
+ c.encode_octet(self.FRAME_END)
+
+ def read(self):
+ c = self.codec
+ type = pythonize(self.spec.constants.byid[c.decode_octet()].name)
+ channel = c.decode_short()
+ payload = Frame.DECODERS[type].decode(self.spec, c)
+ end = c.decode_octet()
+ if end != self.FRAME_END:
+ raise "frame error: expected %r, got %r" % (self.FRAME_END, end)
+ frame = Frame(channel, payload)
+ return frame
+
+class Frame:
+
+ METHOD = "frame_method"
+ HEADER = "frame_header"
+ BODY = "frame_body"
+ OOB_METHOD = "frame_oob_method"
+ OOB_HEADER = "frame_oob_header"
+ OOB_BODY = "frame_oob_body"
+ TRACE = "frame_trace"
+ HEARTBEAT = "frame_heartbeat"
+
+ DECODERS = {}
+
+ def __init__(self, channel, payload):
+ self.channel = channel
+ self.payload = payload
+
+ def __str__(self):
+ return "[%d] %s" % (self.channel, self.payload)
+
+class Payload:
+
+ class __metaclass__(type):
+
+ def __new__(cls, name, bases, dict):
+ for req in ("encode", "decode", "type"):
+ if not dict.has_key(req):
+ raise TypeError("%s must define %s" % (name, req))
+ dict["decode"] = staticmethod(dict["decode"])
+ t = type.__new__(cls, name, bases, dict)
+ if t.type != None:
+ Frame.DECODERS[t.type] = t
+ return t
+
+ type = None
+
+ def encode(self, enc): abstract
+
+ def decode(spec, dec): abstract
+
+class Method(Payload):
+
+ type = Frame.METHOD
+
+ def __init__(self, method, *args):
+ if len(args) != len(method.fields):
+ argspec = ["%s: %s" % (pythonize(f.name), f.type)
+ for f in method.fields]
+ raise TypeError("%s.%s expecting (%s), got %s" %
+ (pythonize(method.klass.name),
+ pythonize(method.name), ", ".join(argspec), args))
+ self.method = method
+ self.args = args
+
+ def encode(self, enc):
+ buf = StringIO()
+ c = codec.Codec(buf)
+ c.encode_short(self.method.klass.id)
+ c.encode_short(self.method.id)
+ for field, arg in zip(self.method.fields, self.args):
+ c.encode(field.type, arg)
+ c.flush()
+ enc.encode_longstr(buf.getvalue())
+
+ def decode(spec, dec):
+ enc = dec.decode_longstr()
+ c = codec.Codec(StringIO(enc))
+ klass = spec.classes.byid[c.decode_short()]
+ meth = klass.methods.byid[c.decode_short()]
+ args = tuple([c.decode(f.type) for f in meth.fields])
+ return Method(meth, *args)
+
+ def __str__(self):
+ return "%s %s" % (self.method, ", ".join([str(a) for a in self.args]))
+
+class Header(Payload):
+
+ type = Frame.HEADER
+
+ def __init__(self, klass, weight, size, **properties):
+ self.klass = klass
+ self.weight = weight
+ self.size = size
+ self.properties = properties
+
+ def __getitem__(self, name):
+ return self.properties[name]
+
+ def __setitem__(self, name, value):
+ self.properties[name] = value
+
+ def __delitem__(self, name):
+ del self.properties[name]
+
+ def encode(self, enc):
+ buf = StringIO()
+ c = codec.Codec(buf)
+ c.encode_short(self.klass.id)
+ c.encode_short(self.weight)
+ c.encode_longlong(self.size)
+
+ # property flags
+ nprops = len(self.klass.fields)
+ flags = 0
+ for i in range(nprops):
+ f = self.klass.fields.items[i]
+ flags <<= 1
+ if self.properties.get(f.name) != None:
+ flags |= 1
+ # the last bit indicates more flags
+ if i > 0 and (i % 15) == 0:
+ flags <<= 1
+ if nprops > (i + 1):
+ flags |= 1
+ c.encode_short(flags)
+ flags = 0
+ flags <<= ((16 - (nprops % 15)) % 16)
+ c.encode_short(flags)
+
+ # properties
+ for f in self.klass.fields:
+ v = self.properties.get(f.name)
+ if v != None:
+ c.encode(f.type, v)
+ c.flush()
+ enc.encode_longstr(buf.getvalue())
+
+ def decode(spec, dec):
+ c = codec.Codec(StringIO(dec.decode_longstr()))
+ klass = spec.classes.byid[c.decode_short()]
+ weight = c.decode_short()
+ size = c.decode_longlong()
+
+ # property flags
+ bits = []
+ while True:
+ flags = c.decode_short()
+ for i in range(15, 0, -1):
+ if flags >> i & 0x1 != 0:
+ bits.append(True)
+ else:
+ bits.append(False)
+ if flags & 0x1 == 0:
+ break
+
+ # properties
+ properties = {}
+ for b, f in zip(bits, klass.fields):
+ if b:
+ # Note: decode returns a unicode u'' string but only
+ # plain '' strings can be used as keywords so we need to
+ # stringify the names.
+ properties[str(f.name)] = c.decode(f.type)
+ return Header(klass, weight, size, **properties)
+
+ def __str__(self):
+ return "%s %s %s %s" % (self.klass, self.weight, self.size,
+ self.properties)
+
+class Body(Payload):
+
+ type = Frame.BODY
+
+ def __init__(self, content):
+ self.content = content
+
+ def encode(self, enc):
+ enc.encode_longstr(self.content)
+
+ def decode(spec, dec):
+ return Body(dec.decode_longstr())
+
+ def __str__(self):
+ return "Body(%r)" % self.content