diff options
Diffstat (limited to 'python/qpid')
-rw-r--r-- | python/qpid/assembler.py | 118 | ||||
-rw-r--r-- | python/qpid/client.py | 7 | ||||
-rw-r--r-- | python/qpid/codec010.py | 227 | ||||
-rw-r--r-- | python/qpid/connection.py | 65 | ||||
-rw-r--r-- | python/qpid/connection08.py | 15 | ||||
-rw-r--r-- | python/qpid/datatypes.py | 6 | ||||
-rw-r--r-- | python/qpid/delegates.py | 33 | ||||
-rw-r--r-- | python/qpid/exceptions.py | 1 | ||||
-rw-r--r-- | python/qpid/framer.py | 63 | ||||
-rw-r--r-- | python/qpid/framing.py | 172 | ||||
-rw-r--r-- | python/qpid/generator.py | 48 | ||||
-rw-r--r-- | python/qpid/harness.py | 20 | ||||
-rw-r--r-- | python/qpid/messaging.py | 29 | ||||
-rw-r--r-- | python/qpid/ops.py | 277 | ||||
-rw-r--r-- | python/qpid/peer.py | 4 | ||||
-rw-r--r-- | python/qpid/session.py | 199 | ||||
-rw-r--r-- | python/qpid/spec.py | 4 | ||||
-rw-r--r-- | python/qpid/spec010.py | 708 | ||||
-rw-r--r-- | python/qpid/testlib.py | 326 | ||||
-rw-r--r-- | python/qpid/tests/framing.py | 116 | ||||
-rw-r--r-- | python/qpid/tests/messaging.py | 8 |
21 files changed, 997 insertions, 1449 deletions
diff --git a/python/qpid/assembler.py b/python/qpid/assembler.py deleted file mode 100644 index 92bb0aa0f8..0000000000 --- a/python/qpid/assembler.py +++ /dev/null @@ -1,118 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from codec010 import StringCodec -from framer import * -from logging import getLogger - -log = getLogger("qpid.io.seg") - -class Segment: - - def __init__(self, first, last, type, track, channel, payload): - self.id = None - self.offset = None - self.first = first - self.last = last - self.type = type - self.track = track - self.channel = channel - self.payload = payload - - def decode(self, spec): - segs = spec["segment_type"] - choice = segs.choices[self.type] - return getattr(self, "decode_%s" % choice.name)(spec) - - def decode_control(self, spec): - sc = StringCodec(spec, self.payload) - return sc.read_control() - - def decode_command(self, spec): - sc = StringCodec(spec, self.payload) - hdr, cmd = sc.read_command() - cmd.id = self.id - return hdr, cmd - - def decode_header(self, spec): - sc = StringCodec(spec, self.payload) - values = [] - while len(sc.encoded) > 0: - values.append(sc.read_struct32()) - return values - - def decode_body(self, spec): - return self.payload - - def __str__(self): - return "%s%s %s %s %s %r" % (int(self.first), int(self.last), self.type, - self.track, self.channel, self.payload) - - def __repr__(self): - return str(self) - -class Assembler(Framer): - - def __init__(self, sock, max_payload = Frame.MAX_PAYLOAD): - Framer.__init__(self, sock) - self.max_payload = max_payload - self.fragments = {} - - def read_segment(self): - while True: - frame = self.read_frame() - - key = (frame.channel, frame.track) - seg = self.fragments.get(key) - if seg == None: - seg = Segment(frame.isFirstSegment(), frame.isLastSegment(), - frame.type, frame.track, frame.channel, "") - self.fragments[key] = seg - - seg.payload += frame.payload - - if frame.isLastFrame(): - self.fragments.pop(key) - log.debug("RECV %s", seg) - return seg - - def write_segment(self, segment): - remaining = segment.payload - - first = True - while first or remaining: - payload = remaining[:self.max_payload] - remaining = remaining[self.max_payload:] - - flags = 0 - if first: - flags |= FIRST_FRM - first = False - if not remaining: - flags |= LAST_FRM - if segment.first: - flags |= FIRST_SEG - if segment.last: - flags |= LAST_SEG - - frame = Frame(flags, segment.type, segment.track, segment.channel, - payload) - self.write_frame(frame) - - log.debug("SENT %s", segment) diff --git a/python/qpid/client.py b/python/qpid/client.py index 4605710de8..6107a4bc35 100644 --- a/python/qpid/client.py +++ b/python/qpid/client.py @@ -39,11 +39,8 @@ class Client: if spec: self.spec = spec else: - try: - name = os.environ["AMQP_SPEC"] - except KeyError: - raise EnvironmentError("environment variable AMQP_SPEC must be set") - self.spec = load(name) + from qpid_config import amqp_spec_0_9 + self.spec = load(amqp_spec_0_9) self.structs = StructFactory(self.spec) self.sessions = {} diff --git a/python/qpid/codec010.py b/python/qpid/codec010.py index f07362c38d..682743df19 100644 --- a/python/qpid/codec010.py +++ b/python/qpid/codec010.py @@ -20,23 +20,66 @@ import datetime from packer import Packer from datatypes import serial, timestamp, RangedSet, Struct, UUID +from ops import Compound, PRIMITIVE, COMPOUND class CodecException(Exception): pass +def direct(t): + return lambda x: t + +def map_str(s): + for c in s: + if ord(c) >= 0x80: + return "vbin16" + return "str16" + class Codec(Packer): - def __init__(self, spec): - self.spec = spec + ENCODINGS = { + unicode: direct("str16"), + str: map_str, + buffer: direct("vbin32"), + int: direct("int64"), + long: direct("int64"), + float: direct("double"), + None.__class__: direct("void"), + list: direct("list"), + tuple: direct("list"), + dict: direct("map"), + timestamp: direct("datetime"), + datetime.datetime: direct("datetime"), + UUID: direct("uuid"), + Compound: direct("struct32") + } + + def encoding(self, obj): + enc = self._encoding(obj.__class__, obj) + if enc is None: + raise CodecException("no encoding for %r" % obj) + return PRIMITIVE[enc] + + def _encoding(self, klass, obj): + if self.ENCODINGS.has_key(klass): + return self.ENCODINGS[klass](obj) + for base in klass.__bases__: + result = self._encoding(base, obj) + if result != None: + return result + + def read_primitive(self, type): + return getattr(self, "read_%s" % type.NAME)() + def write_primitive(self, type, v): + getattr(self, "write_%s" % type.NAME)(v) - def write_void(self, v): - assert v == None def read_void(self): return None + def write_void(self, v): + assert v == None - def write_bit(self, b): - if not b: raise ValueError(b) def read_bit(self): return True + def write_bit(self, b): + if not b: raise ValueError(b) def read_uint8(self): return self.unpack("!B") @@ -172,20 +215,8 @@ class Codec(Packer): self.write_uint32(len(b)) self.write(b) - def write_map(self, m): - sc = StringCodec(self.spec) - if m is not None: - sc.write_uint32(len(m)) - for k, v in m.items(): - type = self.spec.encoding(v) - if type == None: - raise CodecException("no encoding for %s" % v.__class__) - sc.write_str8(k) - sc.write_uint8(type.code) - type.encode(sc, v) - self.write_vbin32(sc.encoded) def read_map(self): - sc = StringCodec(self.spec, self.read_vbin32()) + sc = StringCodec(self.read_vbin32()) if not sc.encoded: return None count = sc.read_uint32() @@ -193,91 +224,132 @@ class Codec(Packer): while sc.encoded: k = sc.read_str8() code = sc.read_uint8() - type = self.spec.types[code] - v = type.decode(sc) + type = PRIMITIVE[code] + v = sc.read_primitive(type) result[k] = v return result + def write_map(self, m): + sc = StringCodec() + if m is not None: + sc.write_uint32(len(m)) + for k, v in m.items(): + type = self.encoding(v) + sc.write_str8(k) + sc.write_uint8(type.CODE) + sc.write_primitive(type, v) + self.write_vbin32(sc.encoded) + def read_array(self): + sc = StringCodec(self.read_vbin32()) + if not sc.encoded: + return None + type = PRIMITIVE[sc.read_uint8()] + count = sc.read_uint32() + result = [] + while count > 0: + result.append(sc.read_primitive(type)) + count -= 1 + return result def write_array(self, a): - sc = StringCodec(self.spec) + sc = StringCodec() if a is not None: if len(a) > 0: - type = self.spec.encoding(a[0]) + type = self.encoding(a[0]) else: - type = self.spec.encoding(None) - sc.write_uint8(type.code) + type = self.encoding(None) + sc.write_uint8(type.CODE) sc.write_uint32(len(a)) for o in a: - type.encode(sc, o) + sc.write_primitive(type, o) self.write_vbin32(sc.encoded) - def read_array(self): - sc = StringCodec(self.spec, self.read_vbin32()) + + def read_list(self): + sc = StringCodec(self.read_vbin32()) if not sc.encoded: return None - type = self.spec.types[sc.read_uint8()] count = sc.read_uint32() result = [] while count > 0: - result.append(type.decode(sc)) + type = PRIMITIVE[sc.read_uint8()] + result.append(sc.read_primitive(type)) count -= 1 return result - def write_list(self, l): - sc = StringCodec(self.spec) + sc = StringCodec() if l is not None: sc.write_uint32(len(l)) for o in l: - type = self.spec.encoding(o) - sc.write_uint8(type.code) - type.encode(sc, o) + type = self.encoding(o) + sc.write_uint8(type.CODE) + sc.write_primitive(type, o) self.write_vbin32(sc.encoded) - def read_list(self): - sc = StringCodec(self.spec, self.read_vbin32()) - if not sc.encoded: - return None - count = sc.read_uint32() - result = [] - while count > 0: - type = self.spec.types[sc.read_uint8()] - result.append(type.decode(sc)) - count -= 1 - return result def read_struct32(self): size = self.read_uint32() code = self.read_uint16() - type = self.spec.structs[code] - fields = type.decode_fields(self) - return Struct(type, **fields) + cls = COMPOUND[code] + op = cls() + self.read_fields(op) + return op def write_struct32(self, value): - sc = StringCodec(self.spec) - sc.write_uint16(value._type.code) - value._type.encode_fields(sc, value) - self.write_vbin32(sc.encoded) - - def read_control(self): - cntrl = self.spec.controls[self.read_uint16()] - return Struct(cntrl, **cntrl.decode_fields(self)) - def write_control(self, ctrl): - type = ctrl._type - self.write_uint16(type.code) - type.encode_fields(self, ctrl) - - def read_command(self): - type = self.spec.commands[self.read_uint16()] - hdr = self.spec["session.header"].decode(self) - cmd = Struct(type, **type.decode_fields(self)) - return hdr, cmd - def write_command(self, hdr, cmd): - self.write_uint16(cmd._type.code) - hdr._type.encode(self, hdr) - cmd._type.encode_fields(self, cmd) + self.write_compound(value) + + def read_compound(self, cls): + size = self.read_size(cls.SIZE) + if cls.CODE is not None: + code = self.read_uint16() + assert code == cls.CODE + op = cls() + self.read_fields(op) + return op + def write_compound(self, op): + sc = StringCodec() + if op.CODE is not None: + sc.write_uint16(op.CODE) + sc.write_fields(op) + self.write_size(op.SIZE, len(sc.encoded)) + self.write(sc.encoded) + + def read_fields(self, op): + flags = 0 + for i in range(op.PACK): + flags |= (self.read_uint8() << 8*i) + + for i in range(len(op.FIELDS)): + f = op.FIELDS[i] + if flags & (0x1 << i): + if COMPOUND.has_key(f.type): + value = self.read_compound(COMPOUND[f.type]) + else: + value = getattr(self, "read_%s" % f.type)() + setattr(op, f.name, value) + def write_fields(self, op): + flags = 0 + for i in range(len(op.FIELDS)): + f = op.FIELDS[i] + value = getattr(op, f.name) + if f.type == "bit": + present = value + else: + present = value != None + if present: + flags |= (0x1 << i) + for i in range(op.PACK): + self.write_uint8((flags >> 8*i) & 0xFF) + for i in range(len(op.FIELDS)): + f = op.FIELDS[i] + if flags & (0x1 << i): + if COMPOUND.has_key(f.type): + enc = self.write_compound + else: + enc = getattr(self, "write_%s" % f.type) + value = getattr(op, f.name) + enc(value) def read_size(self, width): if width > 0: attr = "read_uint%d" % (width*8) return getattr(self, attr)() - def write_size(self, width, n): if width > 0: attr = "write_uint%d" % (width*8) @@ -285,7 +357,6 @@ class Codec(Packer): def read_uuid(self): return UUID(self.unpack("16s")) - def write_uuid(self, s): if isinstance(s, UUID): s = s.bytes @@ -293,7 +364,6 @@ class Codec(Packer): def read_bin128(self): return self.unpack("16s") - def write_bin128(self, b): self.pack("16s", b) @@ -301,14 +371,13 @@ class Codec(Packer): class StringCodec(Codec): - def __init__(self, spec, encoded = ""): - Codec.__init__(self, spec) + def __init__(self, encoded = ""): self.encoded = encoded - def write(self, s): - self.encoded += s - def read(self, n): result = self.encoded[:n] self.encoded = self.encoded[n:] return result + + def write(self, s): + self.encoded += s diff --git a/python/qpid/connection.py b/python/qpid/connection.py index 5abab3802c..680f8f62e3 100644 --- a/python/qpid/connection.py +++ b/python/qpid/connection.py @@ -20,14 +20,14 @@ import datatypes, session from threading import Thread, Condition, RLock from util import wait, notify -from assembler import Assembler, Segment from codec010 import StringCodec +from framing import * from session import Session from generator import control_invoker from spec import SPEC from exceptions import * from logging import getLogger -import delegates +import delegates, socket class ChannelBusy(Exception): pass @@ -43,12 +43,12 @@ def client(*args, **kwargs): def server(*args, **kwargs): return delegates.Server(*args, **kwargs) -class Connection(Assembler): +from framer import Framer - def __init__(self, sock, spec=SPEC, delegate=client, **args): - Assembler.__init__(self, sock) - self.spec = spec +class Connection(Framer): + def __init__(self, sock, delegate=client, **args): + Framer.__init__(self, sock) self.lock = RLock() self.attached = {} self.sessions = {} @@ -66,6 +66,10 @@ class Connection(Assembler): self.channel_max = 65535 + self.op_enc = OpEncoder() + self.seg_enc = SegmentEncoder() + self.frame_enc = FrameEncoder() + self.delegate = delegate(self, **args) def attach(self, name, ch, delegate, force=False): @@ -145,15 +149,44 @@ class Connection(Assembler): raise ConnectionFailed(*self.close_code) def run(self): + frame_dec = FrameDecoder() + seg_dec = SegmentDecoder() + op_dec = OpDecoder() + while not self.closed: try: - seg = self.read_segment() - except Closed: + data = self.sock.recv(64*1024) + if not data: + self.detach_all() + break + except socket.timeout: + if self.aborted(): + self.detach_all() + raise Closed("connection timed out") + else: + continue + except socket.error, e: self.detach_all() - break - self.delegate.received(seg) + raise Closed(e) + frame_dec.write(data) + seg_dec.write(*frame_dec.read()) + op_dec.write(*seg_dec.read()) + for op in op_dec.read(): + self.delegate.received(op) self.sock.close() + def write_op(self, op): + self.sock_lock.acquire() + try: + self.op_enc.write(op) + self.seg_enc.write(*self.op_enc.read()) + self.frame_enc.write(*self.seg_enc.read()) + bytes = self.frame_enc.read() + self.write(bytes) + self.flush() + finally: + self.sock_lock.release() + def close(self, timeout=None): if not self.opened: return Channel(self, 0).connection_close(200) @@ -169,19 +202,17 @@ class Connection(Assembler): log = getLogger("qpid.io.ctl") -class Channel(control_invoker(SPEC)): +class Channel(control_invoker()): def __init__(self, connection, id): self.connection = connection self.id = id self.session = None - def invoke(self, type, args, kwargs): - ctl = type.new(args, kwargs) - sc = StringCodec(self.spec) - sc.write_control(ctl) - self.connection.write_segment(Segment(True, True, type.segment_type, - type.track, self.id, sc.encoded)) + def invoke(self, op, args, kwargs): + ctl = op(*args, **kwargs) + ctl.channel = self.id + self.connection.write_op(ctl) log.debug("SENT %s", ctl) def __str__(self): diff --git a/python/qpid/connection08.py b/python/qpid/connection08.py index be94a792cb..d34cfe2847 100644 --- a/python/qpid/connection08.py +++ b/python/qpid/connection08.py @@ -28,6 +28,7 @@ from cStringIO import StringIO from spec import load from codec import EOF from compat import SHUT_RDWR +from exceptions import VersionError class SockIO: @@ -73,6 +74,9 @@ def listen(host, port, predicate = lambda: True): s, a = sock.accept() yield SockIO(s) +class FramingError(Exception): + pass + class Connection: def __init__(self, io, spec): @@ -107,7 +111,16 @@ class Connection: def read_8_0(self): c = self.codec - type = self.spec.constants.byid[c.decode_octet()].name + tid = c.decode_octet() + try: + type = self.spec.constants.byid[tid].name + except KeyError: + if tid == ord('A') and c.unpack("!3s") == "MQP": + _, _, major, minor = c.unpack("4B") + raise VersionError("client: %s-%s, server: %s-%s" % + (self.spec.major, self.spec.minor, major, minor)) + else: + raise FramingError("unknown frame type: %s" % tid) channel = c.decode_short() body = c.decode_longstr() dec = codec.Codec(StringIO(body), self.spec) diff --git a/python/qpid/datatypes.py b/python/qpid/datatypes.py index 4cd3fade2c..bba3f5b9ab 100644 --- a/python/qpid/datatypes.py +++ b/python/qpid/datatypes.py @@ -84,7 +84,7 @@ class Message: def get(self, name): if self.headers: for h in self.headers: - if h._type.name == name: + if h.NAME == name: return h return None @@ -93,7 +93,7 @@ class Message: self.headers = [] idx = 0 while idx < len(self.headers): - if self.headers[idx]._type == header._type: + if self.headers[idx].NAME == header.NAME: self.headers[idx] = header return idx += 1 @@ -102,7 +102,7 @@ class Message: def clear(self, name): idx = 0 while idx < len(self.headers): - if self.headers[idx]._type.name == name: + if self.headers[idx].NAME == name: del self.headers[idx] return idx += 1 diff --git a/python/qpid/delegates.py b/python/qpid/delegates.py index 82bbe67ede..c74cc5a945 100644 --- a/python/qpid/delegates.py +++ b/python/qpid/delegates.py @@ -20,7 +20,9 @@ import os, connection, session from util import notify from datatypes import RangedSet +from exceptions import VersionError from logging import getLogger +from ops import Control import sys log = getLogger("qpid.io.ctl") @@ -29,26 +31,22 @@ class Delegate: def __init__(self, connection, delegate=session.client): self.connection = connection - self.spec = connection.spec self.delegate = delegate - self.control = self.spec["track.control"].value - def received(self, seg): - ssn = self.connection.attached.get(seg.channel) + def received(self, op): + ssn = self.connection.attached.get(op.channel) if ssn is None: - ch = connection.Channel(self.connection, seg.channel) + ch = connection.Channel(self.connection, op.channel) else: ch = ssn.channel - if seg.track == self.control: - ctl = seg.decode(self.spec) - log.debug("RECV %s", ctl) - attr = ctl._type.qname.replace(".", "_") - getattr(self, attr)(ch, ctl) + if isinstance(op, Control): + log.debug("RECV %s", op) + getattr(self, op.NAME)(ch, op) elif ssn is None: ch.session_detached() else: - ssn.received(seg) + ssn.received(op) def connection_close(self, ch, close): self.connection.close_code = (close.reply_code, close.reply_text) @@ -124,7 +122,8 @@ class Server(Delegate): def start(self): self.connection.read_header() - self.connection.write_header(self.spec.major, self.spec.minor) + # XXX + self.connection.write_header(0, 10) connection.Channel(self.connection, 0).connection_start(mechanisms=["ANONYMOUS"]) def connection_start_ok(self, ch, start_ok): @@ -156,8 +155,14 @@ class Client(Delegate): self.heartbeat = heartbeat def start(self): - self.connection.write_header(self.spec.major, self.spec.minor) - self.connection.read_header() + # XXX + cli_major = 0 + cli_minor = 10 + self.connection.write_header(cli_major, cli_minor) + magic, _, _, major, minor = self.connection.read_header() + if not (magic == "AMQP" and major == cli_major and minor == cli_minor): + raise VersionError("client: %s-%s, server: %s-%s" % + (cli_major, cli_minor, major, minor)) def connection_start(self, ch, start): r = "\0%s\0%s" % (self.username, self.password) diff --git a/python/qpid/exceptions.py b/python/qpid/exceptions.py index 7eaaf81ed4..2bd80b7ffe 100644 --- a/python/qpid/exceptions.py +++ b/python/qpid/exceptions.py @@ -19,3 +19,4 @@ class Closed(Exception): pass class Timeout(Exception): pass +class VersionError(Exception): pass diff --git a/python/qpid/framer.py b/python/qpid/framer.py index 0d82e4378b..4cd0ae6f26 100644 --- a/python/qpid/framer.py +++ b/python/qpid/framer.py @@ -26,48 +26,6 @@ from logging import getLogger raw = getLogger("qpid.io.raw") frm = getLogger("qpid.io.frm") -FIRST_SEG = 0x08 -LAST_SEG = 0x04 -FIRST_FRM = 0x02 -LAST_FRM = 0x01 - -class Frame: - - HEADER = "!2BHxBH4x" - HEADER_SIZE = struct.calcsize(HEADER) - MAX_PAYLOAD = 65535 - struct.calcsize(HEADER) - - def __init__(self, flags, type, track, channel, payload): - if len(payload) > Frame.MAX_PAYLOAD: - raise ValueError("max payload size exceeded: %s" % len(payload)) - self.flags = flags - self.type = type - self.track = track - self.channel = channel - self.payload = payload - - def isFirstSegment(self): - return bool(FIRST_SEG & self.flags) - - def isLastSegment(self): - return bool(LAST_SEG & self.flags) - - def isFirstFrame(self): - return bool(FIRST_FRM & self.flags) - - def isLastFrame(self): - return bool(LAST_FRM & self.flags) - - def __repr__(self): - return "%s%s%s%s %s %s %s %r" % (int(self.isFirstSegment()), - int(self.isLastSegment()), - int(self.isFirstFrame()), - int(self.isLastFrame()), - self.type, - self.track, - self.channel, - self.payload) - class FramingError(Exception): pass class Framer(Packer): @@ -137,24 +95,3 @@ class Framer(Packer): self.flush() finally: self.sock_lock.release() - - def write_frame(self, frame): - self.sock_lock.acquire() - try: - size = len(frame.payload) + struct.calcsize(Frame.HEADER) - track = frame.track & 0x0F - self.pack(Frame.HEADER, frame.flags, frame.type, size, track, frame.channel) - self.write(frame.payload) - if frame.isLastSegment() and frame.isLastFrame(): - self.flush() - frm.debug("SENT %s", frame) - finally: - self.sock_lock.release() - - def read_frame(self): - flags, type, size, track, channel = self.unpack(Frame.HEADER) - if flags & 0xF0: raise FramingError() - payload = self.read(size - struct.calcsize(Frame.HEADER)) - frame = Frame(flags, type, track, channel, payload) - frm.debug("RECV %s", frame) - return frame diff --git a/python/qpid/framing.py b/python/qpid/framing.py index 7c5f68fbcc..0a8f26272c 100644 --- a/python/qpid/framing.py +++ b/python/qpid/framing.py @@ -18,8 +18,64 @@ # import struct -from qpid.framer import Frame, FIRST_SEG, LAST_SEG, FIRST_FRM, LAST_FRM -from qpid.assembler import Segment + +FIRST_SEG = 0x08 +LAST_SEG = 0x04 +FIRST_FRM = 0x02 +LAST_FRM = 0x01 + +class Frame: + + HEADER = "!2BHxBH4x" + HEADER_SIZE = struct.calcsize(HEADER) + MAX_PAYLOAD = 65535 - struct.calcsize(HEADER) + + def __init__(self, flags, type, track, channel, payload): + if len(payload) > Frame.MAX_PAYLOAD: + raise ValueError("max payload size exceeded: %s" % len(payload)) + self.flags = flags + self.type = type + self.track = track + self.channel = channel + self.payload = payload + + def isFirstSegment(self): + return bool(FIRST_SEG & self.flags) + + def isLastSegment(self): + return bool(LAST_SEG & self.flags) + + def isFirstFrame(self): + return bool(FIRST_FRM & self.flags) + + def isLastFrame(self): + return bool(LAST_FRM & self.flags) + + def __repr__(self): + return "%s%s%s%s %s %s %s %r" % (int(self.isFirstSegment()), + int(self.isLastSegment()), + int(self.isFirstFrame()), + int(self.isLastFrame()), + self.type, + self.track, + self.channel, + self.payload) + +class Segment: + + def __init__(self, first, last, type, track, channel, payload): + self.id = None + self.offset = None + self.first = first + self.last = last + self.type = type + self.track = track + self.channel = channel + self.payload = payload + + def __repr__(self): + return "%s%s %s %s %s %r" % (int(self.first), int(self.last), self.type, + self.track, self.channel, self.payload) class FrameDecoder: @@ -140,3 +196,115 @@ class SegmentEncoder: result = self.frames self.frames = [] return result + +from ops import COMMANDS, CONTROLS, COMPOUND, Header, segment_type, track +from spec import SPEC + +from codec010 import StringCodec + +class OpEncoder: + + def __init__(self): + self.segments = [] + + def write(self, *ops): + for op in ops: + if COMMANDS.has_key(op.NAME): + seg_type = segment_type.command + seg_track = track.command + enc = self.encode_command(op) + elif CONTROLS.has_key(op.NAME): + seg_type = segment_type.control + seg_track = track.control + enc = self.encode_compound(op) + else: + raise ValueError(op) + seg = Segment(True, False, seg_type, seg_track, op.channel, enc) + self.segments.append(seg) + if hasattr(op, "headers") and op.headers is not None: + hdrs = "" + for h in op.headers: + hdrs += self.encode_compound(h) + seg = Segment(False, False, segment_type.header, seg_track, op.channel, + hdrs) + self.segments.append(seg) + if hasattr(op, "payload") and op.payload is not None: + self.segments.append(Segment(False, False, segment_type.body, seg_track, + op.channel, op.payload)) + self.segments[-1].last = True + + def encode_command(self, cmd): + sc = StringCodec() + sc.write_uint16(cmd.CODE) + sc.write_compound(Header(sync=cmd.sync)) + sc.write_fields(cmd) + return sc.encoded + + def encode_compound(self, op): + sc = StringCodec() + sc.write_compound(op) + return sc.encoded + + def read(self): + result = self.segments + self.segments = [] + return result + +class OpDecoder: + + def __init__(self): + self.op = None + self.ops = [] + + def write(self, *segments): + for seg in segments: + if seg.first: + if seg.type == segment_type.command: + self.op = self.decode_command(seg.payload) + elif seg.type == segment_type.control: + self.op = self.decode_control(seg.payload) + else: + raise ValueError(seg) + self.op.channel = seg.channel + elif seg.type == segment_type.header: + if self.op.headers is None: + self.op.headers = [] + self.op.headers.extend(self.decode_headers(seg.payload)) + elif seg.type == segment_type.body: + if self.op.payload is None: + self.op.payload = seg.payload + else: + self.op.payload += seg.payload + if seg.last: + self.ops.append(self.op) + self.op = None + + def decode_command(self, encoded): + sc = StringCodec(encoded) + code = sc.read_uint16() + cls = COMMANDS[code] + hdr = sc.read_compound(Header) + cmd = cls() + sc.read_fields(cmd) + cmd.sync = hdr.sync + return cmd + + def decode_control(self, encoded): + sc = StringCodec(encoded) + code = sc.read_uint16() + cls = CONTROLS[code] + ctl = cls() + sc.read_fields(ctl) + return ctl + + def decode_headers(self, encoded): + sc = StringCodec(encoded) + result = [] + while sc.encoded: + result.append(sc.read_struct32()) + return result + + def read(self): + result = self.ops + self.ops = [] + return result diff --git a/python/qpid/generator.py b/python/qpid/generator.py index 729425d6a3..02d11e5005 100644 --- a/python/qpid/generator.py +++ b/python/qpid/generator.py @@ -19,42 +19,38 @@ import sys -from spec010 import Control +from ops import * -def METHOD(module, inst): - method = lambda self, *args, **kwargs: self.invoke(inst, args, kwargs) +def METHOD(module, op): + method = lambda self, *args, **kwargs: self.invoke(op, args, kwargs) if sys.version_info[:2] > (2, 3): - method.__name__ = str(inst.pyname) - method.__doc__ = str(inst.pydoc) + method.__name__ = op.__name__ + method.__doc__ = op.__doc__ method.__module__ = module return method -def generate(spec, module, predicate=lambda x: True): - dict = {"spec": spec} +def generate(module, operations): + dict = {} - for name, enum in spec.enums.items(): - dict[name] = enum + for name, enum in ENUMS.items(): + if isinstance(name, basestring): + dict[name] = enum - for name, st in spec.structs_by_name.items(): - dict[name] = METHOD(module, st) + for name, op in COMPOUND.items(): + if isinstance(name, basestring): + dict[name] = METHOD(module, op) - for st in spec.structs.values(): - dict[st.name] = METHOD(module, st) - - for name, inst in spec.instructions.items(): - if predicate(inst): - dict[name] = METHOD(module, inst) + for name, op in operations.items(): + if isinstance(name, basestring): + dict[name] = METHOD(module, op) return dict -def invoker(name, spec, predicate=lambda x: True): - return type("%s_%s_%s" % (name, spec.major, spec.minor), - (), generate(spec, invoker.__module__, predicate)) +def invoker(name, operations): + return type(name, (), generate(invoker.__module__, operations)) -def command_invoker(spec): - is_command = lambda cmd: cmd.track == spec["track.command"].value - return invoker("CommandInvoker", spec, is_command) +def command_invoker(): + return invoker("CommandInvoker", COMMANDS) -def control_invoker(spec): - is_control = lambda inst: isinstance(inst, Control) - return invoker("ControlInvoker", spec, is_control) +def control_invoker(): + return invoker("ControlInvoker", CONTROLS) diff --git a/python/qpid/harness.py b/python/qpid/harness.py new file mode 100644 index 0000000000..ce48481612 --- /dev/null +++ b/python/qpid/harness.py @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +class Skipped(Exception): pass diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index f06ef87709..9b3fecbf9b 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -34,8 +34,8 @@ import connection, time, socket, sys, traceback from codec010 import StringCodec from datatypes import timestamp, uuid4, RangedSet, Message as Message010 from logging import getLogger +from ops import PRIMITIVE from session import Client, INCOMPLETE -from spec import SPEC from threading import Thread, RLock, Condition from util import connect @@ -191,9 +191,12 @@ class Connection(Lockable): try: self._socket = connect(self.host, self.port) except socket.error, e: - raise ConnectError(*e.args) + raise ConnectError(e) self._conn = connection.Connection(self._socket) - self._conn.start() + try: + self._conn.start() + except connection.VersionError, e: + raise ConnectError(e) for ssn in self.sessions.values(): ssn._attach() @@ -263,8 +266,8 @@ FILTER_DEFAULTS = { def delegate(session): class Delegate(Client): - def message_transfer(self, cmd, headers, body): - session._message_transfer(cmd, headers, body) + def message_transfer(self, cmd): + session._message_transfer(cmd) return Delegate class Session(Lockable): @@ -314,9 +317,9 @@ class Session(Lockable): link._disconnected() @synchronized - def _message_transfer(self, cmd, headers, body): - m = Message010(body) - m.headers = headers + def _message_transfer(self, cmd): + m = Message010(cmd.payload) + m.headers = cmd.headers m.id = cmd.id msg = self._decode(m) rcv = self.receivers[int(cmd.destination)] @@ -812,16 +815,16 @@ class Receiver(Lockable): def codec(name): - type = SPEC.named[name] + type = PRIMITIVE[name] def encode(x): - sc = StringCodec(SPEC) - type.encode(sc, x) + sc = StringCodec() + sc.write_primitive(type, x) return sc.encoded def decode(x): - sc = StringCodec(SPEC, x) - return type.decode(sc) + sc = StringCodec(x) + return sc.read_primitive(type) return encode, decode diff --git a/python/qpid/ops.py b/python/qpid/ops.py new file mode 100644 index 0000000000..1f82889164 --- /dev/null +++ b/python/qpid/ops.py @@ -0,0 +1,277 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import os, mllib, cPickle as pickle +from util import fill + +class Primitive(object): + pass + +class Enum(object): + pass + +class Field: + + def __init__(self, name, type, default=None): + self.name = name + self.type = type + self.default = default + + def __repr__(self): + return "%s: %s" % (self.name, self.type) + +class Compound(object): + + UNENCODED=[] + + def __init__(self, *args, **kwargs): + args = list(args) + for f in self.ARGS: + if args: + a = args.pop(0) + else: + a = kwargs.pop(f.name, f.default) + setattr(self, f.name, a) + if args: + raise TypeError("%s takes at most %s arguments (%s given))" % + (self.__class__.__name__, len(self.ARGS), + len(self.ARGS) + len(args))) + if kwargs: + raise TypeError("got unexpected keyword argument '%s'" % kwargs.keys()[0]) + + def fields(self): + result = {} + for f in self.FIELDS: + result[f.name] = getattr(self, f.name) + return result + + def args(self): + result = {} + for f in self.ARGS: + result[f.name] = getattr(self, f.name) + return result + + def dispatch(self, target, *args): + handler = "do_%s" % self.NAME + if hasattr(target, handler): + getattr(target, handler)(self, *args) + else: + print "UNHANDLED:", target, args + + def __repr__(self, extras=()): + return "%s(%s)" % (self.__class__.__name__, + ", ".join(["%s=%r" % (f.name, getattr(self, f.name)) + for f in self.ARGS + if getattr(self, f.name) is not f.default])) + +class Command(Compound): + UNENCODED=[Field("channel", "uint16", 0), + Field("id", "sequence-no", None), + Field("sync", "bit", False), + Field("headers", None, None), + Field("payload", None, None)] + +class Control(Compound): + UNENCODED=[Field("channel", "uint16", 0)] + +def pythonize(st): + if st is None: + return None + else: + return str(st.replace("-", "_")) + +def pydoc(op, children=()): + doc = "\n\n".join([fill(p.text(), 0) for p in op.query["doc"]]) + for ch in children: + doc += "\n\n " + pythonize(ch["@name"]) + " -- " + str(ch["@label"]) + ch_descs ="\n\n".join([fill(p.text(), 4) for p in ch.query["doc"]]) + if ch_descs: + doc += "\n\n" + ch_descs + return doc + +def studly(st): + return "".join([p.capitalize() for p in st.split("-")]) + +def klass(nd): + while nd.parent is not None: + if hasattr(nd.parent, "name") and nd.parent.name == "class": + return nd.parent + else: + nd = nd.parent + +def included(nd): + cls = klass(nd) + if cls is None: + return True + else: + return cls["@name"] not in ("file", "stream") + +def num(s): + if s: return int(s, 0) + +def code(nd): + c = num(nd["@code"]) + if c is None: + return None + else: + cls = klass(nd) + if cls is None: + return c + else: + return c | (num(cls["@code"]) << 8) + +def default(f): + if f["@type"] == "bit": + return False + else: + return None + +def make_compound(decl, base): + dict = {} + fields = decl.query["field"] + dict["__doc__"] = pydoc(decl, fields) + dict["NAME"] = pythonize(decl["@name"]) + dict["SIZE"] = num(decl["@size"]) + dict["CODE"] = code(decl) + dict["PACK"] = num(decl["@pack"]) + dict["FIELDS"] = [Field(pythonize(f["@name"]), resolve(f), default(f)) for f in fields] + dict["ARGS"] = dict["FIELDS"] + base.UNENCODED + return str(studly(decl["@name"])), (base,), dict + +def make_restricted(decl): + name = pythonize(decl["@name"]) + dict = {} + choices = decl.query["choice"] + dict["__doc__"] = pydoc(decl, choices) + dict["NAME"] = name + dict["TYPE"] = str(decl.parent["@type"]) + values = [] + for ch in choices: + val = int(ch["@value"], 0) + dict[pythonize(ch["@name"])] = val + values.append(val) + dict["VALUES"] = values + return name, (Enum,), dict + +def make_type(decl): + name = pythonize(decl["@name"]) + dict = {} + dict["__doc__"] = pydoc(decl) + dict["NAME"] = name + dict["CODE"] = code(decl) + return str(studly(decl["@name"])), (Primitive,), dict + +def make_command(decl): + decl.set_attr("name", "%s-%s" % (decl.parent["@name"], decl["@name"])) + decl.set_attr("size", "0") + decl.set_attr("pack", "2") + name, bases, dict = make_compound(decl, Command) + dict["RESULT"] = pythonize(decl["result/@type"]) or pythonize(decl["result/struct/@name"]) + return name, bases, dict + +def make_control(decl): + decl.set_attr("name", "%s-%s" % (decl.parent["@name"], decl["@name"])) + decl.set_attr("size", "0") + decl.set_attr("pack", "2") + return make_compound(decl, Control) + +def make_struct(decl): + return make_compound(decl, Compound) + +def make_enum(decl): + decl.set_attr("name", decl.parent["@name"]) + return make_restricted(decl) + + +vars = globals() + +def make(nd): + return vars["make_%s" % nd.name](nd) + +from qpid_config import amqp_spec as file +pclfile = "%s.ops.pcl" % file + +if False and (os.path.exists(pclfile) and + os.path.getmtime(pclfile) > os.path.getmtime(file)): + f = open(pclfile, "read") + types = pickle.load(f) + f.close() +else: + spec = mllib.xml_parse(file) + + def qualify(nd, field="@name"): + cls = klass(nd) + if cls is None: + return pythonize(nd[field]) + else: + return pythonize("%s.%s" % (cls["@name"], nd[field])) + + domains = dict([(qualify(d), pythonize(d["@type"])) + for d in spec.query["amqp/domain", included] + \ + spec.query["amqp/class/domain", included]]) + + def resolve(nd): + candidates = qualify(nd, "@type"), pythonize(nd["@type"]) + for c in candidates: + if domains.has_key(c): + while domains.has_key(c): + c = domains[c] + return c + else: + return c + + type_decls = \ + spec.query["amqp/class/command", included] + \ + spec.query["amqp/class/control", included] + \ + spec.query["amqp/class/command/result/struct", included] + \ + spec.query["amqp/class/struct", included] + \ + spec.query["amqp/class/domain/enum", included] + \ + spec.query["amqp/domain/enum", included] + \ + spec.query["amqp/type"] + types = [make(nd) for nd in type_decls] + + if os.access(os.path.dirname(os.path.abspath(pclfile)), os.W_OK): + f = open(pclfile, "write") + pickle.dump(types, f) + f.close() + +ENUMS = {} +PRIMITIVE = {} +COMPOUND = {} +COMMANDS = {} +CONTROLS = {} + +for name, bases, dict in types: + t = type(name, bases, dict) + vars[name] = t + + if issubclass(t, Command): + COMMANDS[t.NAME] = t + COMMANDS[t.CODE] = t + elif issubclass(t, Control): + CONTROLS[t.NAME] = t + CONTROLS[t.CODE] = t + elif issubclass(t, Compound): + COMPOUND[t.NAME] = t + if t.CODE is not None: + COMPOUND[t.CODE] = t + elif issubclass(t, Primitive): + PRIMITIVE[t.NAME] = t + PRIMITIVE[t.CODE] = t + elif issubclass(t, Enum): + ENUMS[t.NAME] = t diff --git a/python/qpid/peer.py b/python/qpid/peer.py index 18d7848b8d..2bc9844351 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -25,7 +25,7 @@ incoming method frames to a delegate. """ import thread, threading, traceback, socket, sys, logging -from connection08 import EOF, Method, Header, Body, Request, Response +from connection08 import EOF, Method, Header, Body, Request, Response, VersionError from message import Message from queue import Queue, Closed as QueueClosed from content import Content @@ -95,6 +95,8 @@ class Peer: break ch = self.channel(frame.channel) ch.receive(frame, self.work) + except VersionError, e: + self.closed(e) except: self.fatal() diff --git a/python/qpid/session.py b/python/qpid/session.py index 3b8bd18469..4413a22899 100644 --- a/python/qpid/session.py +++ b/python/qpid/session.py @@ -22,9 +22,9 @@ from spec import SPEC from generator import command_invoker from datatypes import RangedSet, Struct, Future from codec010 import StringCodec -from assembler import Segment from queue import Queue from datatypes import Message, serial +from ops import Command, MessageTransfer from util import wait, notify from exceptions import * from logging import getLogger @@ -44,7 +44,7 @@ def server(*args): INCOMPLETE = object() -class Session(command_invoker(SPEC)): +class Session(command_invoker()): def __init__(self, name, auto_sync=True, timeout=10, delegate=client): self.name = name @@ -67,8 +67,6 @@ class Session(command_invoker(SPEC)): self.results = {} self.exceptions = [] - self.assembly = None - self.delegate = delegate(self) def incoming(self, destination): @@ -134,68 +132,51 @@ class Session(command_invoker(SPEC)): finally: self.lock.release() - def invoke(self, type, args, kwargs): - # XXX - if not hasattr(type, "track"): - return type.new(args, kwargs) - - self.invoke_lock.acquire() - try: - return self.do_invoke(type, args, kwargs) - finally: - self.invoke_lock.release() + def invoke(self, op, args, kwargs): + if issubclass(op, Command): + self.invoke_lock.acquire() + try: + return self.do_invoke(op, args, kwargs) + finally: + self.invoke_lock.release() + else: + return op(*args, **kwargs) - def do_invoke(self, type, args, kwargs): + def do_invoke(self, op, args, kwargs): if self._closing: raise SessionClosed() if self.channel == None: raise SessionDetached() - if type.segments: - if len(args) == len(type.fields) + 1: + if op == MessageTransfer: + if len(args) == len(op.FIELDS) + 1: message = args[-1] args = args[:-1] else: message = kwargs.pop("message", None) - else: - message = None - - hdr = Struct(self.spec["session.header"]) - hdr.sync = self.auto_sync or kwargs.pop("sync", False) - self.need_sync = not hdr.sync + if message is not None: + kwargs["headers"] = message.headers + kwargs["payload"] = message.body - cmd = type.new(args, kwargs) - sc = StringCodec(self.spec) - sc.write_command(hdr, cmd) + cmd = op(*args, **kwargs) + cmd.sync = self.auto_sync or cmd.sync + self.need_sync = not cmd.sync + cmd.channel = self.channel.id - seg = Segment(True, (message == None or - (message.headers == None and message.body == None)), - type.segment_type, type.track, self.channel.id, sc.encoded) - - if type.result: + if op.RESULT: result = Future(exception=SessionException) self.results[self.sender.next_id] = result - self.send(seg) - - log.debug("SENT %s %s %s", seg.id, hdr, cmd) - - if message != None: - if message.headers != None: - sc = StringCodec(self.spec) - for st in message.headers: - sc.write_struct32(st) - seg = Segment(False, message.body == None, self.spec["segment_type.header"].value, - type.track, self.channel.id, sc.encoded) - self.send(seg) - if message.body != None: - seg = Segment(False, True, self.spec["segment_type.body"].value, - type.track, self.channel.id, message.body) - self.send(seg) - msg.debug("SENT %s", message) - - if type.result: + log.debug("SENDING %s", cmd) + + self.send(cmd) + + log.debug("SENT %s", cmd) + if op == MessageTransfer: + msg.debug("SENT %s", cmd) + + if op.RESULT: if self.auto_sync: return result.get(self.timeout) else: @@ -203,81 +184,47 @@ class Session(command_invoker(SPEC)): elif self.auto_sync: self.sync(self.timeout) - def received(self, seg): - self.receiver.received(seg) - if seg.first: - assert self.assembly == None - self.assembly = [] - self.assembly.append(seg) - if seg.last: - self.dispatch(self.assembly) - self.assembly = None - - def dispatch(self, assembly): - segments = assembly[:] - - hdr, cmd = assembly.pop(0).decode(self.spec) - log.debug("RECV %s %s %s", cmd.id, hdr, cmd) - - args = [] - - for st in cmd._type.segments: - if assembly: - seg = assembly[0] - if seg.type == st.segment_type: - args.append(seg.decode(self.spec)) - assembly.pop(0) - continue - args.append(None) - - assert len(assembly) == 0 + def received(self, cmd): + self.receiver.received(cmd) + self.dispatch(cmd) - attr = cmd._type.qname.replace(".", "_") - result = getattr(self.delegate, attr)(cmd, *args) + def dispatch(self, cmd): + log.debug("RECV %s", cmd) - if cmd._type.result: + result = getattr(self.delegate, cmd.NAME)(cmd) + if result is INCOMPLETE: + return + elif result is not None: self.execution_result(cmd.id, result) - if result is not INCOMPLETE: - for seg in segments: - self.receiver.completed(seg) - # XXX: don't forget to obey sync for manual completion as well - if hdr.sync: - self.channel.session_completed(self.receiver._completed) + self.receiver.completed(cmd) + # XXX: don't forget to obey sync for manual completion as well + if cmd.sync: + self.channel.session_completed(self.receiver._completed) - def send(self, seg): - self.sender.send(seg) - - def __str__(self): - return '<Session: %s, %s>' % (self.name, self.channel) + def send(self, cmd): + self.sender.send(cmd) def __repr__(self): - return str(self) + return '<Session: %s, %s>' % (self.name, self.channel) class Receiver: def __init__(self, session): self.session = session self.next_id = None - self.next_offset = None self._completed = RangedSet() - def received(self, seg): - if self.next_id == None or self.next_offset == None: + def received(self, cmd): + if self.next_id == None: raise Exception("todo") - seg.id = self.next_id - seg.offset = self.next_offset - if seg.last: - self.next_id += 1 - self.next_offset = 0 - else: - self.next_offset += len(seg.payload) + cmd.id = self.next_id + self.next_id += 1 - def completed(self, seg): - if seg.id == None: - raise ValueError("cannot complete unidentified segment") - if seg.last: - self._completed.add(seg.id) + def completed(self, cmd): + if cmd.id == None: + raise ValueError("cannot complete unidentified command") + self._completed.add(cmd.id) def known_completed(self, commands): completed = RangedSet() @@ -294,30 +241,24 @@ class Sender: def __init__(self, session): self.session = session self.next_id = serial(0) - self.next_offset = 0 - self.segments = [] + self.commands = [] self._completed = RangedSet() - def send(self, seg): - seg.id = self.next_id - seg.offset = self.next_offset - if seg.last: - self.next_id += 1 - self.next_offset = 0 - else: - self.next_offset += len(seg.payload) - self.segments.append(seg) + def send(self, cmd): + cmd.id = self.next_id + self.next_id += 1 if self.session.send_id: self.session.send_id = False - self.session.channel.session_command_point(seg.id, seg.offset) - self.session.channel.connection.write_segment(seg) + self.session.channel.session_command_point(cmd.id, 0) + self.commands.append(cmd) + self.session.channel.connection.write_op(cmd) def completed(self, commands): idx = 0 - while idx < len(self.segments): - seg = self.segments[idx] - if seg.id in commands: - del self.segments[idx] + while idx < len(self.commands): + cmd = self.commands[idx] + if cmd.id in commands: + del self.commands[idx] else: idx += 1 for range in commands.ranges: @@ -332,7 +273,7 @@ class Incoming(Queue): def start(self): self.session.message_set_flow_mode(self.destination, self.session.flow_mode.credit) - for unit in self.session.credit_unit.values(): + for unit in self.session.credit_unit.VALUES: self.session.message_flow(self.destination, unit, 0xFFFFFFFFL) def stop(self): @@ -356,9 +297,9 @@ class Delegate: class Client(Delegate): - def message_transfer(self, cmd, headers, body): - m = Message(body) - m.headers = headers + def message_transfer(self, cmd): + m = Message(cmd.payload) + m.headers = cmd.headers m.id = cmd.id messages = self.session.incoming(cmd.destination) messages.put(m) diff --git a/python/qpid/spec.py b/python/qpid/spec.py index cd76c70c5c..e9bfef1fa6 100644 --- a/python/qpid/spec.py +++ b/python/qpid/spec.py @@ -29,7 +29,7 @@ class so that the generated code can be reused in a variety of situations. """ -import os, mllib, spec08, spec010 +import os, mllib, spec08 def default(): try: @@ -54,7 +54,7 @@ def load(specfile, *errata): minor = doc["amqp/@minor"] if major == "0" and minor == "10": - return spec010.load(specfile, *errata) + return None else: return spec08.load(specfile, *errata) diff --git a/python/qpid/spec010.py b/python/qpid/spec010.py deleted file mode 100644 index eabc8e2983..0000000000 --- a/python/qpid/spec010.py +++ /dev/null @@ -1,708 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import os, cPickle, datatypes, datetime -from codec010 import StringCodec -from util import mtime, fill - -class Node: - - def __init__(self, children): - self.children = children - self.named = {} - self.docs = [] - self.rules = [] - - def register(self): - for ch in self.children: - ch.register(self) - - def resolve(self): - for ch in self.children: - ch.resolve() - - def __getitem__(self, name): - path = name.split(".", 1) - nd = self.named - for step in path: - nd = nd[step] - return nd - - def __iter__(self): - return iter(self.children) - -class Anonymous: - - def __init__(self, children): - self.children = children - - def register(self, node): - for ch in self.children: - ch.register(node) - - def resolve(self): - for ch in self.children: - ch.resolve() - -class Named: - - def __init__(self, name): - self.name = name - self.qname = None - - def register(self, node): - self.spec = node.spec - self.klass = node.klass - node.named[self.name] = self - if node.qname: - self.qname = "%s.%s" % (node.qname, self.name) - else: - self.qname = self.name - - def __str__(self): - return self.qname - - def __repr__(self): - return str(self) - -class Lookup: - - def lookup(self, name): - value = None - if self.klass: - try: - value = self.klass[name] - except KeyError: - pass - if not value: - value = self.spec[name] - return value - -class Coded: - - def __init__(self, code): - self.code = code - -class Constant(Named, Node): - - def __init__(self, name, value, children): - Named.__init__(self, name) - Node.__init__(self, children) - self.value = value - - def register(self, node): - Named.register(self, node) - node.constants.append(self) - Node.register(self) - -class Type(Named, Node): - - def __init__(self, name, children): - Named.__init__(self, name) - Node.__init__(self, children) - - def is_present(self, value): - return value != None - - def register(self, node): - Named.register(self, node) - Node.register(self) - -class Primitive(Coded, Type): - - def __init__(self, name, code, fixed, variable, children): - Coded.__init__(self, code) - Type.__init__(self, name, children) - self.fixed = fixed - self.variable = variable - - def register(self, node): - Type.register(self, node) - if self.code is not None: - self.spec.types[self.code] = self - - def is_present(self, value): - if self.fixed == 0: - return value - else: - return Type.is_present(self, value) - - def encode(self, codec, value): - getattr(codec, "write_%s" % self.name)(value) - - def decode(self, codec): - return getattr(codec, "read_%s" % self.name)() - -class Domain(Type, Lookup): - - def __init__(self, name, type, children): - Type.__init__(self, name, children) - self.type = type - self.choices = {} - - def resolve(self): - self.type = self.lookup(self.type) - Node.resolve(self) - - def encode(self, codec, value): - self.type.encode(codec, value) - - def decode(self, codec): - return self.type.decode(codec) - -class Enum: - - def __init__(self, name): - self.name = name - self._names = () - self._values = () - - def values(self): - return self._values - - def __repr__(self): - return "%s(%s)" % (self.name, ", ".join(self._names)) - -class Choice(Named, Node): - - def __init__(self, name, value, children): - Named.__init__(self, name) - Node.__init__(self, children) - self.value = value - - def register(self, node): - Named.register(self, node) - node.choices[self.value] = self - Node.register(self) - try: - enum = node.spec.enums[node.name] - except KeyError: - enum = Enum(node.name) - node.spec.enums[node.name] = enum - setattr(enum, self.name, self.value) - enum._names += (self.name,) - enum._values += (self.value,) - -class Composite(Type, Coded): - - def __init__(self, name, label, code, size, pack, children): - Coded.__init__(self, code) - Type.__init__(self, name, children) - self.label = label - self.fields = [] - self.size = size - self.pack = pack - - def new(self, args, kwargs): - return datatypes.Struct(self, *args, **kwargs) - - def decode(self, codec): - codec.read_size(self.size) - if self.code is not None: - code = codec.read_uint16() - assert self.code == code - return datatypes.Struct(self, **self.decode_fields(codec)) - - def decode_fields(self, codec): - flags = 0 - for i in range(self.pack): - flags |= (codec.read_uint8() << 8*i) - - result = {} - - for i in range(len(self.fields)): - f = self.fields[i] - if flags & (0x1 << i): - result[f.name] = f.type.decode(codec) - else: - result[f.name] = None - return result - - def encode(self, codec, value): - sc = StringCodec(self.spec) - if self.code is not None: - sc.write_uint16(self.code) - self.encode_fields(sc, value) - codec.write_size(self.size, len(sc.encoded)) - codec.write(sc.encoded) - - def encode_fields(self, codec, values): - flags = 0 - for i in range(len(self.fields)): - f = self.fields[i] - if f.type.is_present(values[f.name]): - flags |= (0x1 << i) - for i in range(self.pack): - codec.write_uint8((flags >> 8*i) & 0xFF) - for i in range(len(self.fields)): - f = self.fields[i] - if flags & (0x1 << i): - f.type.encode(codec, values[f.name]) - - def docstring(self): - docs = [] - if self.label: - docs.append(self.label) - docs += [d.text for d in self.docs] - s = "\n\n".join([fill(t, 2) for t in docs]) - for f in self.fields: - fdocs = [] - if f.label: - fdocs.append(f.label) - else: - fdocs.append("") - fdocs += [d.text for d in f.docs] - s += "\n\n" + "\n\n".join([fill(fdocs[0], 4, f.name)] + - [fill(t, 4) for t in fdocs[1:]]) - return s - - -class Field(Named, Node, Lookup): - - def __init__(self, name, label, type, children): - Named.__init__(self, name) - Node.__init__(self, children) - self.label = label - self.type = type - self.exceptions = [] - - def default(self): - return None - - def register(self, node): - Named.register(self, node) - node.fields.append(self) - Node.register(self) - - def resolve(self): - self.type = self.lookup(self.type) - Node.resolve(self) - - def __str__(self): - return "%s: %s" % (self.qname, self.type.qname) - -class Struct(Composite): - - def register(self, node): - Composite.register(self, node) - if self.code is not None: - self.spec.structs[self.code] = self - self.spec.structs_by_name[self.name] = self - self.pyname = self.name - self.pydoc = self.docstring() - - def __str__(self): - fields = ",\n ".join(["%s: %s" % (f.name, f.type.qname) - for f in self.fields]) - return "%s {\n %s\n}" % (self.qname, fields) - -class Segment: - - def __init__(self): - self.segment_type = None - - def register(self, node): - self.spec = node.spec - self.klass = node.klass - node.segments.append(self) - Node.register(self) - -class Instruction(Composite, Segment): - - def __init__(self, name, label, code, children): - Composite.__init__(self, name, label, code, 0, 2, children) - Segment.__init__(self) - self.track = None - self.handlers = [] - - def __str__(self): - return "%s(%s)" % (self.qname, ", ".join(["%s: %s" % (f.name, f.type.qname) - for f in self.fields])) - - def register(self, node): - Composite.register(self, node) - self.pyname = self.qname.replace(".", "_") - self.pydoc = self.docstring() - self.spec.instructions[self.pyname] = self - -class Control(Instruction): - - def __init__(self, name, code, label, children): - Instruction.__init__(self, name, code, label, children) - self.response = None - - def register(self, node): - Instruction.register(self, node) - node.controls.append(self) - self.spec.controls[self.code] = self - self.segment_type = self.spec["segment_type.control"].value - self.track = self.spec["track.control"].value - -class Command(Instruction): - - def __init__(self, name, label, code, children): - Instruction.__init__(self, name, label, code, children) - self.result = None - self.exceptions = [] - self.segments = [] - - def register(self, node): - Instruction.register(self, node) - node.commands.append(self) - self.spec.commands[self.code] = self - self.segment_type = self.spec["segment_type.command"].value - self.track = self.spec["track.command"].value - -class Header(Segment, Node): - - def __init__(self, children): - Segment.__init__(self) - Node.__init__(self, children) - self.entries = [] - - def register(self, node): - Segment.register(self, node) - self.segment_type = self.spec["segment_type.header"].value - Node.register(self) - -class Entry(Lookup): - - def __init__(self, type): - self.type = type - - def register(self, node): - self.spec = node.spec - self.klass = node.klass - node.entries.append(self) - - def resolve(self): - self.type = self.lookup(self.type) - -class Body(Segment, Node): - - def __init__(self, children): - Segment.__init__(self) - Node.__init__(self, children) - - def register(self, node): - Segment.register(self, node) - self.segment_type = self.spec["segment_type.body"].value - Node.register(self) - - def resolve(self): pass - -class Class(Named, Coded, Node): - - def __init__(self, name, code, children): - Named.__init__(self, name) - Coded.__init__(self, code) - Node.__init__(self, children) - self.controls = [] - self.commands = [] - - def register(self, node): - Named.register(self, node) - self.klass = self - node.classes.append(self) - Node.register(self) - -class Doc: - - def __init__(self, type, title, text): - self.type = type - self.title = title - self.text = text - - def register(self, node): - node.docs.append(self) - - def resolve(self): pass - -class Role(Named, Node): - - def __init__(self, name, children): - Named.__init__(self, name) - Node.__init__(self, children) - - def register(self, node): - Named.register(self, node) - Node.register(self) - -class Rule(Named, Node): - - def __init__(self, name, children): - Named.__init__(self, name) - Node.__init__(self, children) - - def register(self, node): - Named.register(self, node) - node.rules.append(self) - Node.register(self) - -class Exception(Named, Node): - - def __init__(self, name, error_code, children): - Named.__init__(self, name) - Node.__init__(self, children) - self.error_code = error_code - - def register(self, node): - Named.register(self, node) - node.exceptions.append(self) - Node.register(self) - -def direct(t): - return lambda x: t - -def map_str(s): - for c in s: - if ord(c) >= 0x80: - return "vbin16" - return "str16" - -class Spec(Node): - - ENCODINGS = { - unicode: direct("str16"), - str: map_str, - buffer: direct("vbin32"), - int: direct("int64"), - long: direct("int64"), - float: direct("double"), - None.__class__: direct("void"), - list: direct("list"), - tuple: direct("list"), - dict: direct("map"), - datatypes.timestamp: direct("datetime"), - datetime.datetime: direct("datetime"), - datatypes.UUID: direct("uuid") - } - - def __init__(self, major, minor, port, children): - Node.__init__(self, children) - self.major = major - self.minor = minor - self.port = port - self.constants = [] - self.classes = [] - self.types = {} - self.qname = None - self.spec = self - self.klass = None - self.instructions = {} - self.controls = {} - self.commands = {} - self.structs = {} - self.structs_by_name = {} - self.enums = {} - - def encoding(self, obj): - return self._encoding(obj.__class__, obj) - - def _encoding(self, klass, obj): - if Spec.ENCODINGS.has_key(klass): - return self.named[Spec.ENCODINGS[klass](obj)] - for base in klass.__bases__: - result = self._encoding(base, obj) - if result != None: - return result - -class Implement: - - def __init__(self, handle): - self.handle = handle - - def register(self, node): - node.handlers.append(self.handle) - - def resolve(self): pass - -class Response(Node): - - def __init__(self, name, children): - Node.__init__(self, children) - self.name = name - - def register(self, node): - Node.register(self) - -class Result(Node, Lookup): - - def __init__(self, type, children): - self.type = type - Node.__init__(self, children) - - def register(self, node): - node.result = self - self.qname = node.qname - self.klass = node.klass - self.spec = node.spec - Node.register(self) - - def resolve(self): - self.type = self.lookup(self.type) - Node.resolve(self) - -import mllib - -def num(s): - if s: return int(s, 0) - -REPLACE = {" ": "_", "-": "_"} -KEYWORDS = {"global": "global_", - "return": "return_"} - -def id(name): - name = str(name) - for key, val in REPLACE.items(): - name = name.replace(key, val) - try: - name = KEYWORDS[name] - except KeyError: - pass - return name - -class Loader: - - def __init__(self): - self.class_code = 0 - - def code(self, nd): - c = num(nd["@code"]) - if c is None: - return None - else: - return c | (self.class_code << 8) - - def list(self, q): - result = [] - for nd in q: - result.append(nd.dispatch(self)) - return result - - def children(self, n): - return self.list(n.query["#tag"]) - - def data(self, d): - return d.data - - def do_amqp(self, a): - return Spec(num(a["@major"]), num(a["@minor"]), num(a["@port"]), - self.children(a)) - - def do_type(self, t): - return Primitive(id(t["@name"]), self.code(t), num(t["@fixed-width"]), - num(t["@variable-width"]), self.children(t)) - - def do_constant(self, c): - return Constant(id(c["@name"]), num(c["@value"]), self.children(c)) - - def do_domain(self, d): - return Domain(id(d["@name"]), id(d["@type"]), self.children(d)) - - def do_enum(self, e): - return Anonymous(self.children(e)) - - def do_choice(self, c): - return Choice(id(c["@name"]), num(c["@value"]), self.children(c)) - - def do_class(self, c): - code = num(c["@code"]) - self.class_code = code - children = self.children(c) - children += self.list(c.query["command/result/struct"]) - self.class_code = 0 - return Class(id(c["@name"]), code, children) - - def do_doc(self, doc): - text = reduce(lambda x, y: x + y, self.list(doc.children)) - return Doc(doc["@type"], doc["@title"], text) - - def do_xref(self, x): - return x["@ref"] - - def do_role(self, r): - return Role(id(r["@name"]), self.children(r)) - - def do_control(self, c): - return Control(id(c["@name"]), c["@label"], self.code(c), self.children(c)) - - def do_rule(self, r): - return Rule(id(r["@name"]), self.children(r)) - - def do_implement(self, i): - return Implement(id(i["@handle"])) - - def do_response(self, r): - return Response(id(r["@name"]), self.children(r)) - - def do_field(self, f): - return Field(id(f["@name"]), f["@label"], id(f["@type"]), self.children(f)) - - def do_struct(self, s): - return Struct(id(s["@name"]), s["@label"], self.code(s), num(s["@size"]), - num(s["@pack"]), self.children(s)) - - def do_command(self, c): - return Command(id(c["@name"]), c["@label"], self.code(c), self.children(c)) - - def do_segments(self, s): - return Anonymous(self.children(s)) - - def do_header(self, h): - return Header(self.children(h)) - - def do_entry(self, e): - return Entry(id(e["@type"])) - - def do_body(self, b): - return Body(self.children(b)) - - def do_result(self, r): - type = r["@type"] - if not type: - type = r["struct/@name"] - return Result(id(type), self.list(r.query["#tag", lambda x: x.name != "struct"])) - - def do_exception(self, e): - return Exception(id(e["@name"]), id(e["@error-code"]), self.children(e)) - -def load(xml): - fname = xml + ".pcl" - - if os.path.exists(fname) and mtime(fname) > mtime(__file__): - file = open(fname, "r") - s = cPickle.load(file) - file.close() - else: - doc = mllib.xml_parse(xml) - s = doc["amqp"].dispatch(Loader()) - s.register() - s.resolve() - - try: - file = open(fname, "w") - except IOError: - file = None - - if file: - cPickle.dump(s, file) - file.close() - - return s diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py index 12b2781561..31c376b2f9 100644 --- a/python/qpid/testlib.py +++ b/python/qpid/testlib.py @@ -21,239 +21,13 @@ # Support library for qpid python tests. # -import sys, re, unittest, os, random, logging, traceback -import qpid.client, qpid.spec, qmf.console +import unittest, traceback, socket +import qpid.client, qmf.console import Queue -from fnmatch import fnmatch -from getopt import getopt, GetoptError from qpid.content import Content from qpid.message import Message - -#0-10 support -from qpid.connection import Connection -from qpid.spec010 import load -from qpid.util import connect, ssl, URL - -def findmodules(root): - """Find potential python modules under directory root""" - found = [] - for dirpath, subdirs, files in os.walk(root): - modpath = dirpath.replace(os.sep, '.') - if not re.match(r'\.svn$', dirpath): # Avoid SVN directories - for f in files: - match = re.match(r'(.+)\.py$', f) - if match and f != '__init__.py': - found.append('.'.join([modpath, match.group(1)])) - return found - -def default(value, default): - if (value == None): return default - else: return value - -class TestRunner: - - SPEC_FOLDER = "../specs" - qpidd = os.getenv("QPIDD") - - """Runs unit tests. - - Parses command line arguments, provides utility functions for tests, - runs the selected test suite. - """ - - def _die(self, message = None): - if message: print message - print """ -run-tests [options] [test*] -The name of a test is package.module.ClassName.testMethod -Options: - -?/-h/--help : this message - -s/--spec <spec.xml> : URL of AMQP XML specification or one of these abbreviations: - 0-8 - use the default 0-8 specification. - 0-9 - use the default 0-9 specification. - 0-10-errata - use the 0-10 specification with qpid errata. - -e/--errata <errata.xml> : file containing amqp XML errata - -b/--broker [amqps://][<user>[/<password>]@]<host>[:<port>] : broker to connect to - -B/--start-broker <broker-args> : start a local broker using broker-args; set QPIDD - env to point to broker executable. broker-args will be - prepended with "--daemon --port=0" - -v/--verbose : verbose - lists tests as they are run. - -d/--debug : enable debug logging. - -i/--ignore <test> : ignore the named test. - -I/--ignore-file : file containing patterns to ignore. - -S/--skip-self-test : skips the client self tests in the 'tests folder' - -F/--spec-folder : folder that contains the specs to be loaded - """ - sys.exit(1) - - def startBroker(self, brokerArgs): - """Start a single broker daemon""" - if TestRunner.qpidd == None: - self._die("QPIDD environment var must point to qpidd when using -B/--start-broker") - cmd = "%s --daemon --port=0 %s" % (TestRunner.qpidd, brokerArgs) - portStr = os.popen(cmd).read() - if len(portStr) == 0: - self._die("%s failed to start" % TestRunner.qpidd) - port = int(portStr) - pid = int(os.popen("%s -p %d -c" % (TestRunner.qpidd, port)).read()) - print "Started broker: pid=%d, port=%d" % (pid, port) - self.brokerTuple = (pid, port) - self.setBroker("localhost:%d" % port) - - def stopBroker(self): - """Stop the broker using qpidd -q""" - if self.brokerTuple: - ret = os.spawnl(os.P_WAIT, TestRunner.qpidd, TestRunner.qpidd, "--port=%d" % self.brokerTuple[1], "-q") - if ret != 0: - self._die("stop_node(): pid=%d port=%d: qpidd -q returned %d" % (self.brokerTuple[0], self.brokerTuple[1], ret)) - print "Stopped broker: pid=%d, port=%d" % self.brokerTuple - - def killBroker(self): - """Kill the broker using kill -9 (SIGTERM)""" - if self.brokerTuple: - os.kill(self.brokerTuple[0], signal.SIGTERM) - print "Killed broker: pid=%d, port=%d" % self.brokerTuple - - def setBroker(self, broker): - try: - self.url = URL(broker) - except ValueError: - self._die("'%s' is not a valid broker" % (broker)) - self.user = default(self.url.user, "guest") - self.password = default(self.url.password, "guest") - self.host = self.url.host - if self.url.scheme == URL.AMQPS: - self.ssl = True - default_port = 5671 - else: - self.ssl = False - default_port = 5672 - self.port = default(self.url.port, default_port) - - def ignoreFile(self, filename): - f = file(filename) - for line in f.readlines(): self.ignore.append(line.strip()) - f.close() - - def use08spec(self): - "True if we are running with the old 0-8 spec." - # NB: AMQP 0-8 identifies itself as 8-0 for historical reasons. - return self.spec.major==8 and self.spec.minor==0 - - def use09spec(self): - "True if we are running with the 0-9 (non-wip) spec." - return self.spec.major==0 and self.spec.minor==9 - - def _parseargs(self, args): - # Defaults - self.setBroker("localhost") - self.verbose = 1 - self.ignore = [] - self.specfile = "0-8" - self.errata = [] - self.skip_self_test = False - - try: - opts, self.tests = getopt(args, "s:e:b:B:h?dvSi:I:F:", - ["help", "spec", "errata=", "broker=", - "start-broker=", "verbose", "skip-self-test", "ignore", - "ignore-file", "spec-folder"]) - except GetoptError, e: - self._die(str(e)) - # check for mutually exclusive options - if "-B" in opts or "--start-broker" in opts: - if "-b" in opts or "--broker" in opts: - self._die("Cannot use -B/--start-broker and -b/broker options together") - for opt, value in opts: - if opt in ("-?", "-h", "--help"): self._die() - if opt in ("-s", "--spec"): self.specfile = value - if opt in ("-e", "--errata"): self.errata.append(value) - if opt in ("-b", "--broker"): self.setBroker(value) - if opt in ("-B", "--start-broker"): self.startBroker(value) - if opt in ("-v", "--verbose"): self.verbose = 2 - if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG) - if opt in ("-i", "--ignore"): self.ignore.append(value) - if opt in ("-I", "--ignore-file"): self.ignoreFile(value) - if opt in ("-S", "--skip-self-test"): self.skip_self_test = True - if opt in ("-F", "--spec-folder"): TestRunner.SPEC_FOLDER = value - - # Abbreviations for default settings. - if (self.specfile == "0-10"): - self.spec = load(self.get_spec_file("amqp.0-10.xml")) - elif (self.specfile == "0-10-errata"): - self.spec = load(self.get_spec_file("amqp.0-10-qpid-errata.xml")) - else: - if (self.specfile == "0-8"): - self.specfile = self.get_spec_file("amqp.0-8.xml") - elif (self.specfile == "0-9"): - self.specfile = self.get_spec_file("amqp.0-9.xml") - self.errata.append(self.get_spec_file("amqp-errata.0-9.xml")) - - if (self.specfile == None): - self._die("No XML specification provided") - print "Using specification from:", self.specfile - - self.spec = qpid.spec.load(self.specfile, *self.errata) - - if len(self.tests) == 0: - if not self.skip_self_test: - self.tests=findmodules("tests") - if self.use08spec() or self.use09spec(): - self.tests+=findmodules("tests_0-8") - elif (self.spec.major == 99 and self.spec.minor == 0): - self.tests+=findmodules("tests_0-10_preview") - elif (self.spec.major == 0 and self.spec.minor == 10): - self.tests+=findmodules("tests_0-10") - - def testSuite(self): - class IgnoringTestSuite(unittest.TestSuite): - def addTest(self, test): - if isinstance(test, unittest.TestCase): - for pattern in testrunner.ignore: - if fnmatch(test.id(), pattern): - return - unittest.TestSuite.addTest(self, test) - - # Use our IgnoringTestSuite in the test loader. - unittest.TestLoader.suiteClass = IgnoringTestSuite - return unittest.defaultTestLoader.loadTestsFromNames(self.tests) - - def run(self, args=sys.argv[1:]): - self.brokerTuple = None - self._parseargs(args) - runner = unittest.TextTestRunner(descriptions=False, - verbosity=self.verbose) - result = runner.run(self.testSuite()) - - if (self.ignore): - print "=======================================" - print "NOTE: the following tests were ignored:" - for t in self.ignore: print t - print "=======================================" - - self.stopBroker() - return result.wasSuccessful() - - def connect(self, host=None, port=None, spec=None, user=None, password=None, tune_params=None): - """Connect to the broker, returns a qpid.client.Client""" - host = host or self.host - port = port or self.port - spec = spec or self.spec - user = user or self.user - password = password or self.password - client = qpid.client.Client(host, port, spec) - if self.use08spec(): - client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params) - else: - client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params) - return client - - def get_spec_file(self, fname): - return TestRunner.SPEC_FOLDER + os.sep + fname - -# Global instance for tests to call connect. -testrunner = TestRunner() - +from qpid.harness import Skipped +from qpid.exceptions import VersionError class TestBase(unittest.TestCase): """Base class for Qpid test cases. @@ -267,6 +41,9 @@ class TestBase(unittest.TestCase): resources to clean up later. """ + def configure(self, config): + self.config = config + def setUp(self): self.queues = [] self.exchanges = [] @@ -293,9 +70,26 @@ class TestBase(unittest.TestCase): else: self.client.close() - def connect(self, *args, **keys): + def connect(self, host=None, port=None, user=None, password=None, tune_params=None): """Create a new connction, return the Client object""" - return testrunner.connect(*args, **keys) + host = host or self.config.broker.host + port = port or self.config.broker.port or 5672 + user = user or "guest" + password = password or "guest" + client = qpid.client.Client(host, port) + try: + if client.spec.major == 8 and client.spec.minor == 0: + client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params) + else: + client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params) + except qpid.client.Closed, e: + if isinstance(e.args[0], VersionError): + raise Skipped(e.args[0]) + else: + raise e + except socket.error, e: + raise Skipped(e) + return client def queue_declare(self, channel=None, *args, **keys): channel = channel or self.channel @@ -319,17 +113,8 @@ class TestBase(unittest.TestCase): def consume(self, queueName): """Consume from named queue returns the Queue object.""" - if testrunner.use08spec() or testrunner.use09spec(): - reply = self.channel.basic_consume(queue=queueName, no_ack=True) - return self.client.queue(reply.consumer_tag) - else: - if not "uniqueTag" in dir(self): self.uniqueTag = 1 - else: self.uniqueTag += 1 - consumer_tag = "tag" + str(self.uniqueTag) - self.channel.message_subscribe(queue=queueName, destination=consumer_tag) - self.channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL) - self.channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL) - return self.client.queue(consumer_tag) + reply = self.channel.basic_consume(queue=queueName, no_ack=True) + return self.client.queue(reply.consumer_tag) def subscribe(self, channel=None, **keys): channel = channel or self.channel @@ -350,24 +135,14 @@ class TestBase(unittest.TestCase): Publish to exchange and assert queue.get() returns the same message. """ body = self.uniqueString() - if testrunner.use08spec() or testrunner.use09spec(): - self.channel.basic_publish( - exchange=exchange, - content=Content(body, properties=properties), - routing_key=routing_key) - else: - self.channel.message_transfer( - destination=exchange, - content=Content(body, properties={'application_headers':properties,'routing_key':routing_key})) + self.channel.basic_publish( + exchange=exchange, + content=Content(body, properties=properties), + routing_key=routing_key) msg = queue.get(timeout=1) - if testrunner.use08spec() or testrunner.use09spec(): - self.assertEqual(body, msg.content.body) - if (properties): - self.assertEqual(properties, msg.content.properties) - else: - self.assertEqual(body, msg.content.body) - if (properties): - self.assertEqual(properties, msg.content['application_headers']) + self.assertEqual(body, msg.content.body) + if (properties): + self.assertEqual(properties, msg.content.properties) def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None): """ @@ -394,11 +169,19 @@ class TestBase(unittest.TestCase): self.assertEqual("close", message.method.name) self.assertEqual(expectedCode, message.reply_code) +#0-10 support +from qpid.connection import Connection +from qpid.util import connect, ssl, URL + class TestBase010(unittest.TestCase): """ Base class for Qpid test cases. using the final 0-10 spec """ + def configure(self, config): + self.config = config + self.broker = config.broker + def setUp(self): self.conn = self.connect() self.session = self.conn.session("test-session", timeout=10) @@ -406,15 +189,26 @@ class TestBase010(unittest.TestCase): def startQmf(self, handler=None): self.qmf = qmf.console.Session(handler) - self.qmf_broker = self.qmf.addBroker(str(testrunner.url)) + self.qmf_broker = self.qmf.addBroker(str(self.broker)) def connect(self, host=None, port=None): - sock = connect(host or testrunner.host, port or testrunner.port) - if testrunner.url.scheme == URL.AMQPS: + url = self.broker + if url.scheme == URL.AMQPS: + default_port = 5671 + else: + default_port = 5672 + try: + sock = connect(host or url.host, port or url.port or default_port) + except socket.error, e: + raise Skipped(e) + if url.scheme == URL.AMQPS: sock = ssl(sock) - conn = Connection(sock, username=testrunner.user, - password=testrunner.password) - conn.start(timeout=10) + conn = Connection(sock, username=url.user or "guest", + password=url.password or "guest") + try: + conn.start(timeout=10) + except VersionError, e: + raise Skipped(e) return conn def tearDown(self): diff --git a/python/qpid/tests/framing.py b/python/qpid/tests/framing.py index 4cd596b583..0b33df8b9a 100644 --- a/python/qpid/tests/framing.py +++ b/python/qpid/tests/framing.py @@ -40,6 +40,50 @@ class Base(Test): assert seg1.channel == seg2.channel, "expected: %r, got %r" % (seg1, seg2) assert seg1.payload == seg2.payload, "expected: %r, got %r" % (seg1, seg2) + def cmp_list(self, l1, l2): + if l1 is None: + assert l2 is None + return + + assert len(l1) == len(l2) + for v1, v2 in zip(l1, l2): + if isinstance(v1, Compound): + self.cmp_ops(v1, v2) + else: + assert v1 == v2 + + def cmp_ops(self, op1, op2): + if op1 is None: + assert op2 is None + return + + assert op1.__class__ == op2.__class__ + cls = op1.__class__ + assert op1.NAME == op2.NAME + assert op1.CODE == op2.CODE + assert op1.FIELDS == op2.FIELDS + for f in cls.FIELDS: + v1 = getattr(op1, f.name) + v2 = getattr(op2, f.name) + if COMPOUND.has_key(f.type) or f.type == "struct32": + self.cmp_ops(v1, v2) + elif f.type in ("list", "array"): + self.cmp_list(v1, v2) + else: + assert v1 == v2, "expected: %r, got %r" % (v1, v2) + + if issubclass(cls, Command) or issubclass(cls, Control): + assert op1.channel == op2.channel + + if issubclass(cls, Command): + assert op1.sync == op2.sync, "expected: %r, got %r" % (op1.sync, op2.sync) + assert (op1.headers is None and op2.headers is None) or \ + (op1.headers is not None and op2.headers is not None) + if op1.headers is not None: + assert len(op1.headers) == len(op2.headers) + for h1, h2 in zip(op1.headers, op2.headers): + self.cmp_ops(h1, h2) + class FrameTest(Base): def enc_dec(self, frames, encoded=None): @@ -171,3 +215,75 @@ class SegmentTest(Base): for i in range(0, 8, 2)] self.enc_dec([Segment(False, False, 0, 0, 0, "abcdefgh")], frames, ilvd, max_payload=2) + +from qpid.ops import * + +class OpTest(Base): + + def enc_dec(self, ops): + enc = OpEncoder() + dec = OpDecoder() + enc.write(*ops) + segs = enc.read() + dec.write(*segs) + dops = dec.read() + assert len(ops) == len(dops) + for op1, op2 in zip(ops, dops): + self.cmp_ops(op1, op2) + + def testEmtpyMT(self): + self.enc_dec([MessageTransfer()]) + + def testEmptyMTSync(self): + self.enc_dec([MessageTransfer(sync=True)]) + + def testMT(self): + self.enc_dec([MessageTransfer(destination="asdf")]) + + def testSyncMT(self): + self.enc_dec([MessageTransfer(destination="asdf", sync=True)]) + + def testEmptyPayloadMT(self): + self.enc_dec([MessageTransfer(payload="")]) + + def testPayloadMT(self): + self.enc_dec([MessageTransfer(payload="test payload")]) + + def testHeadersEmptyPayloadMT(self): + self.enc_dec([MessageTransfer(headers=[DeliveryProperties()])]) + + def testHeadersPayloadMT(self): + self.enc_dec([MessageTransfer(headers=[DeliveryProperties()], payload="test payload")]) + + def testMultiHeadersEmptyPayloadMT(self): + self.enc_dec([MessageTransfer(headers=[DeliveryProperties(), MessageProperties()])]) + + def testMultiHeadersPayloadMT(self): + self.enc_dec([MessageTransfer(headers=[MessageProperties(), DeliveryProperties()], payload="test payload")]) + + def testContentTypeHeadersPayloadMT(self): + self.enc_dec([MessageTransfer(headers=[MessageProperties(content_type="text/plain")], payload="test payload")]) + + def testMulti(self): + self.enc_dec([MessageTransfer(), + MessageTransfer(sync=True), + MessageTransfer(destination="one"), + MessageTransfer(destination="two", sync=True), + MessageTransfer(destination="three", payload="test payload")]) + + def testControl(self): + self.enc_dec([SessionAttach(name="asdf")]) + + def testMixed(self): + self.enc_dec([SessionAttach(name="fdsa"), MessageTransfer(destination="test")]) + + def testChannel(self): + self.enc_dec([SessionAttach(name="asdf", channel=3), MessageTransfer(destination="test", channel=1)]) + + def testCompound(self): + self.enc_dec([MessageTransfer(headers=[MessageProperties(reply_to=ReplyTo(exchange="exch", routing_key="rk"))])]) + + def testListCompound(self): + self.enc_dec([ExecutionResult(value=RecoverResult(in_doubt=[Xid(global_id="one"), + Xid(global_id="two"), + Xid(global_id="three")]))]) diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py index 8a142d6c96..7706ebbabe 100644 --- a/python/qpid/tests/messaging.py +++ b/python/qpid/tests/messaging.py @@ -22,6 +22,7 @@ import time from qpid.tests import Test +from qpid.harness import Skipped from qpid.messaging import Connection, ConnectError, Disconnected, Empty, Message, UNLIMITED, uuid4 from Queue import Queue, Empty as QueueEmpty @@ -42,7 +43,10 @@ class Base(Test): def setup(self): self.test_id = uuid4() self.broker = self.config.broker - self.conn = self.setup_connection() + try: + self.conn = self.setup_connection() + except ConnectError, e: + raise Skipped(e) self.ssn = self.setup_session() self.snd = self.setup_sender() self.rcv = self.setup_receiver() @@ -65,7 +69,7 @@ class Base(Test): receiver = ssn.receiver("ping-queue") msg = receiver.fetch(0) ssn.acknowledge() - assert msg.content == content + assert msg.content == content, "expected %r, got %r" % (content, msg.content) def drain(self, rcv, limit=None): contents = [] |