summaryrefslogtreecommitdiff
path: root/python/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid')
-rw-r--r--python/qpid/assembler.py118
-rw-r--r--python/qpid/client.py7
-rw-r--r--python/qpid/codec010.py227
-rw-r--r--python/qpid/connection.py65
-rw-r--r--python/qpid/connection08.py15
-rw-r--r--python/qpid/datatypes.py6
-rw-r--r--python/qpid/delegates.py33
-rw-r--r--python/qpid/exceptions.py1
-rw-r--r--python/qpid/framer.py63
-rw-r--r--python/qpid/framing.py172
-rw-r--r--python/qpid/generator.py48
-rw-r--r--python/qpid/harness.py20
-rw-r--r--python/qpid/messaging.py29
-rw-r--r--python/qpid/ops.py277
-rw-r--r--python/qpid/peer.py4
-rw-r--r--python/qpid/session.py199
-rw-r--r--python/qpid/spec.py4
-rw-r--r--python/qpid/spec010.py708
-rw-r--r--python/qpid/testlib.py326
-rw-r--r--python/qpid/tests/framing.py116
-rw-r--r--python/qpid/tests/messaging.py8
21 files changed, 997 insertions, 1449 deletions
diff --git a/python/qpid/assembler.py b/python/qpid/assembler.py
deleted file mode 100644
index 92bb0aa0f8..0000000000
--- a/python/qpid/assembler.py
+++ /dev/null
@@ -1,118 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from codec010 import StringCodec
-from framer import *
-from logging import getLogger
-
-log = getLogger("qpid.io.seg")
-
-class Segment:
-
- def __init__(self, first, last, type, track, channel, payload):
- self.id = None
- self.offset = None
- self.first = first
- self.last = last
- self.type = type
- self.track = track
- self.channel = channel
- self.payload = payload
-
- def decode(self, spec):
- segs = spec["segment_type"]
- choice = segs.choices[self.type]
- return getattr(self, "decode_%s" % choice.name)(spec)
-
- def decode_control(self, spec):
- sc = StringCodec(spec, self.payload)
- return sc.read_control()
-
- def decode_command(self, spec):
- sc = StringCodec(spec, self.payload)
- hdr, cmd = sc.read_command()
- cmd.id = self.id
- return hdr, cmd
-
- def decode_header(self, spec):
- sc = StringCodec(spec, self.payload)
- values = []
- while len(sc.encoded) > 0:
- values.append(sc.read_struct32())
- return values
-
- def decode_body(self, spec):
- return self.payload
-
- def __str__(self):
- return "%s%s %s %s %s %r" % (int(self.first), int(self.last), self.type,
- self.track, self.channel, self.payload)
-
- def __repr__(self):
- return str(self)
-
-class Assembler(Framer):
-
- def __init__(self, sock, max_payload = Frame.MAX_PAYLOAD):
- Framer.__init__(self, sock)
- self.max_payload = max_payload
- self.fragments = {}
-
- def read_segment(self):
- while True:
- frame = self.read_frame()
-
- key = (frame.channel, frame.track)
- seg = self.fragments.get(key)
- if seg == None:
- seg = Segment(frame.isFirstSegment(), frame.isLastSegment(),
- frame.type, frame.track, frame.channel, "")
- self.fragments[key] = seg
-
- seg.payload += frame.payload
-
- if frame.isLastFrame():
- self.fragments.pop(key)
- log.debug("RECV %s", seg)
- return seg
-
- def write_segment(self, segment):
- remaining = segment.payload
-
- first = True
- while first or remaining:
- payload = remaining[:self.max_payload]
- remaining = remaining[self.max_payload:]
-
- flags = 0
- if first:
- flags |= FIRST_FRM
- first = False
- if not remaining:
- flags |= LAST_FRM
- if segment.first:
- flags |= FIRST_SEG
- if segment.last:
- flags |= LAST_SEG
-
- frame = Frame(flags, segment.type, segment.track, segment.channel,
- payload)
- self.write_frame(frame)
-
- log.debug("SENT %s", segment)
diff --git a/python/qpid/client.py b/python/qpid/client.py
index 4605710de8..6107a4bc35 100644
--- a/python/qpid/client.py
+++ b/python/qpid/client.py
@@ -39,11 +39,8 @@ class Client:
if spec:
self.spec = spec
else:
- try:
- name = os.environ["AMQP_SPEC"]
- except KeyError:
- raise EnvironmentError("environment variable AMQP_SPEC must be set")
- self.spec = load(name)
+ from qpid_config import amqp_spec_0_9
+ self.spec = load(amqp_spec_0_9)
self.structs = StructFactory(self.spec)
self.sessions = {}
diff --git a/python/qpid/codec010.py b/python/qpid/codec010.py
index f07362c38d..682743df19 100644
--- a/python/qpid/codec010.py
+++ b/python/qpid/codec010.py
@@ -20,23 +20,66 @@
import datetime
from packer import Packer
from datatypes import serial, timestamp, RangedSet, Struct, UUID
+from ops import Compound, PRIMITIVE, COMPOUND
class CodecException(Exception): pass
+def direct(t):
+ return lambda x: t
+
+def map_str(s):
+ for c in s:
+ if ord(c) >= 0x80:
+ return "vbin16"
+ return "str16"
+
class Codec(Packer):
- def __init__(self, spec):
- self.spec = spec
+ ENCODINGS = {
+ unicode: direct("str16"),
+ str: map_str,
+ buffer: direct("vbin32"),
+ int: direct("int64"),
+ long: direct("int64"),
+ float: direct("double"),
+ None.__class__: direct("void"),
+ list: direct("list"),
+ tuple: direct("list"),
+ dict: direct("map"),
+ timestamp: direct("datetime"),
+ datetime.datetime: direct("datetime"),
+ UUID: direct("uuid"),
+ Compound: direct("struct32")
+ }
+
+ def encoding(self, obj):
+ enc = self._encoding(obj.__class__, obj)
+ if enc is None:
+ raise CodecException("no encoding for %r" % obj)
+ return PRIMITIVE[enc]
+
+ def _encoding(self, klass, obj):
+ if self.ENCODINGS.has_key(klass):
+ return self.ENCODINGS[klass](obj)
+ for base in klass.__bases__:
+ result = self._encoding(base, obj)
+ if result != None:
+ return result
+
+ def read_primitive(self, type):
+ return getattr(self, "read_%s" % type.NAME)()
+ def write_primitive(self, type, v):
+ getattr(self, "write_%s" % type.NAME)(v)
- def write_void(self, v):
- assert v == None
def read_void(self):
return None
+ def write_void(self, v):
+ assert v == None
- def write_bit(self, b):
- if not b: raise ValueError(b)
def read_bit(self):
return True
+ def write_bit(self, b):
+ if not b: raise ValueError(b)
def read_uint8(self):
return self.unpack("!B")
@@ -172,20 +215,8 @@ class Codec(Packer):
self.write_uint32(len(b))
self.write(b)
- def write_map(self, m):
- sc = StringCodec(self.spec)
- if m is not None:
- sc.write_uint32(len(m))
- for k, v in m.items():
- type = self.spec.encoding(v)
- if type == None:
- raise CodecException("no encoding for %s" % v.__class__)
- sc.write_str8(k)
- sc.write_uint8(type.code)
- type.encode(sc, v)
- self.write_vbin32(sc.encoded)
def read_map(self):
- sc = StringCodec(self.spec, self.read_vbin32())
+ sc = StringCodec(self.read_vbin32())
if not sc.encoded:
return None
count = sc.read_uint32()
@@ -193,91 +224,132 @@ class Codec(Packer):
while sc.encoded:
k = sc.read_str8()
code = sc.read_uint8()
- type = self.spec.types[code]
- v = type.decode(sc)
+ type = PRIMITIVE[code]
+ v = sc.read_primitive(type)
result[k] = v
return result
+ def write_map(self, m):
+ sc = StringCodec()
+ if m is not None:
+ sc.write_uint32(len(m))
+ for k, v in m.items():
+ type = self.encoding(v)
+ sc.write_str8(k)
+ sc.write_uint8(type.CODE)
+ sc.write_primitive(type, v)
+ self.write_vbin32(sc.encoded)
+ def read_array(self):
+ sc = StringCodec(self.read_vbin32())
+ if not sc.encoded:
+ return None
+ type = PRIMITIVE[sc.read_uint8()]
+ count = sc.read_uint32()
+ result = []
+ while count > 0:
+ result.append(sc.read_primitive(type))
+ count -= 1
+ return result
def write_array(self, a):
- sc = StringCodec(self.spec)
+ sc = StringCodec()
if a is not None:
if len(a) > 0:
- type = self.spec.encoding(a[0])
+ type = self.encoding(a[0])
else:
- type = self.spec.encoding(None)
- sc.write_uint8(type.code)
+ type = self.encoding(None)
+ sc.write_uint8(type.CODE)
sc.write_uint32(len(a))
for o in a:
- type.encode(sc, o)
+ sc.write_primitive(type, o)
self.write_vbin32(sc.encoded)
- def read_array(self):
- sc = StringCodec(self.spec, self.read_vbin32())
+
+ def read_list(self):
+ sc = StringCodec(self.read_vbin32())
if not sc.encoded:
return None
- type = self.spec.types[sc.read_uint8()]
count = sc.read_uint32()
result = []
while count > 0:
- result.append(type.decode(sc))
+ type = PRIMITIVE[sc.read_uint8()]
+ result.append(sc.read_primitive(type))
count -= 1
return result
-
def write_list(self, l):
- sc = StringCodec(self.spec)
+ sc = StringCodec()
if l is not None:
sc.write_uint32(len(l))
for o in l:
- type = self.spec.encoding(o)
- sc.write_uint8(type.code)
- type.encode(sc, o)
+ type = self.encoding(o)
+ sc.write_uint8(type.CODE)
+ sc.write_primitive(type, o)
self.write_vbin32(sc.encoded)
- def read_list(self):
- sc = StringCodec(self.spec, self.read_vbin32())
- if not sc.encoded:
- return None
- count = sc.read_uint32()
- result = []
- while count > 0:
- type = self.spec.types[sc.read_uint8()]
- result.append(type.decode(sc))
- count -= 1
- return result
def read_struct32(self):
size = self.read_uint32()
code = self.read_uint16()
- type = self.spec.structs[code]
- fields = type.decode_fields(self)
- return Struct(type, **fields)
+ cls = COMPOUND[code]
+ op = cls()
+ self.read_fields(op)
+ return op
def write_struct32(self, value):
- sc = StringCodec(self.spec)
- sc.write_uint16(value._type.code)
- value._type.encode_fields(sc, value)
- self.write_vbin32(sc.encoded)
-
- def read_control(self):
- cntrl = self.spec.controls[self.read_uint16()]
- return Struct(cntrl, **cntrl.decode_fields(self))
- def write_control(self, ctrl):
- type = ctrl._type
- self.write_uint16(type.code)
- type.encode_fields(self, ctrl)
-
- def read_command(self):
- type = self.spec.commands[self.read_uint16()]
- hdr = self.spec["session.header"].decode(self)
- cmd = Struct(type, **type.decode_fields(self))
- return hdr, cmd
- def write_command(self, hdr, cmd):
- self.write_uint16(cmd._type.code)
- hdr._type.encode(self, hdr)
- cmd._type.encode_fields(self, cmd)
+ self.write_compound(value)
+
+ def read_compound(self, cls):
+ size = self.read_size(cls.SIZE)
+ if cls.CODE is not None:
+ code = self.read_uint16()
+ assert code == cls.CODE
+ op = cls()
+ self.read_fields(op)
+ return op
+ def write_compound(self, op):
+ sc = StringCodec()
+ if op.CODE is not None:
+ sc.write_uint16(op.CODE)
+ sc.write_fields(op)
+ self.write_size(op.SIZE, len(sc.encoded))
+ self.write(sc.encoded)
+
+ def read_fields(self, op):
+ flags = 0
+ for i in range(op.PACK):
+ flags |= (self.read_uint8() << 8*i)
+
+ for i in range(len(op.FIELDS)):
+ f = op.FIELDS[i]
+ if flags & (0x1 << i):
+ if COMPOUND.has_key(f.type):
+ value = self.read_compound(COMPOUND[f.type])
+ else:
+ value = getattr(self, "read_%s" % f.type)()
+ setattr(op, f.name, value)
+ def write_fields(self, op):
+ flags = 0
+ for i in range(len(op.FIELDS)):
+ f = op.FIELDS[i]
+ value = getattr(op, f.name)
+ if f.type == "bit":
+ present = value
+ else:
+ present = value != None
+ if present:
+ flags |= (0x1 << i)
+ for i in range(op.PACK):
+ self.write_uint8((flags >> 8*i) & 0xFF)
+ for i in range(len(op.FIELDS)):
+ f = op.FIELDS[i]
+ if flags & (0x1 << i):
+ if COMPOUND.has_key(f.type):
+ enc = self.write_compound
+ else:
+ enc = getattr(self, "write_%s" % f.type)
+ value = getattr(op, f.name)
+ enc(value)
def read_size(self, width):
if width > 0:
attr = "read_uint%d" % (width*8)
return getattr(self, attr)()
-
def write_size(self, width, n):
if width > 0:
attr = "write_uint%d" % (width*8)
@@ -285,7 +357,6 @@ class Codec(Packer):
def read_uuid(self):
return UUID(self.unpack("16s"))
-
def write_uuid(self, s):
if isinstance(s, UUID):
s = s.bytes
@@ -293,7 +364,6 @@ class Codec(Packer):
def read_bin128(self):
return self.unpack("16s")
-
def write_bin128(self, b):
self.pack("16s", b)
@@ -301,14 +371,13 @@ class Codec(Packer):
class StringCodec(Codec):
- def __init__(self, spec, encoded = ""):
- Codec.__init__(self, spec)
+ def __init__(self, encoded = ""):
self.encoded = encoded
- def write(self, s):
- self.encoded += s
-
def read(self, n):
result = self.encoded[:n]
self.encoded = self.encoded[n:]
return result
+
+ def write(self, s):
+ self.encoded += s
diff --git a/python/qpid/connection.py b/python/qpid/connection.py
index 5abab3802c..680f8f62e3 100644
--- a/python/qpid/connection.py
+++ b/python/qpid/connection.py
@@ -20,14 +20,14 @@
import datatypes, session
from threading import Thread, Condition, RLock
from util import wait, notify
-from assembler import Assembler, Segment
from codec010 import StringCodec
+from framing import *
from session import Session
from generator import control_invoker
from spec import SPEC
from exceptions import *
from logging import getLogger
-import delegates
+import delegates, socket
class ChannelBusy(Exception): pass
@@ -43,12 +43,12 @@ def client(*args, **kwargs):
def server(*args, **kwargs):
return delegates.Server(*args, **kwargs)
-class Connection(Assembler):
+from framer import Framer
- def __init__(self, sock, spec=SPEC, delegate=client, **args):
- Assembler.__init__(self, sock)
- self.spec = spec
+class Connection(Framer):
+ def __init__(self, sock, delegate=client, **args):
+ Framer.__init__(self, sock)
self.lock = RLock()
self.attached = {}
self.sessions = {}
@@ -66,6 +66,10 @@ class Connection(Assembler):
self.channel_max = 65535
+ self.op_enc = OpEncoder()
+ self.seg_enc = SegmentEncoder()
+ self.frame_enc = FrameEncoder()
+
self.delegate = delegate(self, **args)
def attach(self, name, ch, delegate, force=False):
@@ -145,15 +149,44 @@ class Connection(Assembler):
raise ConnectionFailed(*self.close_code)
def run(self):
+ frame_dec = FrameDecoder()
+ seg_dec = SegmentDecoder()
+ op_dec = OpDecoder()
+
while not self.closed:
try:
- seg = self.read_segment()
- except Closed:
+ data = self.sock.recv(64*1024)
+ if not data:
+ self.detach_all()
+ break
+ except socket.timeout:
+ if self.aborted():
+ self.detach_all()
+ raise Closed("connection timed out")
+ else:
+ continue
+ except socket.error, e:
self.detach_all()
- break
- self.delegate.received(seg)
+ raise Closed(e)
+ frame_dec.write(data)
+ seg_dec.write(*frame_dec.read())
+ op_dec.write(*seg_dec.read())
+ for op in op_dec.read():
+ self.delegate.received(op)
self.sock.close()
+ def write_op(self, op):
+ self.sock_lock.acquire()
+ try:
+ self.op_enc.write(op)
+ self.seg_enc.write(*self.op_enc.read())
+ self.frame_enc.write(*self.seg_enc.read())
+ bytes = self.frame_enc.read()
+ self.write(bytes)
+ self.flush()
+ finally:
+ self.sock_lock.release()
+
def close(self, timeout=None):
if not self.opened: return
Channel(self, 0).connection_close(200)
@@ -169,19 +202,17 @@ class Connection(Assembler):
log = getLogger("qpid.io.ctl")
-class Channel(control_invoker(SPEC)):
+class Channel(control_invoker()):
def __init__(self, connection, id):
self.connection = connection
self.id = id
self.session = None
- def invoke(self, type, args, kwargs):
- ctl = type.new(args, kwargs)
- sc = StringCodec(self.spec)
- sc.write_control(ctl)
- self.connection.write_segment(Segment(True, True, type.segment_type,
- type.track, self.id, sc.encoded))
+ def invoke(self, op, args, kwargs):
+ ctl = op(*args, **kwargs)
+ ctl.channel = self.id
+ self.connection.write_op(ctl)
log.debug("SENT %s", ctl)
def __str__(self):
diff --git a/python/qpid/connection08.py b/python/qpid/connection08.py
index be94a792cb..d34cfe2847 100644
--- a/python/qpid/connection08.py
+++ b/python/qpid/connection08.py
@@ -28,6 +28,7 @@ from cStringIO import StringIO
from spec import load
from codec import EOF
from compat import SHUT_RDWR
+from exceptions import VersionError
class SockIO:
@@ -73,6 +74,9 @@ def listen(host, port, predicate = lambda: True):
s, a = sock.accept()
yield SockIO(s)
+class FramingError(Exception):
+ pass
+
class Connection:
def __init__(self, io, spec):
@@ -107,7 +111,16 @@ class Connection:
def read_8_0(self):
c = self.codec
- type = self.spec.constants.byid[c.decode_octet()].name
+ tid = c.decode_octet()
+ try:
+ type = self.spec.constants.byid[tid].name
+ except KeyError:
+ if tid == ord('A') and c.unpack("!3s") == "MQP":
+ _, _, major, minor = c.unpack("4B")
+ raise VersionError("client: %s-%s, server: %s-%s" %
+ (self.spec.major, self.spec.minor, major, minor))
+ else:
+ raise FramingError("unknown frame type: %s" % tid)
channel = c.decode_short()
body = c.decode_longstr()
dec = codec.Codec(StringIO(body), self.spec)
diff --git a/python/qpid/datatypes.py b/python/qpid/datatypes.py
index 4cd3fade2c..bba3f5b9ab 100644
--- a/python/qpid/datatypes.py
+++ b/python/qpid/datatypes.py
@@ -84,7 +84,7 @@ class Message:
def get(self, name):
if self.headers:
for h in self.headers:
- if h._type.name == name:
+ if h.NAME == name:
return h
return None
@@ -93,7 +93,7 @@ class Message:
self.headers = []
idx = 0
while idx < len(self.headers):
- if self.headers[idx]._type == header._type:
+ if self.headers[idx].NAME == header.NAME:
self.headers[idx] = header
return
idx += 1
@@ -102,7 +102,7 @@ class Message:
def clear(self, name):
idx = 0
while idx < len(self.headers):
- if self.headers[idx]._type.name == name:
+ if self.headers[idx].NAME == name:
del self.headers[idx]
return
idx += 1
diff --git a/python/qpid/delegates.py b/python/qpid/delegates.py
index 82bbe67ede..c74cc5a945 100644
--- a/python/qpid/delegates.py
+++ b/python/qpid/delegates.py
@@ -20,7 +20,9 @@
import os, connection, session
from util import notify
from datatypes import RangedSet
+from exceptions import VersionError
from logging import getLogger
+from ops import Control
import sys
log = getLogger("qpid.io.ctl")
@@ -29,26 +31,22 @@ class Delegate:
def __init__(self, connection, delegate=session.client):
self.connection = connection
- self.spec = connection.spec
self.delegate = delegate
- self.control = self.spec["track.control"].value
- def received(self, seg):
- ssn = self.connection.attached.get(seg.channel)
+ def received(self, op):
+ ssn = self.connection.attached.get(op.channel)
if ssn is None:
- ch = connection.Channel(self.connection, seg.channel)
+ ch = connection.Channel(self.connection, op.channel)
else:
ch = ssn.channel
- if seg.track == self.control:
- ctl = seg.decode(self.spec)
- log.debug("RECV %s", ctl)
- attr = ctl._type.qname.replace(".", "_")
- getattr(self, attr)(ch, ctl)
+ if isinstance(op, Control):
+ log.debug("RECV %s", op)
+ getattr(self, op.NAME)(ch, op)
elif ssn is None:
ch.session_detached()
else:
- ssn.received(seg)
+ ssn.received(op)
def connection_close(self, ch, close):
self.connection.close_code = (close.reply_code, close.reply_text)
@@ -124,7 +122,8 @@ class Server(Delegate):
def start(self):
self.connection.read_header()
- self.connection.write_header(self.spec.major, self.spec.minor)
+ # XXX
+ self.connection.write_header(0, 10)
connection.Channel(self.connection, 0).connection_start(mechanisms=["ANONYMOUS"])
def connection_start_ok(self, ch, start_ok):
@@ -156,8 +155,14 @@ class Client(Delegate):
self.heartbeat = heartbeat
def start(self):
- self.connection.write_header(self.spec.major, self.spec.minor)
- self.connection.read_header()
+ # XXX
+ cli_major = 0
+ cli_minor = 10
+ self.connection.write_header(cli_major, cli_minor)
+ magic, _, _, major, minor = self.connection.read_header()
+ if not (magic == "AMQP" and major == cli_major and minor == cli_minor):
+ raise VersionError("client: %s-%s, server: %s-%s" %
+ (cli_major, cli_minor, major, minor))
def connection_start(self, ch, start):
r = "\0%s\0%s" % (self.username, self.password)
diff --git a/python/qpid/exceptions.py b/python/qpid/exceptions.py
index 7eaaf81ed4..2bd80b7ffe 100644
--- a/python/qpid/exceptions.py
+++ b/python/qpid/exceptions.py
@@ -19,3 +19,4 @@
class Closed(Exception): pass
class Timeout(Exception): pass
+class VersionError(Exception): pass
diff --git a/python/qpid/framer.py b/python/qpid/framer.py
index 0d82e4378b..4cd0ae6f26 100644
--- a/python/qpid/framer.py
+++ b/python/qpid/framer.py
@@ -26,48 +26,6 @@ from logging import getLogger
raw = getLogger("qpid.io.raw")
frm = getLogger("qpid.io.frm")
-FIRST_SEG = 0x08
-LAST_SEG = 0x04
-FIRST_FRM = 0x02
-LAST_FRM = 0x01
-
-class Frame:
-
- HEADER = "!2BHxBH4x"
- HEADER_SIZE = struct.calcsize(HEADER)
- MAX_PAYLOAD = 65535 - struct.calcsize(HEADER)
-
- def __init__(self, flags, type, track, channel, payload):
- if len(payload) > Frame.MAX_PAYLOAD:
- raise ValueError("max payload size exceeded: %s" % len(payload))
- self.flags = flags
- self.type = type
- self.track = track
- self.channel = channel
- self.payload = payload
-
- def isFirstSegment(self):
- return bool(FIRST_SEG & self.flags)
-
- def isLastSegment(self):
- return bool(LAST_SEG & self.flags)
-
- def isFirstFrame(self):
- return bool(FIRST_FRM & self.flags)
-
- def isLastFrame(self):
- return bool(LAST_FRM & self.flags)
-
- def __repr__(self):
- return "%s%s%s%s %s %s %s %r" % (int(self.isFirstSegment()),
- int(self.isLastSegment()),
- int(self.isFirstFrame()),
- int(self.isLastFrame()),
- self.type,
- self.track,
- self.channel,
- self.payload)
-
class FramingError(Exception): pass
class Framer(Packer):
@@ -137,24 +95,3 @@ class Framer(Packer):
self.flush()
finally:
self.sock_lock.release()
-
- def write_frame(self, frame):
- self.sock_lock.acquire()
- try:
- size = len(frame.payload) + struct.calcsize(Frame.HEADER)
- track = frame.track & 0x0F
- self.pack(Frame.HEADER, frame.flags, frame.type, size, track, frame.channel)
- self.write(frame.payload)
- if frame.isLastSegment() and frame.isLastFrame():
- self.flush()
- frm.debug("SENT %s", frame)
- finally:
- self.sock_lock.release()
-
- def read_frame(self):
- flags, type, size, track, channel = self.unpack(Frame.HEADER)
- if flags & 0xF0: raise FramingError()
- payload = self.read(size - struct.calcsize(Frame.HEADER))
- frame = Frame(flags, type, track, channel, payload)
- frm.debug("RECV %s", frame)
- return frame
diff --git a/python/qpid/framing.py b/python/qpid/framing.py
index 7c5f68fbcc..0a8f26272c 100644
--- a/python/qpid/framing.py
+++ b/python/qpid/framing.py
@@ -18,8 +18,64 @@
#
import struct
-from qpid.framer import Frame, FIRST_SEG, LAST_SEG, FIRST_FRM, LAST_FRM
-from qpid.assembler import Segment
+
+FIRST_SEG = 0x08
+LAST_SEG = 0x04
+FIRST_FRM = 0x02
+LAST_FRM = 0x01
+
+class Frame:
+
+ HEADER = "!2BHxBH4x"
+ HEADER_SIZE = struct.calcsize(HEADER)
+ MAX_PAYLOAD = 65535 - struct.calcsize(HEADER)
+
+ def __init__(self, flags, type, track, channel, payload):
+ if len(payload) > Frame.MAX_PAYLOAD:
+ raise ValueError("max payload size exceeded: %s" % len(payload))
+ self.flags = flags
+ self.type = type
+ self.track = track
+ self.channel = channel
+ self.payload = payload
+
+ def isFirstSegment(self):
+ return bool(FIRST_SEG & self.flags)
+
+ def isLastSegment(self):
+ return bool(LAST_SEG & self.flags)
+
+ def isFirstFrame(self):
+ return bool(FIRST_FRM & self.flags)
+
+ def isLastFrame(self):
+ return bool(LAST_FRM & self.flags)
+
+ def __repr__(self):
+ return "%s%s%s%s %s %s %s %r" % (int(self.isFirstSegment()),
+ int(self.isLastSegment()),
+ int(self.isFirstFrame()),
+ int(self.isLastFrame()),
+ self.type,
+ self.track,
+ self.channel,
+ self.payload)
+
+class Segment:
+
+ def __init__(self, first, last, type, track, channel, payload):
+ self.id = None
+ self.offset = None
+ self.first = first
+ self.last = last
+ self.type = type
+ self.track = track
+ self.channel = channel
+ self.payload = payload
+
+ def __repr__(self):
+ return "%s%s %s %s %s %r" % (int(self.first), int(self.last), self.type,
+ self.track, self.channel, self.payload)
class FrameDecoder:
@@ -140,3 +196,115 @@ class SegmentEncoder:
result = self.frames
self.frames = []
return result
+
+from ops import COMMANDS, CONTROLS, COMPOUND, Header, segment_type, track
+from spec import SPEC
+
+from codec010 import StringCodec
+
+class OpEncoder:
+
+ def __init__(self):
+ self.segments = []
+
+ def write(self, *ops):
+ for op in ops:
+ if COMMANDS.has_key(op.NAME):
+ seg_type = segment_type.command
+ seg_track = track.command
+ enc = self.encode_command(op)
+ elif CONTROLS.has_key(op.NAME):
+ seg_type = segment_type.control
+ seg_track = track.control
+ enc = self.encode_compound(op)
+ else:
+ raise ValueError(op)
+ seg = Segment(True, False, seg_type, seg_track, op.channel, enc)
+ self.segments.append(seg)
+ if hasattr(op, "headers") and op.headers is not None:
+ hdrs = ""
+ for h in op.headers:
+ hdrs += self.encode_compound(h)
+ seg = Segment(False, False, segment_type.header, seg_track, op.channel,
+ hdrs)
+ self.segments.append(seg)
+ if hasattr(op, "payload") and op.payload is not None:
+ self.segments.append(Segment(False, False, segment_type.body, seg_track,
+ op.channel, op.payload))
+ self.segments[-1].last = True
+
+ def encode_command(self, cmd):
+ sc = StringCodec()
+ sc.write_uint16(cmd.CODE)
+ sc.write_compound(Header(sync=cmd.sync))
+ sc.write_fields(cmd)
+ return sc.encoded
+
+ def encode_compound(self, op):
+ sc = StringCodec()
+ sc.write_compound(op)
+ return sc.encoded
+
+ def read(self):
+ result = self.segments
+ self.segments = []
+ return result
+
+class OpDecoder:
+
+ def __init__(self):
+ self.op = None
+ self.ops = []
+
+ def write(self, *segments):
+ for seg in segments:
+ if seg.first:
+ if seg.type == segment_type.command:
+ self.op = self.decode_command(seg.payload)
+ elif seg.type == segment_type.control:
+ self.op = self.decode_control(seg.payload)
+ else:
+ raise ValueError(seg)
+ self.op.channel = seg.channel
+ elif seg.type == segment_type.header:
+ if self.op.headers is None:
+ self.op.headers = []
+ self.op.headers.extend(self.decode_headers(seg.payload))
+ elif seg.type == segment_type.body:
+ if self.op.payload is None:
+ self.op.payload = seg.payload
+ else:
+ self.op.payload += seg.payload
+ if seg.last:
+ self.ops.append(self.op)
+ self.op = None
+
+ def decode_command(self, encoded):
+ sc = StringCodec(encoded)
+ code = sc.read_uint16()
+ cls = COMMANDS[code]
+ hdr = sc.read_compound(Header)
+ cmd = cls()
+ sc.read_fields(cmd)
+ cmd.sync = hdr.sync
+ return cmd
+
+ def decode_control(self, encoded):
+ sc = StringCodec(encoded)
+ code = sc.read_uint16()
+ cls = CONTROLS[code]
+ ctl = cls()
+ sc.read_fields(ctl)
+ return ctl
+
+ def decode_headers(self, encoded):
+ sc = StringCodec(encoded)
+ result = []
+ while sc.encoded:
+ result.append(sc.read_struct32())
+ return result
+
+ def read(self):
+ result = self.ops
+ self.ops = []
+ return result
diff --git a/python/qpid/generator.py b/python/qpid/generator.py
index 729425d6a3..02d11e5005 100644
--- a/python/qpid/generator.py
+++ b/python/qpid/generator.py
@@ -19,42 +19,38 @@
import sys
-from spec010 import Control
+from ops import *
-def METHOD(module, inst):
- method = lambda self, *args, **kwargs: self.invoke(inst, args, kwargs)
+def METHOD(module, op):
+ method = lambda self, *args, **kwargs: self.invoke(op, args, kwargs)
if sys.version_info[:2] > (2, 3):
- method.__name__ = str(inst.pyname)
- method.__doc__ = str(inst.pydoc)
+ method.__name__ = op.__name__
+ method.__doc__ = op.__doc__
method.__module__ = module
return method
-def generate(spec, module, predicate=lambda x: True):
- dict = {"spec": spec}
+def generate(module, operations):
+ dict = {}
- for name, enum in spec.enums.items():
- dict[name] = enum
+ for name, enum in ENUMS.items():
+ if isinstance(name, basestring):
+ dict[name] = enum
- for name, st in spec.structs_by_name.items():
- dict[name] = METHOD(module, st)
+ for name, op in COMPOUND.items():
+ if isinstance(name, basestring):
+ dict[name] = METHOD(module, op)
- for st in spec.structs.values():
- dict[st.name] = METHOD(module, st)
-
- for name, inst in spec.instructions.items():
- if predicate(inst):
- dict[name] = METHOD(module, inst)
+ for name, op in operations.items():
+ if isinstance(name, basestring):
+ dict[name] = METHOD(module, op)
return dict
-def invoker(name, spec, predicate=lambda x: True):
- return type("%s_%s_%s" % (name, spec.major, spec.minor),
- (), generate(spec, invoker.__module__, predicate))
+def invoker(name, operations):
+ return type(name, (), generate(invoker.__module__, operations))
-def command_invoker(spec):
- is_command = lambda cmd: cmd.track == spec["track.command"].value
- return invoker("CommandInvoker", spec, is_command)
+def command_invoker():
+ return invoker("CommandInvoker", COMMANDS)
-def control_invoker(spec):
- is_control = lambda inst: isinstance(inst, Control)
- return invoker("ControlInvoker", spec, is_control)
+def control_invoker():
+ return invoker("ControlInvoker", CONTROLS)
diff --git a/python/qpid/harness.py b/python/qpid/harness.py
new file mode 100644
index 0000000000..ce48481612
--- /dev/null
+++ b/python/qpid/harness.py
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+class Skipped(Exception): pass
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py
index f06ef87709..9b3fecbf9b 100644
--- a/python/qpid/messaging.py
+++ b/python/qpid/messaging.py
@@ -34,8 +34,8 @@ import connection, time, socket, sys, traceback
from codec010 import StringCodec
from datatypes import timestamp, uuid4, RangedSet, Message as Message010
from logging import getLogger
+from ops import PRIMITIVE
from session import Client, INCOMPLETE
-from spec import SPEC
from threading import Thread, RLock, Condition
from util import connect
@@ -191,9 +191,12 @@ class Connection(Lockable):
try:
self._socket = connect(self.host, self.port)
except socket.error, e:
- raise ConnectError(*e.args)
+ raise ConnectError(e)
self._conn = connection.Connection(self._socket)
- self._conn.start()
+ try:
+ self._conn.start()
+ except connection.VersionError, e:
+ raise ConnectError(e)
for ssn in self.sessions.values():
ssn._attach()
@@ -263,8 +266,8 @@ FILTER_DEFAULTS = {
def delegate(session):
class Delegate(Client):
- def message_transfer(self, cmd, headers, body):
- session._message_transfer(cmd, headers, body)
+ def message_transfer(self, cmd):
+ session._message_transfer(cmd)
return Delegate
class Session(Lockable):
@@ -314,9 +317,9 @@ class Session(Lockable):
link._disconnected()
@synchronized
- def _message_transfer(self, cmd, headers, body):
- m = Message010(body)
- m.headers = headers
+ def _message_transfer(self, cmd):
+ m = Message010(cmd.payload)
+ m.headers = cmd.headers
m.id = cmd.id
msg = self._decode(m)
rcv = self.receivers[int(cmd.destination)]
@@ -812,16 +815,16 @@ class Receiver(Lockable):
def codec(name):
- type = SPEC.named[name]
+ type = PRIMITIVE[name]
def encode(x):
- sc = StringCodec(SPEC)
- type.encode(sc, x)
+ sc = StringCodec()
+ sc.write_primitive(type, x)
return sc.encoded
def decode(x):
- sc = StringCodec(SPEC, x)
- return type.decode(sc)
+ sc = StringCodec(x)
+ return sc.read_primitive(type)
return encode, decode
diff --git a/python/qpid/ops.py b/python/qpid/ops.py
new file mode 100644
index 0000000000..1f82889164
--- /dev/null
+++ b/python/qpid/ops.py
@@ -0,0 +1,277 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import os, mllib, cPickle as pickle
+from util import fill
+
+class Primitive(object):
+ pass
+
+class Enum(object):
+ pass
+
+class Field:
+
+ def __init__(self, name, type, default=None):
+ self.name = name
+ self.type = type
+ self.default = default
+
+ def __repr__(self):
+ return "%s: %s" % (self.name, self.type)
+
+class Compound(object):
+
+ UNENCODED=[]
+
+ def __init__(self, *args, **kwargs):
+ args = list(args)
+ for f in self.ARGS:
+ if args:
+ a = args.pop(0)
+ else:
+ a = kwargs.pop(f.name, f.default)
+ setattr(self, f.name, a)
+ if args:
+ raise TypeError("%s takes at most %s arguments (%s given))" %
+ (self.__class__.__name__, len(self.ARGS),
+ len(self.ARGS) + len(args)))
+ if kwargs:
+ raise TypeError("got unexpected keyword argument '%s'" % kwargs.keys()[0])
+
+ def fields(self):
+ result = {}
+ for f in self.FIELDS:
+ result[f.name] = getattr(self, f.name)
+ return result
+
+ def args(self):
+ result = {}
+ for f in self.ARGS:
+ result[f.name] = getattr(self, f.name)
+ return result
+
+ def dispatch(self, target, *args):
+ handler = "do_%s" % self.NAME
+ if hasattr(target, handler):
+ getattr(target, handler)(self, *args)
+ else:
+ print "UNHANDLED:", target, args
+
+ def __repr__(self, extras=()):
+ return "%s(%s)" % (self.__class__.__name__,
+ ", ".join(["%s=%r" % (f.name, getattr(self, f.name))
+ for f in self.ARGS
+ if getattr(self, f.name) is not f.default]))
+
+class Command(Compound):
+ UNENCODED=[Field("channel", "uint16", 0),
+ Field("id", "sequence-no", None),
+ Field("sync", "bit", False),
+ Field("headers", None, None),
+ Field("payload", None, None)]
+
+class Control(Compound):
+ UNENCODED=[Field("channel", "uint16", 0)]
+
+def pythonize(st):
+ if st is None:
+ return None
+ else:
+ return str(st.replace("-", "_"))
+
+def pydoc(op, children=()):
+ doc = "\n\n".join([fill(p.text(), 0) for p in op.query["doc"]])
+ for ch in children:
+ doc += "\n\n " + pythonize(ch["@name"]) + " -- " + str(ch["@label"])
+ ch_descs ="\n\n".join([fill(p.text(), 4) for p in ch.query["doc"]])
+ if ch_descs:
+ doc += "\n\n" + ch_descs
+ return doc
+
+def studly(st):
+ return "".join([p.capitalize() for p in st.split("-")])
+
+def klass(nd):
+ while nd.parent is not None:
+ if hasattr(nd.parent, "name") and nd.parent.name == "class":
+ return nd.parent
+ else:
+ nd = nd.parent
+
+def included(nd):
+ cls = klass(nd)
+ if cls is None:
+ return True
+ else:
+ return cls["@name"] not in ("file", "stream")
+
+def num(s):
+ if s: return int(s, 0)
+
+def code(nd):
+ c = num(nd["@code"])
+ if c is None:
+ return None
+ else:
+ cls = klass(nd)
+ if cls is None:
+ return c
+ else:
+ return c | (num(cls["@code"]) << 8)
+
+def default(f):
+ if f["@type"] == "bit":
+ return False
+ else:
+ return None
+
+def make_compound(decl, base):
+ dict = {}
+ fields = decl.query["field"]
+ dict["__doc__"] = pydoc(decl, fields)
+ dict["NAME"] = pythonize(decl["@name"])
+ dict["SIZE"] = num(decl["@size"])
+ dict["CODE"] = code(decl)
+ dict["PACK"] = num(decl["@pack"])
+ dict["FIELDS"] = [Field(pythonize(f["@name"]), resolve(f), default(f)) for f in fields]
+ dict["ARGS"] = dict["FIELDS"] + base.UNENCODED
+ return str(studly(decl["@name"])), (base,), dict
+
+def make_restricted(decl):
+ name = pythonize(decl["@name"])
+ dict = {}
+ choices = decl.query["choice"]
+ dict["__doc__"] = pydoc(decl, choices)
+ dict["NAME"] = name
+ dict["TYPE"] = str(decl.parent["@type"])
+ values = []
+ for ch in choices:
+ val = int(ch["@value"], 0)
+ dict[pythonize(ch["@name"])] = val
+ values.append(val)
+ dict["VALUES"] = values
+ return name, (Enum,), dict
+
+def make_type(decl):
+ name = pythonize(decl["@name"])
+ dict = {}
+ dict["__doc__"] = pydoc(decl)
+ dict["NAME"] = name
+ dict["CODE"] = code(decl)
+ return str(studly(decl["@name"])), (Primitive,), dict
+
+def make_command(decl):
+ decl.set_attr("name", "%s-%s" % (decl.parent["@name"], decl["@name"]))
+ decl.set_attr("size", "0")
+ decl.set_attr("pack", "2")
+ name, bases, dict = make_compound(decl, Command)
+ dict["RESULT"] = pythonize(decl["result/@type"]) or pythonize(decl["result/struct/@name"])
+ return name, bases, dict
+
+def make_control(decl):
+ decl.set_attr("name", "%s-%s" % (decl.parent["@name"], decl["@name"]))
+ decl.set_attr("size", "0")
+ decl.set_attr("pack", "2")
+ return make_compound(decl, Control)
+
+def make_struct(decl):
+ return make_compound(decl, Compound)
+
+def make_enum(decl):
+ decl.set_attr("name", decl.parent["@name"])
+ return make_restricted(decl)
+
+
+vars = globals()
+
+def make(nd):
+ return vars["make_%s" % nd.name](nd)
+
+from qpid_config import amqp_spec as file
+pclfile = "%s.ops.pcl" % file
+
+if False and (os.path.exists(pclfile) and
+ os.path.getmtime(pclfile) > os.path.getmtime(file)):
+ f = open(pclfile, "read")
+ types = pickle.load(f)
+ f.close()
+else:
+ spec = mllib.xml_parse(file)
+
+ def qualify(nd, field="@name"):
+ cls = klass(nd)
+ if cls is None:
+ return pythonize(nd[field])
+ else:
+ return pythonize("%s.%s" % (cls["@name"], nd[field]))
+
+ domains = dict([(qualify(d), pythonize(d["@type"]))
+ for d in spec.query["amqp/domain", included] + \
+ spec.query["amqp/class/domain", included]])
+
+ def resolve(nd):
+ candidates = qualify(nd, "@type"), pythonize(nd["@type"])
+ for c in candidates:
+ if domains.has_key(c):
+ while domains.has_key(c):
+ c = domains[c]
+ return c
+ else:
+ return c
+
+ type_decls = \
+ spec.query["amqp/class/command", included] + \
+ spec.query["amqp/class/control", included] + \
+ spec.query["amqp/class/command/result/struct", included] + \
+ spec.query["amqp/class/struct", included] + \
+ spec.query["amqp/class/domain/enum", included] + \
+ spec.query["amqp/domain/enum", included] + \
+ spec.query["amqp/type"]
+ types = [make(nd) for nd in type_decls]
+
+ if os.access(os.path.dirname(os.path.abspath(pclfile)), os.W_OK):
+ f = open(pclfile, "write")
+ pickle.dump(types, f)
+ f.close()
+
+ENUMS = {}
+PRIMITIVE = {}
+COMPOUND = {}
+COMMANDS = {}
+CONTROLS = {}
+
+for name, bases, dict in types:
+ t = type(name, bases, dict)
+ vars[name] = t
+
+ if issubclass(t, Command):
+ COMMANDS[t.NAME] = t
+ COMMANDS[t.CODE] = t
+ elif issubclass(t, Control):
+ CONTROLS[t.NAME] = t
+ CONTROLS[t.CODE] = t
+ elif issubclass(t, Compound):
+ COMPOUND[t.NAME] = t
+ if t.CODE is not None:
+ COMPOUND[t.CODE] = t
+ elif issubclass(t, Primitive):
+ PRIMITIVE[t.NAME] = t
+ PRIMITIVE[t.CODE] = t
+ elif issubclass(t, Enum):
+ ENUMS[t.NAME] = t
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index 18d7848b8d..2bc9844351 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -25,7 +25,7 @@ incoming method frames to a delegate.
"""
import thread, threading, traceback, socket, sys, logging
-from connection08 import EOF, Method, Header, Body, Request, Response
+from connection08 import EOF, Method, Header, Body, Request, Response, VersionError
from message import Message
from queue import Queue, Closed as QueueClosed
from content import Content
@@ -95,6 +95,8 @@ class Peer:
break
ch = self.channel(frame.channel)
ch.receive(frame, self.work)
+ except VersionError, e:
+ self.closed(e)
except:
self.fatal()
diff --git a/python/qpid/session.py b/python/qpid/session.py
index 3b8bd18469..4413a22899 100644
--- a/python/qpid/session.py
+++ b/python/qpid/session.py
@@ -22,9 +22,9 @@ from spec import SPEC
from generator import command_invoker
from datatypes import RangedSet, Struct, Future
from codec010 import StringCodec
-from assembler import Segment
from queue import Queue
from datatypes import Message, serial
+from ops import Command, MessageTransfer
from util import wait, notify
from exceptions import *
from logging import getLogger
@@ -44,7 +44,7 @@ def server(*args):
INCOMPLETE = object()
-class Session(command_invoker(SPEC)):
+class Session(command_invoker()):
def __init__(self, name, auto_sync=True, timeout=10, delegate=client):
self.name = name
@@ -67,8 +67,6 @@ class Session(command_invoker(SPEC)):
self.results = {}
self.exceptions = []
- self.assembly = None
-
self.delegate = delegate(self)
def incoming(self, destination):
@@ -134,68 +132,51 @@ class Session(command_invoker(SPEC)):
finally:
self.lock.release()
- def invoke(self, type, args, kwargs):
- # XXX
- if not hasattr(type, "track"):
- return type.new(args, kwargs)
-
- self.invoke_lock.acquire()
- try:
- return self.do_invoke(type, args, kwargs)
- finally:
- self.invoke_lock.release()
+ def invoke(self, op, args, kwargs):
+ if issubclass(op, Command):
+ self.invoke_lock.acquire()
+ try:
+ return self.do_invoke(op, args, kwargs)
+ finally:
+ self.invoke_lock.release()
+ else:
+ return op(*args, **kwargs)
- def do_invoke(self, type, args, kwargs):
+ def do_invoke(self, op, args, kwargs):
if self._closing:
raise SessionClosed()
if self.channel == None:
raise SessionDetached()
- if type.segments:
- if len(args) == len(type.fields) + 1:
+ if op == MessageTransfer:
+ if len(args) == len(op.FIELDS) + 1:
message = args[-1]
args = args[:-1]
else:
message = kwargs.pop("message", None)
- else:
- message = None
-
- hdr = Struct(self.spec["session.header"])
- hdr.sync = self.auto_sync or kwargs.pop("sync", False)
- self.need_sync = not hdr.sync
+ if message is not None:
+ kwargs["headers"] = message.headers
+ kwargs["payload"] = message.body
- cmd = type.new(args, kwargs)
- sc = StringCodec(self.spec)
- sc.write_command(hdr, cmd)
+ cmd = op(*args, **kwargs)
+ cmd.sync = self.auto_sync or cmd.sync
+ self.need_sync = not cmd.sync
+ cmd.channel = self.channel.id
- seg = Segment(True, (message == None or
- (message.headers == None and message.body == None)),
- type.segment_type, type.track, self.channel.id, sc.encoded)
-
- if type.result:
+ if op.RESULT:
result = Future(exception=SessionException)
self.results[self.sender.next_id] = result
- self.send(seg)
-
- log.debug("SENT %s %s %s", seg.id, hdr, cmd)
-
- if message != None:
- if message.headers != None:
- sc = StringCodec(self.spec)
- for st in message.headers:
- sc.write_struct32(st)
- seg = Segment(False, message.body == None, self.spec["segment_type.header"].value,
- type.track, self.channel.id, sc.encoded)
- self.send(seg)
- if message.body != None:
- seg = Segment(False, True, self.spec["segment_type.body"].value,
- type.track, self.channel.id, message.body)
- self.send(seg)
- msg.debug("SENT %s", message)
-
- if type.result:
+ log.debug("SENDING %s", cmd)
+
+ self.send(cmd)
+
+ log.debug("SENT %s", cmd)
+ if op == MessageTransfer:
+ msg.debug("SENT %s", cmd)
+
+ if op.RESULT:
if self.auto_sync:
return result.get(self.timeout)
else:
@@ -203,81 +184,47 @@ class Session(command_invoker(SPEC)):
elif self.auto_sync:
self.sync(self.timeout)
- def received(self, seg):
- self.receiver.received(seg)
- if seg.first:
- assert self.assembly == None
- self.assembly = []
- self.assembly.append(seg)
- if seg.last:
- self.dispatch(self.assembly)
- self.assembly = None
-
- def dispatch(self, assembly):
- segments = assembly[:]
-
- hdr, cmd = assembly.pop(0).decode(self.spec)
- log.debug("RECV %s %s %s", cmd.id, hdr, cmd)
-
- args = []
-
- for st in cmd._type.segments:
- if assembly:
- seg = assembly[0]
- if seg.type == st.segment_type:
- args.append(seg.decode(self.spec))
- assembly.pop(0)
- continue
- args.append(None)
-
- assert len(assembly) == 0
+ def received(self, cmd):
+ self.receiver.received(cmd)
+ self.dispatch(cmd)
- attr = cmd._type.qname.replace(".", "_")
- result = getattr(self.delegate, attr)(cmd, *args)
+ def dispatch(self, cmd):
+ log.debug("RECV %s", cmd)
- if cmd._type.result:
+ result = getattr(self.delegate, cmd.NAME)(cmd)
+ if result is INCOMPLETE:
+ return
+ elif result is not None:
self.execution_result(cmd.id, result)
- if result is not INCOMPLETE:
- for seg in segments:
- self.receiver.completed(seg)
- # XXX: don't forget to obey sync for manual completion as well
- if hdr.sync:
- self.channel.session_completed(self.receiver._completed)
+ self.receiver.completed(cmd)
+ # XXX: don't forget to obey sync for manual completion as well
+ if cmd.sync:
+ self.channel.session_completed(self.receiver._completed)
- def send(self, seg):
- self.sender.send(seg)
-
- def __str__(self):
- return '<Session: %s, %s>' % (self.name, self.channel)
+ def send(self, cmd):
+ self.sender.send(cmd)
def __repr__(self):
- return str(self)
+ return '<Session: %s, %s>' % (self.name, self.channel)
class Receiver:
def __init__(self, session):
self.session = session
self.next_id = None
- self.next_offset = None
self._completed = RangedSet()
- def received(self, seg):
- if self.next_id == None or self.next_offset == None:
+ def received(self, cmd):
+ if self.next_id == None:
raise Exception("todo")
- seg.id = self.next_id
- seg.offset = self.next_offset
- if seg.last:
- self.next_id += 1
- self.next_offset = 0
- else:
- self.next_offset += len(seg.payload)
+ cmd.id = self.next_id
+ self.next_id += 1
- def completed(self, seg):
- if seg.id == None:
- raise ValueError("cannot complete unidentified segment")
- if seg.last:
- self._completed.add(seg.id)
+ def completed(self, cmd):
+ if cmd.id == None:
+ raise ValueError("cannot complete unidentified command")
+ self._completed.add(cmd.id)
def known_completed(self, commands):
completed = RangedSet()
@@ -294,30 +241,24 @@ class Sender:
def __init__(self, session):
self.session = session
self.next_id = serial(0)
- self.next_offset = 0
- self.segments = []
+ self.commands = []
self._completed = RangedSet()
- def send(self, seg):
- seg.id = self.next_id
- seg.offset = self.next_offset
- if seg.last:
- self.next_id += 1
- self.next_offset = 0
- else:
- self.next_offset += len(seg.payload)
- self.segments.append(seg)
+ def send(self, cmd):
+ cmd.id = self.next_id
+ self.next_id += 1
if self.session.send_id:
self.session.send_id = False
- self.session.channel.session_command_point(seg.id, seg.offset)
- self.session.channel.connection.write_segment(seg)
+ self.session.channel.session_command_point(cmd.id, 0)
+ self.commands.append(cmd)
+ self.session.channel.connection.write_op(cmd)
def completed(self, commands):
idx = 0
- while idx < len(self.segments):
- seg = self.segments[idx]
- if seg.id in commands:
- del self.segments[idx]
+ while idx < len(self.commands):
+ cmd = self.commands[idx]
+ if cmd.id in commands:
+ del self.commands[idx]
else:
idx += 1
for range in commands.ranges:
@@ -332,7 +273,7 @@ class Incoming(Queue):
def start(self):
self.session.message_set_flow_mode(self.destination, self.session.flow_mode.credit)
- for unit in self.session.credit_unit.values():
+ for unit in self.session.credit_unit.VALUES:
self.session.message_flow(self.destination, unit, 0xFFFFFFFFL)
def stop(self):
@@ -356,9 +297,9 @@ class Delegate:
class Client(Delegate):
- def message_transfer(self, cmd, headers, body):
- m = Message(body)
- m.headers = headers
+ def message_transfer(self, cmd):
+ m = Message(cmd.payload)
+ m.headers = cmd.headers
m.id = cmd.id
messages = self.session.incoming(cmd.destination)
messages.put(m)
diff --git a/python/qpid/spec.py b/python/qpid/spec.py
index cd76c70c5c..e9bfef1fa6 100644
--- a/python/qpid/spec.py
+++ b/python/qpid/spec.py
@@ -29,7 +29,7 @@ class so that the generated code can be reused in a variety of
situations.
"""
-import os, mllib, spec08, spec010
+import os, mllib, spec08
def default():
try:
@@ -54,7 +54,7 @@ def load(specfile, *errata):
minor = doc["amqp/@minor"]
if major == "0" and minor == "10":
- return spec010.load(specfile, *errata)
+ return None
else:
return spec08.load(specfile, *errata)
diff --git a/python/qpid/spec010.py b/python/qpid/spec010.py
deleted file mode 100644
index eabc8e2983..0000000000
--- a/python/qpid/spec010.py
+++ /dev/null
@@ -1,708 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import os, cPickle, datatypes, datetime
-from codec010 import StringCodec
-from util import mtime, fill
-
-class Node:
-
- def __init__(self, children):
- self.children = children
- self.named = {}
- self.docs = []
- self.rules = []
-
- def register(self):
- for ch in self.children:
- ch.register(self)
-
- def resolve(self):
- for ch in self.children:
- ch.resolve()
-
- def __getitem__(self, name):
- path = name.split(".", 1)
- nd = self.named
- for step in path:
- nd = nd[step]
- return nd
-
- def __iter__(self):
- return iter(self.children)
-
-class Anonymous:
-
- def __init__(self, children):
- self.children = children
-
- def register(self, node):
- for ch in self.children:
- ch.register(node)
-
- def resolve(self):
- for ch in self.children:
- ch.resolve()
-
-class Named:
-
- def __init__(self, name):
- self.name = name
- self.qname = None
-
- def register(self, node):
- self.spec = node.spec
- self.klass = node.klass
- node.named[self.name] = self
- if node.qname:
- self.qname = "%s.%s" % (node.qname, self.name)
- else:
- self.qname = self.name
-
- def __str__(self):
- return self.qname
-
- def __repr__(self):
- return str(self)
-
-class Lookup:
-
- def lookup(self, name):
- value = None
- if self.klass:
- try:
- value = self.klass[name]
- except KeyError:
- pass
- if not value:
- value = self.spec[name]
- return value
-
-class Coded:
-
- def __init__(self, code):
- self.code = code
-
-class Constant(Named, Node):
-
- def __init__(self, name, value, children):
- Named.__init__(self, name)
- Node.__init__(self, children)
- self.value = value
-
- def register(self, node):
- Named.register(self, node)
- node.constants.append(self)
- Node.register(self)
-
-class Type(Named, Node):
-
- def __init__(self, name, children):
- Named.__init__(self, name)
- Node.__init__(self, children)
-
- def is_present(self, value):
- return value != None
-
- def register(self, node):
- Named.register(self, node)
- Node.register(self)
-
-class Primitive(Coded, Type):
-
- def __init__(self, name, code, fixed, variable, children):
- Coded.__init__(self, code)
- Type.__init__(self, name, children)
- self.fixed = fixed
- self.variable = variable
-
- def register(self, node):
- Type.register(self, node)
- if self.code is not None:
- self.spec.types[self.code] = self
-
- def is_present(self, value):
- if self.fixed == 0:
- return value
- else:
- return Type.is_present(self, value)
-
- def encode(self, codec, value):
- getattr(codec, "write_%s" % self.name)(value)
-
- def decode(self, codec):
- return getattr(codec, "read_%s" % self.name)()
-
-class Domain(Type, Lookup):
-
- def __init__(self, name, type, children):
- Type.__init__(self, name, children)
- self.type = type
- self.choices = {}
-
- def resolve(self):
- self.type = self.lookup(self.type)
- Node.resolve(self)
-
- def encode(self, codec, value):
- self.type.encode(codec, value)
-
- def decode(self, codec):
- return self.type.decode(codec)
-
-class Enum:
-
- def __init__(self, name):
- self.name = name
- self._names = ()
- self._values = ()
-
- def values(self):
- return self._values
-
- def __repr__(self):
- return "%s(%s)" % (self.name, ", ".join(self._names))
-
-class Choice(Named, Node):
-
- def __init__(self, name, value, children):
- Named.__init__(self, name)
- Node.__init__(self, children)
- self.value = value
-
- def register(self, node):
- Named.register(self, node)
- node.choices[self.value] = self
- Node.register(self)
- try:
- enum = node.spec.enums[node.name]
- except KeyError:
- enum = Enum(node.name)
- node.spec.enums[node.name] = enum
- setattr(enum, self.name, self.value)
- enum._names += (self.name,)
- enum._values += (self.value,)
-
-class Composite(Type, Coded):
-
- def __init__(self, name, label, code, size, pack, children):
- Coded.__init__(self, code)
- Type.__init__(self, name, children)
- self.label = label
- self.fields = []
- self.size = size
- self.pack = pack
-
- def new(self, args, kwargs):
- return datatypes.Struct(self, *args, **kwargs)
-
- def decode(self, codec):
- codec.read_size(self.size)
- if self.code is not None:
- code = codec.read_uint16()
- assert self.code == code
- return datatypes.Struct(self, **self.decode_fields(codec))
-
- def decode_fields(self, codec):
- flags = 0
- for i in range(self.pack):
- flags |= (codec.read_uint8() << 8*i)
-
- result = {}
-
- for i in range(len(self.fields)):
- f = self.fields[i]
- if flags & (0x1 << i):
- result[f.name] = f.type.decode(codec)
- else:
- result[f.name] = None
- return result
-
- def encode(self, codec, value):
- sc = StringCodec(self.spec)
- if self.code is not None:
- sc.write_uint16(self.code)
- self.encode_fields(sc, value)
- codec.write_size(self.size, len(sc.encoded))
- codec.write(sc.encoded)
-
- def encode_fields(self, codec, values):
- flags = 0
- for i in range(len(self.fields)):
- f = self.fields[i]
- if f.type.is_present(values[f.name]):
- flags |= (0x1 << i)
- for i in range(self.pack):
- codec.write_uint8((flags >> 8*i) & 0xFF)
- for i in range(len(self.fields)):
- f = self.fields[i]
- if flags & (0x1 << i):
- f.type.encode(codec, values[f.name])
-
- def docstring(self):
- docs = []
- if self.label:
- docs.append(self.label)
- docs += [d.text for d in self.docs]
- s = "\n\n".join([fill(t, 2) for t in docs])
- for f in self.fields:
- fdocs = []
- if f.label:
- fdocs.append(f.label)
- else:
- fdocs.append("")
- fdocs += [d.text for d in f.docs]
- s += "\n\n" + "\n\n".join([fill(fdocs[0], 4, f.name)] +
- [fill(t, 4) for t in fdocs[1:]])
- return s
-
-
-class Field(Named, Node, Lookup):
-
- def __init__(self, name, label, type, children):
- Named.__init__(self, name)
- Node.__init__(self, children)
- self.label = label
- self.type = type
- self.exceptions = []
-
- def default(self):
- return None
-
- def register(self, node):
- Named.register(self, node)
- node.fields.append(self)
- Node.register(self)
-
- def resolve(self):
- self.type = self.lookup(self.type)
- Node.resolve(self)
-
- def __str__(self):
- return "%s: %s" % (self.qname, self.type.qname)
-
-class Struct(Composite):
-
- def register(self, node):
- Composite.register(self, node)
- if self.code is not None:
- self.spec.structs[self.code] = self
- self.spec.structs_by_name[self.name] = self
- self.pyname = self.name
- self.pydoc = self.docstring()
-
- def __str__(self):
- fields = ",\n ".join(["%s: %s" % (f.name, f.type.qname)
- for f in self.fields])
- return "%s {\n %s\n}" % (self.qname, fields)
-
-class Segment:
-
- def __init__(self):
- self.segment_type = None
-
- def register(self, node):
- self.spec = node.spec
- self.klass = node.klass
- node.segments.append(self)
- Node.register(self)
-
-class Instruction(Composite, Segment):
-
- def __init__(self, name, label, code, children):
- Composite.__init__(self, name, label, code, 0, 2, children)
- Segment.__init__(self)
- self.track = None
- self.handlers = []
-
- def __str__(self):
- return "%s(%s)" % (self.qname, ", ".join(["%s: %s" % (f.name, f.type.qname)
- for f in self.fields]))
-
- def register(self, node):
- Composite.register(self, node)
- self.pyname = self.qname.replace(".", "_")
- self.pydoc = self.docstring()
- self.spec.instructions[self.pyname] = self
-
-class Control(Instruction):
-
- def __init__(self, name, code, label, children):
- Instruction.__init__(self, name, code, label, children)
- self.response = None
-
- def register(self, node):
- Instruction.register(self, node)
- node.controls.append(self)
- self.spec.controls[self.code] = self
- self.segment_type = self.spec["segment_type.control"].value
- self.track = self.spec["track.control"].value
-
-class Command(Instruction):
-
- def __init__(self, name, label, code, children):
- Instruction.__init__(self, name, label, code, children)
- self.result = None
- self.exceptions = []
- self.segments = []
-
- def register(self, node):
- Instruction.register(self, node)
- node.commands.append(self)
- self.spec.commands[self.code] = self
- self.segment_type = self.spec["segment_type.command"].value
- self.track = self.spec["track.command"].value
-
-class Header(Segment, Node):
-
- def __init__(self, children):
- Segment.__init__(self)
- Node.__init__(self, children)
- self.entries = []
-
- def register(self, node):
- Segment.register(self, node)
- self.segment_type = self.spec["segment_type.header"].value
- Node.register(self)
-
-class Entry(Lookup):
-
- def __init__(self, type):
- self.type = type
-
- def register(self, node):
- self.spec = node.spec
- self.klass = node.klass
- node.entries.append(self)
-
- def resolve(self):
- self.type = self.lookup(self.type)
-
-class Body(Segment, Node):
-
- def __init__(self, children):
- Segment.__init__(self)
- Node.__init__(self, children)
-
- def register(self, node):
- Segment.register(self, node)
- self.segment_type = self.spec["segment_type.body"].value
- Node.register(self)
-
- def resolve(self): pass
-
-class Class(Named, Coded, Node):
-
- def __init__(self, name, code, children):
- Named.__init__(self, name)
- Coded.__init__(self, code)
- Node.__init__(self, children)
- self.controls = []
- self.commands = []
-
- def register(self, node):
- Named.register(self, node)
- self.klass = self
- node.classes.append(self)
- Node.register(self)
-
-class Doc:
-
- def __init__(self, type, title, text):
- self.type = type
- self.title = title
- self.text = text
-
- def register(self, node):
- node.docs.append(self)
-
- def resolve(self): pass
-
-class Role(Named, Node):
-
- def __init__(self, name, children):
- Named.__init__(self, name)
- Node.__init__(self, children)
-
- def register(self, node):
- Named.register(self, node)
- Node.register(self)
-
-class Rule(Named, Node):
-
- def __init__(self, name, children):
- Named.__init__(self, name)
- Node.__init__(self, children)
-
- def register(self, node):
- Named.register(self, node)
- node.rules.append(self)
- Node.register(self)
-
-class Exception(Named, Node):
-
- def __init__(self, name, error_code, children):
- Named.__init__(self, name)
- Node.__init__(self, children)
- self.error_code = error_code
-
- def register(self, node):
- Named.register(self, node)
- node.exceptions.append(self)
- Node.register(self)
-
-def direct(t):
- return lambda x: t
-
-def map_str(s):
- for c in s:
- if ord(c) >= 0x80:
- return "vbin16"
- return "str16"
-
-class Spec(Node):
-
- ENCODINGS = {
- unicode: direct("str16"),
- str: map_str,
- buffer: direct("vbin32"),
- int: direct("int64"),
- long: direct("int64"),
- float: direct("double"),
- None.__class__: direct("void"),
- list: direct("list"),
- tuple: direct("list"),
- dict: direct("map"),
- datatypes.timestamp: direct("datetime"),
- datetime.datetime: direct("datetime"),
- datatypes.UUID: direct("uuid")
- }
-
- def __init__(self, major, minor, port, children):
- Node.__init__(self, children)
- self.major = major
- self.minor = minor
- self.port = port
- self.constants = []
- self.classes = []
- self.types = {}
- self.qname = None
- self.spec = self
- self.klass = None
- self.instructions = {}
- self.controls = {}
- self.commands = {}
- self.structs = {}
- self.structs_by_name = {}
- self.enums = {}
-
- def encoding(self, obj):
- return self._encoding(obj.__class__, obj)
-
- def _encoding(self, klass, obj):
- if Spec.ENCODINGS.has_key(klass):
- return self.named[Spec.ENCODINGS[klass](obj)]
- for base in klass.__bases__:
- result = self._encoding(base, obj)
- if result != None:
- return result
-
-class Implement:
-
- def __init__(self, handle):
- self.handle = handle
-
- def register(self, node):
- node.handlers.append(self.handle)
-
- def resolve(self): pass
-
-class Response(Node):
-
- def __init__(self, name, children):
- Node.__init__(self, children)
- self.name = name
-
- def register(self, node):
- Node.register(self)
-
-class Result(Node, Lookup):
-
- def __init__(self, type, children):
- self.type = type
- Node.__init__(self, children)
-
- def register(self, node):
- node.result = self
- self.qname = node.qname
- self.klass = node.klass
- self.spec = node.spec
- Node.register(self)
-
- def resolve(self):
- self.type = self.lookup(self.type)
- Node.resolve(self)
-
-import mllib
-
-def num(s):
- if s: return int(s, 0)
-
-REPLACE = {" ": "_", "-": "_"}
-KEYWORDS = {"global": "global_",
- "return": "return_"}
-
-def id(name):
- name = str(name)
- for key, val in REPLACE.items():
- name = name.replace(key, val)
- try:
- name = KEYWORDS[name]
- except KeyError:
- pass
- return name
-
-class Loader:
-
- def __init__(self):
- self.class_code = 0
-
- def code(self, nd):
- c = num(nd["@code"])
- if c is None:
- return None
- else:
- return c | (self.class_code << 8)
-
- def list(self, q):
- result = []
- for nd in q:
- result.append(nd.dispatch(self))
- return result
-
- def children(self, n):
- return self.list(n.query["#tag"])
-
- def data(self, d):
- return d.data
-
- def do_amqp(self, a):
- return Spec(num(a["@major"]), num(a["@minor"]), num(a["@port"]),
- self.children(a))
-
- def do_type(self, t):
- return Primitive(id(t["@name"]), self.code(t), num(t["@fixed-width"]),
- num(t["@variable-width"]), self.children(t))
-
- def do_constant(self, c):
- return Constant(id(c["@name"]), num(c["@value"]), self.children(c))
-
- def do_domain(self, d):
- return Domain(id(d["@name"]), id(d["@type"]), self.children(d))
-
- def do_enum(self, e):
- return Anonymous(self.children(e))
-
- def do_choice(self, c):
- return Choice(id(c["@name"]), num(c["@value"]), self.children(c))
-
- def do_class(self, c):
- code = num(c["@code"])
- self.class_code = code
- children = self.children(c)
- children += self.list(c.query["command/result/struct"])
- self.class_code = 0
- return Class(id(c["@name"]), code, children)
-
- def do_doc(self, doc):
- text = reduce(lambda x, y: x + y, self.list(doc.children))
- return Doc(doc["@type"], doc["@title"], text)
-
- def do_xref(self, x):
- return x["@ref"]
-
- def do_role(self, r):
- return Role(id(r["@name"]), self.children(r))
-
- def do_control(self, c):
- return Control(id(c["@name"]), c["@label"], self.code(c), self.children(c))
-
- def do_rule(self, r):
- return Rule(id(r["@name"]), self.children(r))
-
- def do_implement(self, i):
- return Implement(id(i["@handle"]))
-
- def do_response(self, r):
- return Response(id(r["@name"]), self.children(r))
-
- def do_field(self, f):
- return Field(id(f["@name"]), f["@label"], id(f["@type"]), self.children(f))
-
- def do_struct(self, s):
- return Struct(id(s["@name"]), s["@label"], self.code(s), num(s["@size"]),
- num(s["@pack"]), self.children(s))
-
- def do_command(self, c):
- return Command(id(c["@name"]), c["@label"], self.code(c), self.children(c))
-
- def do_segments(self, s):
- return Anonymous(self.children(s))
-
- def do_header(self, h):
- return Header(self.children(h))
-
- def do_entry(self, e):
- return Entry(id(e["@type"]))
-
- def do_body(self, b):
- return Body(self.children(b))
-
- def do_result(self, r):
- type = r["@type"]
- if not type:
- type = r["struct/@name"]
- return Result(id(type), self.list(r.query["#tag", lambda x: x.name != "struct"]))
-
- def do_exception(self, e):
- return Exception(id(e["@name"]), id(e["@error-code"]), self.children(e))
-
-def load(xml):
- fname = xml + ".pcl"
-
- if os.path.exists(fname) and mtime(fname) > mtime(__file__):
- file = open(fname, "r")
- s = cPickle.load(file)
- file.close()
- else:
- doc = mllib.xml_parse(xml)
- s = doc["amqp"].dispatch(Loader())
- s.register()
- s.resolve()
-
- try:
- file = open(fname, "w")
- except IOError:
- file = None
-
- if file:
- cPickle.dump(s, file)
- file.close()
-
- return s
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
index 12b2781561..31c376b2f9 100644
--- a/python/qpid/testlib.py
+++ b/python/qpid/testlib.py
@@ -21,239 +21,13 @@
# Support library for qpid python tests.
#
-import sys, re, unittest, os, random, logging, traceback
-import qpid.client, qpid.spec, qmf.console
+import unittest, traceback, socket
+import qpid.client, qmf.console
import Queue
-from fnmatch import fnmatch
-from getopt import getopt, GetoptError
from qpid.content import Content
from qpid.message import Message
-
-#0-10 support
-from qpid.connection import Connection
-from qpid.spec010 import load
-from qpid.util import connect, ssl, URL
-
-def findmodules(root):
- """Find potential python modules under directory root"""
- found = []
- for dirpath, subdirs, files in os.walk(root):
- modpath = dirpath.replace(os.sep, '.')
- if not re.match(r'\.svn$', dirpath): # Avoid SVN directories
- for f in files:
- match = re.match(r'(.+)\.py$', f)
- if match and f != '__init__.py':
- found.append('.'.join([modpath, match.group(1)]))
- return found
-
-def default(value, default):
- if (value == None): return default
- else: return value
-
-class TestRunner:
-
- SPEC_FOLDER = "../specs"
- qpidd = os.getenv("QPIDD")
-
- """Runs unit tests.
-
- Parses command line arguments, provides utility functions for tests,
- runs the selected test suite.
- """
-
- def _die(self, message = None):
- if message: print message
- print """
-run-tests [options] [test*]
-The name of a test is package.module.ClassName.testMethod
-Options:
- -?/-h/--help : this message
- -s/--spec <spec.xml> : URL of AMQP XML specification or one of these abbreviations:
- 0-8 - use the default 0-8 specification.
- 0-9 - use the default 0-9 specification.
- 0-10-errata - use the 0-10 specification with qpid errata.
- -e/--errata <errata.xml> : file containing amqp XML errata
- -b/--broker [amqps://][<user>[/<password>]@]<host>[:<port>] : broker to connect to
- -B/--start-broker <broker-args> : start a local broker using broker-args; set QPIDD
- env to point to broker executable. broker-args will be
- prepended with "--daemon --port=0"
- -v/--verbose : verbose - lists tests as they are run.
- -d/--debug : enable debug logging.
- -i/--ignore <test> : ignore the named test.
- -I/--ignore-file : file containing patterns to ignore.
- -S/--skip-self-test : skips the client self tests in the 'tests folder'
- -F/--spec-folder : folder that contains the specs to be loaded
- """
- sys.exit(1)
-
- def startBroker(self, brokerArgs):
- """Start a single broker daemon"""
- if TestRunner.qpidd == None:
- self._die("QPIDD environment var must point to qpidd when using -B/--start-broker")
- cmd = "%s --daemon --port=0 %s" % (TestRunner.qpidd, brokerArgs)
- portStr = os.popen(cmd).read()
- if len(portStr) == 0:
- self._die("%s failed to start" % TestRunner.qpidd)
- port = int(portStr)
- pid = int(os.popen("%s -p %d -c" % (TestRunner.qpidd, port)).read())
- print "Started broker: pid=%d, port=%d" % (pid, port)
- self.brokerTuple = (pid, port)
- self.setBroker("localhost:%d" % port)
-
- def stopBroker(self):
- """Stop the broker using qpidd -q"""
- if self.brokerTuple:
- ret = os.spawnl(os.P_WAIT, TestRunner.qpidd, TestRunner.qpidd, "--port=%d" % self.brokerTuple[1], "-q")
- if ret != 0:
- self._die("stop_node(): pid=%d port=%d: qpidd -q returned %d" % (self.brokerTuple[0], self.brokerTuple[1], ret))
- print "Stopped broker: pid=%d, port=%d" % self.brokerTuple
-
- def killBroker(self):
- """Kill the broker using kill -9 (SIGTERM)"""
- if self.brokerTuple:
- os.kill(self.brokerTuple[0], signal.SIGTERM)
- print "Killed broker: pid=%d, port=%d" % self.brokerTuple
-
- def setBroker(self, broker):
- try:
- self.url = URL(broker)
- except ValueError:
- self._die("'%s' is not a valid broker" % (broker))
- self.user = default(self.url.user, "guest")
- self.password = default(self.url.password, "guest")
- self.host = self.url.host
- if self.url.scheme == URL.AMQPS:
- self.ssl = True
- default_port = 5671
- else:
- self.ssl = False
- default_port = 5672
- self.port = default(self.url.port, default_port)
-
- def ignoreFile(self, filename):
- f = file(filename)
- for line in f.readlines(): self.ignore.append(line.strip())
- f.close()
-
- def use08spec(self):
- "True if we are running with the old 0-8 spec."
- # NB: AMQP 0-8 identifies itself as 8-0 for historical reasons.
- return self.spec.major==8 and self.spec.minor==0
-
- def use09spec(self):
- "True if we are running with the 0-9 (non-wip) spec."
- return self.spec.major==0 and self.spec.minor==9
-
- def _parseargs(self, args):
- # Defaults
- self.setBroker("localhost")
- self.verbose = 1
- self.ignore = []
- self.specfile = "0-8"
- self.errata = []
- self.skip_self_test = False
-
- try:
- opts, self.tests = getopt(args, "s:e:b:B:h?dvSi:I:F:",
- ["help", "spec", "errata=", "broker=",
- "start-broker=", "verbose", "skip-self-test", "ignore",
- "ignore-file", "spec-folder"])
- except GetoptError, e:
- self._die(str(e))
- # check for mutually exclusive options
- if "-B" in opts or "--start-broker" in opts:
- if "-b" in opts or "--broker" in opts:
- self._die("Cannot use -B/--start-broker and -b/broker options together")
- for opt, value in opts:
- if opt in ("-?", "-h", "--help"): self._die()
- if opt in ("-s", "--spec"): self.specfile = value
- if opt in ("-e", "--errata"): self.errata.append(value)
- if opt in ("-b", "--broker"): self.setBroker(value)
- if opt in ("-B", "--start-broker"): self.startBroker(value)
- if opt in ("-v", "--verbose"): self.verbose = 2
- if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG)
- if opt in ("-i", "--ignore"): self.ignore.append(value)
- if opt in ("-I", "--ignore-file"): self.ignoreFile(value)
- if opt in ("-S", "--skip-self-test"): self.skip_self_test = True
- if opt in ("-F", "--spec-folder"): TestRunner.SPEC_FOLDER = value
-
- # Abbreviations for default settings.
- if (self.specfile == "0-10"):
- self.spec = load(self.get_spec_file("amqp.0-10.xml"))
- elif (self.specfile == "0-10-errata"):
- self.spec = load(self.get_spec_file("amqp.0-10-qpid-errata.xml"))
- else:
- if (self.specfile == "0-8"):
- self.specfile = self.get_spec_file("amqp.0-8.xml")
- elif (self.specfile == "0-9"):
- self.specfile = self.get_spec_file("amqp.0-9.xml")
- self.errata.append(self.get_spec_file("amqp-errata.0-9.xml"))
-
- if (self.specfile == None):
- self._die("No XML specification provided")
- print "Using specification from:", self.specfile
-
- self.spec = qpid.spec.load(self.specfile, *self.errata)
-
- if len(self.tests) == 0:
- if not self.skip_self_test:
- self.tests=findmodules("tests")
- if self.use08spec() or self.use09spec():
- self.tests+=findmodules("tests_0-8")
- elif (self.spec.major == 99 and self.spec.minor == 0):
- self.tests+=findmodules("tests_0-10_preview")
- elif (self.spec.major == 0 and self.spec.minor == 10):
- self.tests+=findmodules("tests_0-10")
-
- def testSuite(self):
- class IgnoringTestSuite(unittest.TestSuite):
- def addTest(self, test):
- if isinstance(test, unittest.TestCase):
- for pattern in testrunner.ignore:
- if fnmatch(test.id(), pattern):
- return
- unittest.TestSuite.addTest(self, test)
-
- # Use our IgnoringTestSuite in the test loader.
- unittest.TestLoader.suiteClass = IgnoringTestSuite
- return unittest.defaultTestLoader.loadTestsFromNames(self.tests)
-
- def run(self, args=sys.argv[1:]):
- self.brokerTuple = None
- self._parseargs(args)
- runner = unittest.TextTestRunner(descriptions=False,
- verbosity=self.verbose)
- result = runner.run(self.testSuite())
-
- if (self.ignore):
- print "======================================="
- print "NOTE: the following tests were ignored:"
- for t in self.ignore: print t
- print "======================================="
-
- self.stopBroker()
- return result.wasSuccessful()
-
- def connect(self, host=None, port=None, spec=None, user=None, password=None, tune_params=None):
- """Connect to the broker, returns a qpid.client.Client"""
- host = host or self.host
- port = port or self.port
- spec = spec or self.spec
- user = user or self.user
- password = password or self.password
- client = qpid.client.Client(host, port, spec)
- if self.use08spec():
- client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params)
- else:
- client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params)
- return client
-
- def get_spec_file(self, fname):
- return TestRunner.SPEC_FOLDER + os.sep + fname
-
-# Global instance for tests to call connect.
-testrunner = TestRunner()
-
+from qpid.harness import Skipped
+from qpid.exceptions import VersionError
class TestBase(unittest.TestCase):
"""Base class for Qpid test cases.
@@ -267,6 +41,9 @@ class TestBase(unittest.TestCase):
resources to clean up later.
"""
+ def configure(self, config):
+ self.config = config
+
def setUp(self):
self.queues = []
self.exchanges = []
@@ -293,9 +70,26 @@ class TestBase(unittest.TestCase):
else:
self.client.close()
- def connect(self, *args, **keys):
+ def connect(self, host=None, port=None, user=None, password=None, tune_params=None):
"""Create a new connction, return the Client object"""
- return testrunner.connect(*args, **keys)
+ host = host or self.config.broker.host
+ port = port or self.config.broker.port or 5672
+ user = user or "guest"
+ password = password or "guest"
+ client = qpid.client.Client(host, port)
+ try:
+ if client.spec.major == 8 and client.spec.minor == 0:
+ client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params)
+ else:
+ client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params)
+ except qpid.client.Closed, e:
+ if isinstance(e.args[0], VersionError):
+ raise Skipped(e.args[0])
+ else:
+ raise e
+ except socket.error, e:
+ raise Skipped(e)
+ return client
def queue_declare(self, channel=None, *args, **keys):
channel = channel or self.channel
@@ -319,17 +113,8 @@ class TestBase(unittest.TestCase):
def consume(self, queueName):
"""Consume from named queue returns the Queue object."""
- if testrunner.use08spec() or testrunner.use09spec():
- reply = self.channel.basic_consume(queue=queueName, no_ack=True)
- return self.client.queue(reply.consumer_tag)
- else:
- if not "uniqueTag" in dir(self): self.uniqueTag = 1
- else: self.uniqueTag += 1
- consumer_tag = "tag" + str(self.uniqueTag)
- self.channel.message_subscribe(queue=queueName, destination=consumer_tag)
- self.channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL)
- self.channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL)
- return self.client.queue(consumer_tag)
+ reply = self.channel.basic_consume(queue=queueName, no_ack=True)
+ return self.client.queue(reply.consumer_tag)
def subscribe(self, channel=None, **keys):
channel = channel or self.channel
@@ -350,24 +135,14 @@ class TestBase(unittest.TestCase):
Publish to exchange and assert queue.get() returns the same message.
"""
body = self.uniqueString()
- if testrunner.use08spec() or testrunner.use09spec():
- self.channel.basic_publish(
- exchange=exchange,
- content=Content(body, properties=properties),
- routing_key=routing_key)
- else:
- self.channel.message_transfer(
- destination=exchange,
- content=Content(body, properties={'application_headers':properties,'routing_key':routing_key}))
+ self.channel.basic_publish(
+ exchange=exchange,
+ content=Content(body, properties=properties),
+ routing_key=routing_key)
msg = queue.get(timeout=1)
- if testrunner.use08spec() or testrunner.use09spec():
- self.assertEqual(body, msg.content.body)
- if (properties):
- self.assertEqual(properties, msg.content.properties)
- else:
- self.assertEqual(body, msg.content.body)
- if (properties):
- self.assertEqual(properties, msg.content['application_headers'])
+ self.assertEqual(body, msg.content.body)
+ if (properties):
+ self.assertEqual(properties, msg.content.properties)
def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None):
"""
@@ -394,11 +169,19 @@ class TestBase(unittest.TestCase):
self.assertEqual("close", message.method.name)
self.assertEqual(expectedCode, message.reply_code)
+#0-10 support
+from qpid.connection import Connection
+from qpid.util import connect, ssl, URL
+
class TestBase010(unittest.TestCase):
"""
Base class for Qpid test cases. using the final 0-10 spec
"""
+ def configure(self, config):
+ self.config = config
+ self.broker = config.broker
+
def setUp(self):
self.conn = self.connect()
self.session = self.conn.session("test-session", timeout=10)
@@ -406,15 +189,26 @@ class TestBase010(unittest.TestCase):
def startQmf(self, handler=None):
self.qmf = qmf.console.Session(handler)
- self.qmf_broker = self.qmf.addBroker(str(testrunner.url))
+ self.qmf_broker = self.qmf.addBroker(str(self.broker))
def connect(self, host=None, port=None):
- sock = connect(host or testrunner.host, port or testrunner.port)
- if testrunner.url.scheme == URL.AMQPS:
+ url = self.broker
+ if url.scheme == URL.AMQPS:
+ default_port = 5671
+ else:
+ default_port = 5672
+ try:
+ sock = connect(host or url.host, port or url.port or default_port)
+ except socket.error, e:
+ raise Skipped(e)
+ if url.scheme == URL.AMQPS:
sock = ssl(sock)
- conn = Connection(sock, username=testrunner.user,
- password=testrunner.password)
- conn.start(timeout=10)
+ conn = Connection(sock, username=url.user or "guest",
+ password=url.password or "guest")
+ try:
+ conn.start(timeout=10)
+ except VersionError, e:
+ raise Skipped(e)
return conn
def tearDown(self):
diff --git a/python/qpid/tests/framing.py b/python/qpid/tests/framing.py
index 4cd596b583..0b33df8b9a 100644
--- a/python/qpid/tests/framing.py
+++ b/python/qpid/tests/framing.py
@@ -40,6 +40,50 @@ class Base(Test):
assert seg1.channel == seg2.channel, "expected: %r, got %r" % (seg1, seg2)
assert seg1.payload == seg2.payload, "expected: %r, got %r" % (seg1, seg2)
+ def cmp_list(self, l1, l2):
+ if l1 is None:
+ assert l2 is None
+ return
+
+ assert len(l1) == len(l2)
+ for v1, v2 in zip(l1, l2):
+ if isinstance(v1, Compound):
+ self.cmp_ops(v1, v2)
+ else:
+ assert v1 == v2
+
+ def cmp_ops(self, op1, op2):
+ if op1 is None:
+ assert op2 is None
+ return
+
+ assert op1.__class__ == op2.__class__
+ cls = op1.__class__
+ assert op1.NAME == op2.NAME
+ assert op1.CODE == op2.CODE
+ assert op1.FIELDS == op2.FIELDS
+ for f in cls.FIELDS:
+ v1 = getattr(op1, f.name)
+ v2 = getattr(op2, f.name)
+ if COMPOUND.has_key(f.type) or f.type == "struct32":
+ self.cmp_ops(v1, v2)
+ elif f.type in ("list", "array"):
+ self.cmp_list(v1, v2)
+ else:
+ assert v1 == v2, "expected: %r, got %r" % (v1, v2)
+
+ if issubclass(cls, Command) or issubclass(cls, Control):
+ assert op1.channel == op2.channel
+
+ if issubclass(cls, Command):
+ assert op1.sync == op2.sync, "expected: %r, got %r" % (op1.sync, op2.sync)
+ assert (op1.headers is None and op2.headers is None) or \
+ (op1.headers is not None and op2.headers is not None)
+ if op1.headers is not None:
+ assert len(op1.headers) == len(op2.headers)
+ for h1, h2 in zip(op1.headers, op2.headers):
+ self.cmp_ops(h1, h2)
+
class FrameTest(Base):
def enc_dec(self, frames, encoded=None):
@@ -171,3 +215,75 @@ class SegmentTest(Base):
for i in range(0, 8, 2)]
self.enc_dec([Segment(False, False, 0, 0, 0, "abcdefgh")], frames, ilvd, max_payload=2)
+
+from qpid.ops import *
+
+class OpTest(Base):
+
+ def enc_dec(self, ops):
+ enc = OpEncoder()
+ dec = OpDecoder()
+ enc.write(*ops)
+ segs = enc.read()
+ dec.write(*segs)
+ dops = dec.read()
+ assert len(ops) == len(dops)
+ for op1, op2 in zip(ops, dops):
+ self.cmp_ops(op1, op2)
+
+ def testEmtpyMT(self):
+ self.enc_dec([MessageTransfer()])
+
+ def testEmptyMTSync(self):
+ self.enc_dec([MessageTransfer(sync=True)])
+
+ def testMT(self):
+ self.enc_dec([MessageTransfer(destination="asdf")])
+
+ def testSyncMT(self):
+ self.enc_dec([MessageTransfer(destination="asdf", sync=True)])
+
+ def testEmptyPayloadMT(self):
+ self.enc_dec([MessageTransfer(payload="")])
+
+ def testPayloadMT(self):
+ self.enc_dec([MessageTransfer(payload="test payload")])
+
+ def testHeadersEmptyPayloadMT(self):
+ self.enc_dec([MessageTransfer(headers=[DeliveryProperties()])])
+
+ def testHeadersPayloadMT(self):
+ self.enc_dec([MessageTransfer(headers=[DeliveryProperties()], payload="test payload")])
+
+ def testMultiHeadersEmptyPayloadMT(self):
+ self.enc_dec([MessageTransfer(headers=[DeliveryProperties(), MessageProperties()])])
+
+ def testMultiHeadersPayloadMT(self):
+ self.enc_dec([MessageTransfer(headers=[MessageProperties(), DeliveryProperties()], payload="test payload")])
+
+ def testContentTypeHeadersPayloadMT(self):
+ self.enc_dec([MessageTransfer(headers=[MessageProperties(content_type="text/plain")], payload="test payload")])
+
+ def testMulti(self):
+ self.enc_dec([MessageTransfer(),
+ MessageTransfer(sync=True),
+ MessageTransfer(destination="one"),
+ MessageTransfer(destination="two", sync=True),
+ MessageTransfer(destination="three", payload="test payload")])
+
+ def testControl(self):
+ self.enc_dec([SessionAttach(name="asdf")])
+
+ def testMixed(self):
+ self.enc_dec([SessionAttach(name="fdsa"), MessageTransfer(destination="test")])
+
+ def testChannel(self):
+ self.enc_dec([SessionAttach(name="asdf", channel=3), MessageTransfer(destination="test", channel=1)])
+
+ def testCompound(self):
+ self.enc_dec([MessageTransfer(headers=[MessageProperties(reply_to=ReplyTo(exchange="exch", routing_key="rk"))])])
+
+ def testListCompound(self):
+ self.enc_dec([ExecutionResult(value=RecoverResult(in_doubt=[Xid(global_id="one"),
+ Xid(global_id="two"),
+ Xid(global_id="three")]))])
diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py
index 8a142d6c96..7706ebbabe 100644
--- a/python/qpid/tests/messaging.py
+++ b/python/qpid/tests/messaging.py
@@ -22,6 +22,7 @@
import time
from qpid.tests import Test
+from qpid.harness import Skipped
from qpid.messaging import Connection, ConnectError, Disconnected, Empty, Message, UNLIMITED, uuid4
from Queue import Queue, Empty as QueueEmpty
@@ -42,7 +43,10 @@ class Base(Test):
def setup(self):
self.test_id = uuid4()
self.broker = self.config.broker
- self.conn = self.setup_connection()
+ try:
+ self.conn = self.setup_connection()
+ except ConnectError, e:
+ raise Skipped(e)
self.ssn = self.setup_session()
self.snd = self.setup_sender()
self.rcv = self.setup_receiver()
@@ -65,7 +69,7 @@ class Base(Test):
receiver = ssn.receiver("ping-queue")
msg = receiver.fetch(0)
ssn.acknowledge()
- assert msg.content == content
+ assert msg.content == content, "expected %r, got %r" % (content, msg.content)
def drain(self, rcv, limit=None):
contents = []