summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-08-11 15:40:19 +0000
committerRafael H. Schloming <rhs@apache.org>2009-08-11 15:40:19 +0000
commite419dd6f187b5d135701f17f4b4382ece95068e5 (patch)
tree7c895aa61cbb9b926f3a0c6cecbc17e4e1989944
parentf92d8f621534d0ae704101f8a38110fedbff39af (diff)
downloadqpid-python-e419dd6f187b5d135701f17f4b4382ece95068e5.tar.gz
- removed old and redundent tests
- removed old test harness in favor of qpid-python-test - modified qpid-python-test to support "skipped" tests, these are tests that failed due to an anticipated environmental reason such as the broker is not running or it is the wrong version - modified the qpid-python-test harness to exit with appropriate error codes based on the test results - modified the python clients to report version mismatches rather than framing errors - made qpid_config provide variables for 0-8, 0-9, and 0-10 versions of the spec - modified the 0-10 client to directly codegen classes - added new 0-10 framing layer based on push parsing rather than pull parsing - added numerous framing tests git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@803168 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/README.txt28
-rwxr-xr-xqpid/python/qpid-python-test76
-rw-r--r--qpid/python/qpid/assembler.py118
-rw-r--r--qpid/python/qpid/client.py7
-rw-r--r--qpid/python/qpid/codec010.py227
-rw-r--r--qpid/python/qpid/connection.py65
-rw-r--r--qpid/python/qpid/connection08.py15
-rw-r--r--qpid/python/qpid/datatypes.py6
-rw-r--r--qpid/python/qpid/delegates.py33
-rw-r--r--qpid/python/qpid/exceptions.py1
-rw-r--r--qpid/python/qpid/framer.py63
-rw-r--r--qpid/python/qpid/framing.py172
-rw-r--r--qpid/python/qpid/generator.py48
-rw-r--r--[-rwxr-xr-x]qpid/python/qpid/harness.py (renamed from qpid/python/run-tests)21
-rw-r--r--qpid/python/qpid/messaging.py29
-rw-r--r--qpid/python/qpid/ops.py277
-rw-r--r--qpid/python/qpid/peer.py4
-rw-r--r--qpid/python/qpid/session.py199
-rw-r--r--qpid/python/qpid/spec.py4
-rw-r--r--qpid/python/qpid/spec010.py708
-rw-r--r--qpid/python/qpid/testlib.py326
-rw-r--r--qpid/python/qpid/tests/framing.py116
-rw-r--r--qpid/python/qpid/tests/messaging.py8
-rw-r--r--qpid/python/qpid_config.py2
-rw-r--r--qpid/python/tests/__init__.py10
-rw-r--r--qpid/python/tests/assembler.py78
-rw-r--r--qpid/python/tests/codec.py14
-rw-r--r--qpid/python/tests/codec010.py14
-rw-r--r--qpid/python/tests/connection.py24
-rw-r--r--qpid/python/tests/datatypes.py12
-rw-r--r--qpid/python/tests/framer.py95
-rw-r--r--qpid/python/tests/spec.py74
-rw-r--r--qpid/python/tests/spec010.py70
-rw-r--r--qpid/python/tests_0-10/message.py4
-rw-r--r--qpid/python/tests_0-10/tx.py2
-rw-r--r--qpid/python/tests_0-8/__init__.py2
-rw-r--r--qpid/python/tests_0-8/basic.py7
-rw-r--r--qpid/python/tests_0-8/broker.py24
-rw-r--r--qpid/python/tests_0-8/example.py2
-rw-r--r--qpid/python/tests_0-8/queue.py2
-rw-r--r--qpid/python/tests_0-8/testlib.py2
-rw-r--r--qpid/python/tests_0-8/tx.py2
-rw-r--r--qpid/python/tests_0-9/__init__.py2
-rw-r--r--qpid/python/tests_0-9/basic.py396
-rw-r--r--qpid/python/tests_0-9/broker.py133
-rw-r--r--qpid/python/tests_0-9/dtx.py587
-rw-r--r--qpid/python/tests_0-9/example.py94
-rw-r--r--qpid/python/tests_0-9/exchange.py327
-rw-r--r--qpid/python/tests_0-9/execution.py29
-rw-r--r--qpid/python/tests_0-9/message.py657
-rw-r--r--qpid/python/tests_0-9/query.py2
-rw-r--r--qpid/python/tests_0-9/queue.py261
-rw-r--r--qpid/python/tests_0-9/testlib.py66
-rw-r--r--qpid/python/tests_0-9/tx.py188
54 files changed, 1159 insertions, 4574 deletions
diff --git a/qpid/python/README.txt b/qpid/python/README.txt
index bae9f6ab0b..772271cffe 100644
--- a/qpid/python/README.txt
+++ b/qpid/python/README.txt
@@ -27,30 +27,24 @@ python client. The "tests_0-10", "tests_0-9", and "tests_0-8"
directories contain protocol level conformance tests for AMQP brokers
of the specified version.
-Simplest way to run the tests:
+The qpid-python-test script may be used to run these tests. It will by
+default run the python unit tests and the 0-10 conformance tests:
1. Run a broker on the default port
- 2. ./run-tests -s <version>
+ 2. ./qpid-python-test
-Where <version> is one of "0-8", "0-9", or "0-10-errata".
+If you wish to run the 0-8 or 0-9 conformence tests, they may be
+selected as follows:
-See the run-tests usage for for additional options:
-
- ./run-tests -h
+ 1. Run a broker on the default port
-== Expected failures ==
+ 2. ./qpid-python-test tests_0-8.*
-Certain tests are expected to fail due to incomplete functionality or
-unresolved interop issues. To skip expected failures for the C++ or
-Java brokers:
+ -- or --
- ./run-tests -I <file-name>
+ ./qpid-python-test tests_0-9.*
-Where <file-name> is one of the following files:
+See the qpid-python-test usage for for additional options:
- * cpp_failing_0-10.txt
- * cpp_failing_0-9.txt
- * cpp_failing_0-8.txt
- * java_failing_0-9.txt
- * java_failing_0-8.txt
+ ./qpid-python-test -h
diff --git a/qpid/python/qpid-python-test b/qpid/python/qpid-python-test
index 3bf0e6ccce..b9e69c782e 100755
--- a/qpid/python/qpid-python-test
+++ b/qpid/python/qpid-python-test
@@ -25,6 +25,7 @@ from fnmatch import fnmatchcase as match
from getopt import GetoptError
from logging import getLogger, StreamHandler, Formatter, Filter, \
WARN, DEBUG, ERROR
+from qpid.harness import Skipped
from qpid.util import URL
levels = {
@@ -101,7 +102,7 @@ for a in args:
includes.append(a.strip())
if not includes:
- includes.append("*")
+ includes.extend(["qpid.tests.*", "tests.*", "tests_0-10.*"])
def is_ignored(path):
for p in excludes:
@@ -142,6 +143,7 @@ def vt100_attrs(*attrs):
vt100_reset = vt100_attrs(0)
KEYWORDS = {"pass": (32,),
+ "skip": (33,),
"fail": (31,),
"start": (34,),
"total": (34,),
@@ -165,9 +167,6 @@ def indent(text):
lines = text.split("\n")
return " %s" % "\n ".join(lines)
-from qpid.testlib import testrunner
-testrunner.setBroker(str(config.broker))
-
class Interceptor:
def __init__(self):
@@ -264,16 +263,27 @@ root.setLevel(WARN)
log = getLogger("qpid.test")
+PASS = "pass"
+SKIP = "skip"
+FAIL = "fail"
+
class Runner:
def __init__(self):
self.exceptions = []
+ self.skip = False
def passed(self):
return not self.exceptions
+ def skipped(self):
+ return self.skip
+
def failed(self):
- return self.exceptions
+ return self.exceptions and not self.skip
+
+ def halt(self):
+ return self.exceptions or self.skip
def run(self, name, phase):
try:
@@ -281,18 +291,28 @@ class Runner:
except KeyboardInterrupt:
raise
except:
- self.exceptions.append((name, sys.exc_info()))
+ exi = sys.exc_info()
+ if issubclass(exi[0], Skipped):
+ self.skip = True
+ self.exceptions.append((name, exi))
def status(self):
if self.passed():
- return "pass"
+ return PASS
+ elif self.skipped():
+ return SKIP
+ elif self.failed():
+ return FAIL
else:
- return "fail"
+ return None
def print_exceptions(self):
for name, info in self.exceptions:
- print "Error during %s:" % name
- print indent("".join(traceback.format_exception(*info))).rstrip()
+ if issubclass(info[0], Skipped):
+ print indent("".join(traceback.format_exception_only(*info[:2]))).rstrip()
+ else:
+ print "Error during %s:" % name
+ print indent("".join(traceback.format_exception(*info))).rstrip()
ST_WIDTH = 8
@@ -329,11 +349,11 @@ def run_test(name, test, config):
sys.stdout.write("\n")
sys.stdout.write(output)
print " %s" % colorize_word(runner.status())
- if runner.failed():
+ if runner.failed() or runner.skipped():
runner.print_exceptions()
root.setLevel(level)
filter.patterns = patterns
- return runner.passed()
+ return runner.status()
class FunctionTest:
@@ -373,13 +393,13 @@ class MethodTest:
if hasattr(inst, "configure"):
runner.run("configure", lambda: inst.configure(config))
- if runner.failed(): return runner
+ if runner.halt(): return runner
if hasattr(inst, "setUp"):
runner.run("setup", inst.setUp)
- if runner.failed(): return runner
+ if runner.halt(): return runner
elif hasattr(inst, "setup"):
runner.run("setup", inst.setup)
- if runner.failed(): return runner
+ if runner.halt(): return runner
runner.run("test", test)
@@ -473,7 +493,7 @@ class Harness:
objects.append(child)
self.scanned.append(obj)
-modules = "qpid.tests", "tests", "tests_0-10"
+modules = "qpid.tests", "tests", "tests_0-8", "tests_0-9", "tests_0-10"
h = Harness()
for name in modules:
m = __import__(name, None, None, ["dummy"])
@@ -485,13 +505,17 @@ total = len(filtered) + len(ignored)
passed = 0
failed = 0
+skipped = 0
for t in filtered:
if list_only:
print t.name()
else:
- if t.run():
+ st = t.run()
+ if st == PASS:
passed += 1
- else:
+ elif st == SKIP:
+ skipped += 1
+ elif st == FAIL:
failed += 1
if opts.hoe:
break
@@ -499,6 +523,10 @@ for t in filtered:
run = passed + failed
if not list_only:
+ if passed:
+ _pass = "pass"
+ else:
+ _pass = "fail"
if failed:
outcome = "fail"
else:
@@ -507,12 +535,22 @@ if not list_only:
ign = "ignored"
else:
ign = "pass"
+ if skipped:
+ skip = "skip"
+ else:
+ skip = "pass"
print colorize("Totals:", 1), \
colorize_word("total", "%s tests" % total) + ",", \
- colorize_word("pass", "%s passed" % passed) + ",", \
+ colorize_word(_pass, "%s passed" % passed) + ",", \
+ colorize_word(skip, "%s skipped" % skipped) + ",", \
colorize_word(ign, "%s ignored" % len(ignored)) + ",", \
colorize_word(outcome, "%s failed" % failed),
if opts.hoe and failed > 0:
print " -- (halted after %s)" % run
else:
print
+
+if failed or skipped:
+ sys.exit(1)
+else:
+ sys.exit(0)
diff --git a/qpid/python/qpid/assembler.py b/qpid/python/qpid/assembler.py
deleted file mode 100644
index 92bb0aa0f8..0000000000
--- a/qpid/python/qpid/assembler.py
+++ /dev/null
@@ -1,118 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from codec010 import StringCodec
-from framer import *
-from logging import getLogger
-
-log = getLogger("qpid.io.seg")
-
-class Segment:
-
- def __init__(self, first, last, type, track, channel, payload):
- self.id = None
- self.offset = None
- self.first = first
- self.last = last
- self.type = type
- self.track = track
- self.channel = channel
- self.payload = payload
-
- def decode(self, spec):
- segs = spec["segment_type"]
- choice = segs.choices[self.type]
- return getattr(self, "decode_%s" % choice.name)(spec)
-
- def decode_control(self, spec):
- sc = StringCodec(spec, self.payload)
- return sc.read_control()
-
- def decode_command(self, spec):
- sc = StringCodec(spec, self.payload)
- hdr, cmd = sc.read_command()
- cmd.id = self.id
- return hdr, cmd
-
- def decode_header(self, spec):
- sc = StringCodec(spec, self.payload)
- values = []
- while len(sc.encoded) > 0:
- values.append(sc.read_struct32())
- return values
-
- def decode_body(self, spec):
- return self.payload
-
- def __str__(self):
- return "%s%s %s %s %s %r" % (int(self.first), int(self.last), self.type,
- self.track, self.channel, self.payload)
-
- def __repr__(self):
- return str(self)
-
-class Assembler(Framer):
-
- def __init__(self, sock, max_payload = Frame.MAX_PAYLOAD):
- Framer.__init__(self, sock)
- self.max_payload = max_payload
- self.fragments = {}
-
- def read_segment(self):
- while True:
- frame = self.read_frame()
-
- key = (frame.channel, frame.track)
- seg = self.fragments.get(key)
- if seg == None:
- seg = Segment(frame.isFirstSegment(), frame.isLastSegment(),
- frame.type, frame.track, frame.channel, "")
- self.fragments[key] = seg
-
- seg.payload += frame.payload
-
- if frame.isLastFrame():
- self.fragments.pop(key)
- log.debug("RECV %s", seg)
- return seg
-
- def write_segment(self, segment):
- remaining = segment.payload
-
- first = True
- while first or remaining:
- payload = remaining[:self.max_payload]
- remaining = remaining[self.max_payload:]
-
- flags = 0
- if first:
- flags |= FIRST_FRM
- first = False
- if not remaining:
- flags |= LAST_FRM
- if segment.first:
- flags |= FIRST_SEG
- if segment.last:
- flags |= LAST_SEG
-
- frame = Frame(flags, segment.type, segment.track, segment.channel,
- payload)
- self.write_frame(frame)
-
- log.debug("SENT %s", segment)
diff --git a/qpid/python/qpid/client.py b/qpid/python/qpid/client.py
index 4605710de8..6107a4bc35 100644
--- a/qpid/python/qpid/client.py
+++ b/qpid/python/qpid/client.py
@@ -39,11 +39,8 @@ class Client:
if spec:
self.spec = spec
else:
- try:
- name = os.environ["AMQP_SPEC"]
- except KeyError:
- raise EnvironmentError("environment variable AMQP_SPEC must be set")
- self.spec = load(name)
+ from qpid_config import amqp_spec_0_9
+ self.spec = load(amqp_spec_0_9)
self.structs = StructFactory(self.spec)
self.sessions = {}
diff --git a/qpid/python/qpid/codec010.py b/qpid/python/qpid/codec010.py
index f07362c38d..682743df19 100644
--- a/qpid/python/qpid/codec010.py
+++ b/qpid/python/qpid/codec010.py
@@ -20,23 +20,66 @@
import datetime
from packer import Packer
from datatypes import serial, timestamp, RangedSet, Struct, UUID
+from ops import Compound, PRIMITIVE, COMPOUND
class CodecException(Exception): pass
+def direct(t):
+ return lambda x: t
+
+def map_str(s):
+ for c in s:
+ if ord(c) >= 0x80:
+ return "vbin16"
+ return "str16"
+
class Codec(Packer):
- def __init__(self, spec):
- self.spec = spec
+ ENCODINGS = {
+ unicode: direct("str16"),
+ str: map_str,
+ buffer: direct("vbin32"),
+ int: direct("int64"),
+ long: direct("int64"),
+ float: direct("double"),
+ None.__class__: direct("void"),
+ list: direct("list"),
+ tuple: direct("list"),
+ dict: direct("map"),
+ timestamp: direct("datetime"),
+ datetime.datetime: direct("datetime"),
+ UUID: direct("uuid"),
+ Compound: direct("struct32")
+ }
+
+ def encoding(self, obj):
+ enc = self._encoding(obj.__class__, obj)
+ if enc is None:
+ raise CodecException("no encoding for %r" % obj)
+ return PRIMITIVE[enc]
+
+ def _encoding(self, klass, obj):
+ if self.ENCODINGS.has_key(klass):
+ return self.ENCODINGS[klass](obj)
+ for base in klass.__bases__:
+ result = self._encoding(base, obj)
+ if result != None:
+ return result
+
+ def read_primitive(self, type):
+ return getattr(self, "read_%s" % type.NAME)()
+ def write_primitive(self, type, v):
+ getattr(self, "write_%s" % type.NAME)(v)
- def write_void(self, v):
- assert v == None
def read_void(self):
return None
+ def write_void(self, v):
+ assert v == None
- def write_bit(self, b):
- if not b: raise ValueError(b)
def read_bit(self):
return True
+ def write_bit(self, b):
+ if not b: raise ValueError(b)
def read_uint8(self):
return self.unpack("!B")
@@ -172,20 +215,8 @@ class Codec(Packer):
self.write_uint32(len(b))
self.write(b)
- def write_map(self, m):
- sc = StringCodec(self.spec)
- if m is not None:
- sc.write_uint32(len(m))
- for k, v in m.items():
- type = self.spec.encoding(v)
- if type == None:
- raise CodecException("no encoding for %s" % v.__class__)
- sc.write_str8(k)
- sc.write_uint8(type.code)
- type.encode(sc, v)
- self.write_vbin32(sc.encoded)
def read_map(self):
- sc = StringCodec(self.spec, self.read_vbin32())
+ sc = StringCodec(self.read_vbin32())
if not sc.encoded:
return None
count = sc.read_uint32()
@@ -193,91 +224,132 @@ class Codec(Packer):
while sc.encoded:
k = sc.read_str8()
code = sc.read_uint8()
- type = self.spec.types[code]
- v = type.decode(sc)
+ type = PRIMITIVE[code]
+ v = sc.read_primitive(type)
result[k] = v
return result
+ def write_map(self, m):
+ sc = StringCodec()
+ if m is not None:
+ sc.write_uint32(len(m))
+ for k, v in m.items():
+ type = self.encoding(v)
+ sc.write_str8(k)
+ sc.write_uint8(type.CODE)
+ sc.write_primitive(type, v)
+ self.write_vbin32(sc.encoded)
+ def read_array(self):
+ sc = StringCodec(self.read_vbin32())
+ if not sc.encoded:
+ return None
+ type = PRIMITIVE[sc.read_uint8()]
+ count = sc.read_uint32()
+ result = []
+ while count > 0:
+ result.append(sc.read_primitive(type))
+ count -= 1
+ return result
def write_array(self, a):
- sc = StringCodec(self.spec)
+ sc = StringCodec()
if a is not None:
if len(a) > 0:
- type = self.spec.encoding(a[0])
+ type = self.encoding(a[0])
else:
- type = self.spec.encoding(None)
- sc.write_uint8(type.code)
+ type = self.encoding(None)
+ sc.write_uint8(type.CODE)
sc.write_uint32(len(a))
for o in a:
- type.encode(sc, o)
+ sc.write_primitive(type, o)
self.write_vbin32(sc.encoded)
- def read_array(self):
- sc = StringCodec(self.spec, self.read_vbin32())
+
+ def read_list(self):
+ sc = StringCodec(self.read_vbin32())
if not sc.encoded:
return None
- type = self.spec.types[sc.read_uint8()]
count = sc.read_uint32()
result = []
while count > 0:
- result.append(type.decode(sc))
+ type = PRIMITIVE[sc.read_uint8()]
+ result.append(sc.read_primitive(type))
count -= 1
return result
-
def write_list(self, l):
- sc = StringCodec(self.spec)
+ sc = StringCodec()
if l is not None:
sc.write_uint32(len(l))
for o in l:
- type = self.spec.encoding(o)
- sc.write_uint8(type.code)
- type.encode(sc, o)
+ type = self.encoding(o)
+ sc.write_uint8(type.CODE)
+ sc.write_primitive(type, o)
self.write_vbin32(sc.encoded)
- def read_list(self):
- sc = StringCodec(self.spec, self.read_vbin32())
- if not sc.encoded:
- return None
- count = sc.read_uint32()
- result = []
- while count > 0:
- type = self.spec.types[sc.read_uint8()]
- result.append(type.decode(sc))
- count -= 1
- return result
def read_struct32(self):
size = self.read_uint32()
code = self.read_uint16()
- type = self.spec.structs[code]
- fields = type.decode_fields(self)
- return Struct(type, **fields)
+ cls = COMPOUND[code]
+ op = cls()
+ self.read_fields(op)
+ return op
def write_struct32(self, value):
- sc = StringCodec(self.spec)
- sc.write_uint16(value._type.code)
- value._type.encode_fields(sc, value)
- self.write_vbin32(sc.encoded)
-
- def read_control(self):
- cntrl = self.spec.controls[self.read_uint16()]
- return Struct(cntrl, **cntrl.decode_fields(self))
- def write_control(self, ctrl):
- type = ctrl._type
- self.write_uint16(type.code)
- type.encode_fields(self, ctrl)
-
- def read_command(self):
- type = self.spec.commands[self.read_uint16()]
- hdr = self.spec["session.header"].decode(self)
- cmd = Struct(type, **type.decode_fields(self))
- return hdr, cmd
- def write_command(self, hdr, cmd):
- self.write_uint16(cmd._type.code)
- hdr._type.encode(self, hdr)
- cmd._type.encode_fields(self, cmd)
+ self.write_compound(value)
+
+ def read_compound(self, cls):
+ size = self.read_size(cls.SIZE)
+ if cls.CODE is not None:
+ code = self.read_uint16()
+ assert code == cls.CODE
+ op = cls()
+ self.read_fields(op)
+ return op
+ def write_compound(self, op):
+ sc = StringCodec()
+ if op.CODE is not None:
+ sc.write_uint16(op.CODE)
+ sc.write_fields(op)
+ self.write_size(op.SIZE, len(sc.encoded))
+ self.write(sc.encoded)
+
+ def read_fields(self, op):
+ flags = 0
+ for i in range(op.PACK):
+ flags |= (self.read_uint8() << 8*i)
+
+ for i in range(len(op.FIELDS)):
+ f = op.FIELDS[i]
+ if flags & (0x1 << i):
+ if COMPOUND.has_key(f.type):
+ value = self.read_compound(COMPOUND[f.type])
+ else:
+ value = getattr(self, "read_%s" % f.type)()
+ setattr(op, f.name, value)
+ def write_fields(self, op):
+ flags = 0
+ for i in range(len(op.FIELDS)):
+ f = op.FIELDS[i]
+ value = getattr(op, f.name)
+ if f.type == "bit":
+ present = value
+ else:
+ present = value != None
+ if present:
+ flags |= (0x1 << i)
+ for i in range(op.PACK):
+ self.write_uint8((flags >> 8*i) & 0xFF)
+ for i in range(len(op.FIELDS)):
+ f = op.FIELDS[i]
+ if flags & (0x1 << i):
+ if COMPOUND.has_key(f.type):
+ enc = self.write_compound
+ else:
+ enc = getattr(self, "write_%s" % f.type)
+ value = getattr(op, f.name)
+ enc(value)
def read_size(self, width):
if width > 0:
attr = "read_uint%d" % (width*8)
return getattr(self, attr)()
-
def write_size(self, width, n):
if width > 0:
attr = "write_uint%d" % (width*8)
@@ -285,7 +357,6 @@ class Codec(Packer):
def read_uuid(self):
return UUID(self.unpack("16s"))
-
def write_uuid(self, s):
if isinstance(s, UUID):
s = s.bytes
@@ -293,7 +364,6 @@ class Codec(Packer):
def read_bin128(self):
return self.unpack("16s")
-
def write_bin128(self, b):
self.pack("16s", b)
@@ -301,14 +371,13 @@ class Codec(Packer):
class StringCodec(Codec):
- def __init__(self, spec, encoded = ""):
- Codec.__init__(self, spec)
+ def __init__(self, encoded = ""):
self.encoded = encoded
- def write(self, s):
- self.encoded += s
-
def read(self, n):
result = self.encoded[:n]
self.encoded = self.encoded[n:]
return result
+
+ def write(self, s):
+ self.encoded += s
diff --git a/qpid/python/qpid/connection.py b/qpid/python/qpid/connection.py
index 5abab3802c..680f8f62e3 100644
--- a/qpid/python/qpid/connection.py
+++ b/qpid/python/qpid/connection.py
@@ -20,14 +20,14 @@
import datatypes, session
from threading import Thread, Condition, RLock
from util import wait, notify
-from assembler import Assembler, Segment
from codec010 import StringCodec
+from framing import *
from session import Session
from generator import control_invoker
from spec import SPEC
from exceptions import *
from logging import getLogger
-import delegates
+import delegates, socket
class ChannelBusy(Exception): pass
@@ -43,12 +43,12 @@ def client(*args, **kwargs):
def server(*args, **kwargs):
return delegates.Server(*args, **kwargs)
-class Connection(Assembler):
+from framer import Framer
- def __init__(self, sock, spec=SPEC, delegate=client, **args):
- Assembler.__init__(self, sock)
- self.spec = spec
+class Connection(Framer):
+ def __init__(self, sock, delegate=client, **args):
+ Framer.__init__(self, sock)
self.lock = RLock()
self.attached = {}
self.sessions = {}
@@ -66,6 +66,10 @@ class Connection(Assembler):
self.channel_max = 65535
+ self.op_enc = OpEncoder()
+ self.seg_enc = SegmentEncoder()
+ self.frame_enc = FrameEncoder()
+
self.delegate = delegate(self, **args)
def attach(self, name, ch, delegate, force=False):
@@ -145,15 +149,44 @@ class Connection(Assembler):
raise ConnectionFailed(*self.close_code)
def run(self):
+ frame_dec = FrameDecoder()
+ seg_dec = SegmentDecoder()
+ op_dec = OpDecoder()
+
while not self.closed:
try:
- seg = self.read_segment()
- except Closed:
+ data = self.sock.recv(64*1024)
+ if not data:
+ self.detach_all()
+ break
+ except socket.timeout:
+ if self.aborted():
+ self.detach_all()
+ raise Closed("connection timed out")
+ else:
+ continue
+ except socket.error, e:
self.detach_all()
- break
- self.delegate.received(seg)
+ raise Closed(e)
+ frame_dec.write(data)
+ seg_dec.write(*frame_dec.read())
+ op_dec.write(*seg_dec.read())
+ for op in op_dec.read():
+ self.delegate.received(op)
self.sock.close()
+ def write_op(self, op):
+ self.sock_lock.acquire()
+ try:
+ self.op_enc.write(op)
+ self.seg_enc.write(*self.op_enc.read())
+ self.frame_enc.write(*self.seg_enc.read())
+ bytes = self.frame_enc.read()
+ self.write(bytes)
+ self.flush()
+ finally:
+ self.sock_lock.release()
+
def close(self, timeout=None):
if not self.opened: return
Channel(self, 0).connection_close(200)
@@ -169,19 +202,17 @@ class Connection(Assembler):
log = getLogger("qpid.io.ctl")
-class Channel(control_invoker(SPEC)):
+class Channel(control_invoker()):
def __init__(self, connection, id):
self.connection = connection
self.id = id
self.session = None
- def invoke(self, type, args, kwargs):
- ctl = type.new(args, kwargs)
- sc = StringCodec(self.spec)
- sc.write_control(ctl)
- self.connection.write_segment(Segment(True, True, type.segment_type,
- type.track, self.id, sc.encoded))
+ def invoke(self, op, args, kwargs):
+ ctl = op(*args, **kwargs)
+ ctl.channel = self.id
+ self.connection.write_op(ctl)
log.debug("SENT %s", ctl)
def __str__(self):
diff --git a/qpid/python/qpid/connection08.py b/qpid/python/qpid/connection08.py
index be94a792cb..d34cfe2847 100644
--- a/qpid/python/qpid/connection08.py
+++ b/qpid/python/qpid/connection08.py
@@ -28,6 +28,7 @@ from cStringIO import StringIO
from spec import load
from codec import EOF
from compat import SHUT_RDWR
+from exceptions import VersionError
class SockIO:
@@ -73,6 +74,9 @@ def listen(host, port, predicate = lambda: True):
s, a = sock.accept()
yield SockIO(s)
+class FramingError(Exception):
+ pass
+
class Connection:
def __init__(self, io, spec):
@@ -107,7 +111,16 @@ class Connection:
def read_8_0(self):
c = self.codec
- type = self.spec.constants.byid[c.decode_octet()].name
+ tid = c.decode_octet()
+ try:
+ type = self.spec.constants.byid[tid].name
+ except KeyError:
+ if tid == ord('A') and c.unpack("!3s") == "MQP":
+ _, _, major, minor = c.unpack("4B")
+ raise VersionError("client: %s-%s, server: %s-%s" %
+ (self.spec.major, self.spec.minor, major, minor))
+ else:
+ raise FramingError("unknown frame type: %s" % tid)
channel = c.decode_short()
body = c.decode_longstr()
dec = codec.Codec(StringIO(body), self.spec)
diff --git a/qpid/python/qpid/datatypes.py b/qpid/python/qpid/datatypes.py
index 4cd3fade2c..bba3f5b9ab 100644
--- a/qpid/python/qpid/datatypes.py
+++ b/qpid/python/qpid/datatypes.py
@@ -84,7 +84,7 @@ class Message:
def get(self, name):
if self.headers:
for h in self.headers:
- if h._type.name == name:
+ if h.NAME == name:
return h
return None
@@ -93,7 +93,7 @@ class Message:
self.headers = []
idx = 0
while idx < len(self.headers):
- if self.headers[idx]._type == header._type:
+ if self.headers[idx].NAME == header.NAME:
self.headers[idx] = header
return
idx += 1
@@ -102,7 +102,7 @@ class Message:
def clear(self, name):
idx = 0
while idx < len(self.headers):
- if self.headers[idx]._type.name == name:
+ if self.headers[idx].NAME == name:
del self.headers[idx]
return
idx += 1
diff --git a/qpid/python/qpid/delegates.py b/qpid/python/qpid/delegates.py
index 82bbe67ede..c74cc5a945 100644
--- a/qpid/python/qpid/delegates.py
+++ b/qpid/python/qpid/delegates.py
@@ -20,7 +20,9 @@
import os, connection, session
from util import notify
from datatypes import RangedSet
+from exceptions import VersionError
from logging import getLogger
+from ops import Control
import sys
log = getLogger("qpid.io.ctl")
@@ -29,26 +31,22 @@ class Delegate:
def __init__(self, connection, delegate=session.client):
self.connection = connection
- self.spec = connection.spec
self.delegate = delegate
- self.control = self.spec["track.control"].value
- def received(self, seg):
- ssn = self.connection.attached.get(seg.channel)
+ def received(self, op):
+ ssn = self.connection.attached.get(op.channel)
if ssn is None:
- ch = connection.Channel(self.connection, seg.channel)
+ ch = connection.Channel(self.connection, op.channel)
else:
ch = ssn.channel
- if seg.track == self.control:
- ctl = seg.decode(self.spec)
- log.debug("RECV %s", ctl)
- attr = ctl._type.qname.replace(".", "_")
- getattr(self, attr)(ch, ctl)
+ if isinstance(op, Control):
+ log.debug("RECV %s", op)
+ getattr(self, op.NAME)(ch, op)
elif ssn is None:
ch.session_detached()
else:
- ssn.received(seg)
+ ssn.received(op)
def connection_close(self, ch, close):
self.connection.close_code = (close.reply_code, close.reply_text)
@@ -124,7 +122,8 @@ class Server(Delegate):
def start(self):
self.connection.read_header()
- self.connection.write_header(self.spec.major, self.spec.minor)
+ # XXX
+ self.connection.write_header(0, 10)
connection.Channel(self.connection, 0).connection_start(mechanisms=["ANONYMOUS"])
def connection_start_ok(self, ch, start_ok):
@@ -156,8 +155,14 @@ class Client(Delegate):
self.heartbeat = heartbeat
def start(self):
- self.connection.write_header(self.spec.major, self.spec.minor)
- self.connection.read_header()
+ # XXX
+ cli_major = 0
+ cli_minor = 10
+ self.connection.write_header(cli_major, cli_minor)
+ magic, _, _, major, minor = self.connection.read_header()
+ if not (magic == "AMQP" and major == cli_major and minor == cli_minor):
+ raise VersionError("client: %s-%s, server: %s-%s" %
+ (cli_major, cli_minor, major, minor))
def connection_start(self, ch, start):
r = "\0%s\0%s" % (self.username, self.password)
diff --git a/qpid/python/qpid/exceptions.py b/qpid/python/qpid/exceptions.py
index 7eaaf81ed4..2bd80b7ffe 100644
--- a/qpid/python/qpid/exceptions.py
+++ b/qpid/python/qpid/exceptions.py
@@ -19,3 +19,4 @@
class Closed(Exception): pass
class Timeout(Exception): pass
+class VersionError(Exception): pass
diff --git a/qpid/python/qpid/framer.py b/qpid/python/qpid/framer.py
index 0d82e4378b..4cd0ae6f26 100644
--- a/qpid/python/qpid/framer.py
+++ b/qpid/python/qpid/framer.py
@@ -26,48 +26,6 @@ from logging import getLogger
raw = getLogger("qpid.io.raw")
frm = getLogger("qpid.io.frm")
-FIRST_SEG = 0x08
-LAST_SEG = 0x04
-FIRST_FRM = 0x02
-LAST_FRM = 0x01
-
-class Frame:
-
- HEADER = "!2BHxBH4x"
- HEADER_SIZE = struct.calcsize(HEADER)
- MAX_PAYLOAD = 65535 - struct.calcsize(HEADER)
-
- def __init__(self, flags, type, track, channel, payload):
- if len(payload) > Frame.MAX_PAYLOAD:
- raise ValueError("max payload size exceeded: %s" % len(payload))
- self.flags = flags
- self.type = type
- self.track = track
- self.channel = channel
- self.payload = payload
-
- def isFirstSegment(self):
- return bool(FIRST_SEG & self.flags)
-
- def isLastSegment(self):
- return bool(LAST_SEG & self.flags)
-
- def isFirstFrame(self):
- return bool(FIRST_FRM & self.flags)
-
- def isLastFrame(self):
- return bool(LAST_FRM & self.flags)
-
- def __repr__(self):
- return "%s%s%s%s %s %s %s %r" % (int(self.isFirstSegment()),
- int(self.isLastSegment()),
- int(self.isFirstFrame()),
- int(self.isLastFrame()),
- self.type,
- self.track,
- self.channel,
- self.payload)
-
class FramingError(Exception): pass
class Framer(Packer):
@@ -137,24 +95,3 @@ class Framer(Packer):
self.flush()
finally:
self.sock_lock.release()
-
- def write_frame(self, frame):
- self.sock_lock.acquire()
- try:
- size = len(frame.payload) + struct.calcsize(Frame.HEADER)
- track = frame.track & 0x0F
- self.pack(Frame.HEADER, frame.flags, frame.type, size, track, frame.channel)
- self.write(frame.payload)
- if frame.isLastSegment() and frame.isLastFrame():
- self.flush()
- frm.debug("SENT %s", frame)
- finally:
- self.sock_lock.release()
-
- def read_frame(self):
- flags, type, size, track, channel = self.unpack(Frame.HEADER)
- if flags & 0xF0: raise FramingError()
- payload = self.read(size - struct.calcsize(Frame.HEADER))
- frame = Frame(flags, type, track, channel, payload)
- frm.debug("RECV %s", frame)
- return frame
diff --git a/qpid/python/qpid/framing.py b/qpid/python/qpid/framing.py
index 7c5f68fbcc..0a8f26272c 100644
--- a/qpid/python/qpid/framing.py
+++ b/qpid/python/qpid/framing.py
@@ -18,8 +18,64 @@
#
import struct
-from qpid.framer import Frame, FIRST_SEG, LAST_SEG, FIRST_FRM, LAST_FRM
-from qpid.assembler import Segment
+
+FIRST_SEG = 0x08
+LAST_SEG = 0x04
+FIRST_FRM = 0x02
+LAST_FRM = 0x01
+
+class Frame:
+
+ HEADER = "!2BHxBH4x"
+ HEADER_SIZE = struct.calcsize(HEADER)
+ MAX_PAYLOAD = 65535 - struct.calcsize(HEADER)
+
+ def __init__(self, flags, type, track, channel, payload):
+ if len(payload) > Frame.MAX_PAYLOAD:
+ raise ValueError("max payload size exceeded: %s" % len(payload))
+ self.flags = flags
+ self.type = type
+ self.track = track
+ self.channel = channel
+ self.payload = payload
+
+ def isFirstSegment(self):
+ return bool(FIRST_SEG & self.flags)
+
+ def isLastSegment(self):
+ return bool(LAST_SEG & self.flags)
+
+ def isFirstFrame(self):
+ return bool(FIRST_FRM & self.flags)
+
+ def isLastFrame(self):
+ return bool(LAST_FRM & self.flags)
+
+ def __repr__(self):
+ return "%s%s%s%s %s %s %s %r" % (int(self.isFirstSegment()),
+ int(self.isLastSegment()),
+ int(self.isFirstFrame()),
+ int(self.isLastFrame()),
+ self.type,
+ self.track,
+ self.channel,
+ self.payload)
+
+class Segment:
+
+ def __init__(self, first, last, type, track, channel, payload):
+ self.id = None
+ self.offset = None
+ self.first = first
+ self.last = last
+ self.type = type
+ self.track = track
+ self.channel = channel
+ self.payload = payload
+
+ def __repr__(self):
+ return "%s%s %s %s %s %r" % (int(self.first), int(self.last), self.type,
+ self.track, self.channel, self.payload)
class FrameDecoder:
@@ -140,3 +196,115 @@ class SegmentEncoder:
result = self.frames
self.frames = []
return result
+
+from ops import COMMANDS, CONTROLS, COMPOUND, Header, segment_type, track
+from spec import SPEC
+
+from codec010 import StringCodec
+
+class OpEncoder:
+
+ def __init__(self):
+ self.segments = []
+
+ def write(self, *ops):
+ for op in ops:
+ if COMMANDS.has_key(op.NAME):
+ seg_type = segment_type.command
+ seg_track = track.command
+ enc = self.encode_command(op)
+ elif CONTROLS.has_key(op.NAME):
+ seg_type = segment_type.control
+ seg_track = track.control
+ enc = self.encode_compound(op)
+ else:
+ raise ValueError(op)
+ seg = Segment(True, False, seg_type, seg_track, op.channel, enc)
+ self.segments.append(seg)
+ if hasattr(op, "headers") and op.headers is not None:
+ hdrs = ""
+ for h in op.headers:
+ hdrs += self.encode_compound(h)
+ seg = Segment(False, False, segment_type.header, seg_track, op.channel,
+ hdrs)
+ self.segments.append(seg)
+ if hasattr(op, "payload") and op.payload is not None:
+ self.segments.append(Segment(False, False, segment_type.body, seg_track,
+ op.channel, op.payload))
+ self.segments[-1].last = True
+
+ def encode_command(self, cmd):
+ sc = StringCodec()
+ sc.write_uint16(cmd.CODE)
+ sc.write_compound(Header(sync=cmd.sync))
+ sc.write_fields(cmd)
+ return sc.encoded
+
+ def encode_compound(self, op):
+ sc = StringCodec()
+ sc.write_compound(op)
+ return sc.encoded
+
+ def read(self):
+ result = self.segments
+ self.segments = []
+ return result
+
+class OpDecoder:
+
+ def __init__(self):
+ self.op = None
+ self.ops = []
+
+ def write(self, *segments):
+ for seg in segments:
+ if seg.first:
+ if seg.type == segment_type.command:
+ self.op = self.decode_command(seg.payload)
+ elif seg.type == segment_type.control:
+ self.op = self.decode_control(seg.payload)
+ else:
+ raise ValueError(seg)
+ self.op.channel = seg.channel
+ elif seg.type == segment_type.header:
+ if self.op.headers is None:
+ self.op.headers = []
+ self.op.headers.extend(self.decode_headers(seg.payload))
+ elif seg.type == segment_type.body:
+ if self.op.payload is None:
+ self.op.payload = seg.payload
+ else:
+ self.op.payload += seg.payload
+ if seg.last:
+ self.ops.append(self.op)
+ self.op = None
+
+ def decode_command(self, encoded):
+ sc = StringCodec(encoded)
+ code = sc.read_uint16()
+ cls = COMMANDS[code]
+ hdr = sc.read_compound(Header)
+ cmd = cls()
+ sc.read_fields(cmd)
+ cmd.sync = hdr.sync
+ return cmd
+
+ def decode_control(self, encoded):
+ sc = StringCodec(encoded)
+ code = sc.read_uint16()
+ cls = CONTROLS[code]
+ ctl = cls()
+ sc.read_fields(ctl)
+ return ctl
+
+ def decode_headers(self, encoded):
+ sc = StringCodec(encoded)
+ result = []
+ while sc.encoded:
+ result.append(sc.read_struct32())
+ return result
+
+ def read(self):
+ result = self.ops
+ self.ops = []
+ return result
diff --git a/qpid/python/qpid/generator.py b/qpid/python/qpid/generator.py
index 729425d6a3..02d11e5005 100644
--- a/qpid/python/qpid/generator.py
+++ b/qpid/python/qpid/generator.py
@@ -19,42 +19,38 @@
import sys
-from spec010 import Control
+from ops import *
-def METHOD(module, inst):
- method = lambda self, *args, **kwargs: self.invoke(inst, args, kwargs)
+def METHOD(module, op):
+ method = lambda self, *args, **kwargs: self.invoke(op, args, kwargs)
if sys.version_info[:2] > (2, 3):
- method.__name__ = str(inst.pyname)
- method.__doc__ = str(inst.pydoc)
+ method.__name__ = op.__name__
+ method.__doc__ = op.__doc__
method.__module__ = module
return method
-def generate(spec, module, predicate=lambda x: True):
- dict = {"spec": spec}
+def generate(module, operations):
+ dict = {}
- for name, enum in spec.enums.items():
- dict[name] = enum
+ for name, enum in ENUMS.items():
+ if isinstance(name, basestring):
+ dict[name] = enum
- for name, st in spec.structs_by_name.items():
- dict[name] = METHOD(module, st)
+ for name, op in COMPOUND.items():
+ if isinstance(name, basestring):
+ dict[name] = METHOD(module, op)
- for st in spec.structs.values():
- dict[st.name] = METHOD(module, st)
-
- for name, inst in spec.instructions.items():
- if predicate(inst):
- dict[name] = METHOD(module, inst)
+ for name, op in operations.items():
+ if isinstance(name, basestring):
+ dict[name] = METHOD(module, op)
return dict
-def invoker(name, spec, predicate=lambda x: True):
- return type("%s_%s_%s" % (name, spec.major, spec.minor),
- (), generate(spec, invoker.__module__, predicate))
+def invoker(name, operations):
+ return type(name, (), generate(invoker.__module__, operations))
-def command_invoker(spec):
- is_command = lambda cmd: cmd.track == spec["track.command"].value
- return invoker("CommandInvoker", spec, is_command)
+def command_invoker():
+ return invoker("CommandInvoker", COMMANDS)
-def control_invoker(spec):
- is_control = lambda inst: isinstance(inst, Control)
- return invoker("ControlInvoker", spec, is_control)
+def control_invoker():
+ return invoker("ControlInvoker", CONTROLS)
diff --git a/qpid/python/run-tests b/qpid/python/qpid/harness.py
index 84b76ebfc1..ce48481612 100755..100644
--- a/qpid/python/run-tests
+++ b/qpid/python/qpid/harness.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -7,9 +6,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,18 +17,4 @@
# under the License.
#
-import sys, logging
-from qpid.testlib import testrunner
-from qpid.log import enable, WARN, DEBUG
-
-if "-vv" in sys.argv:
- level = DEBUG
-else:
- level = WARN
-
-enable("qpid", level)
-
-if not testrunner.run(): sys.exit(1)
-
-
-
+class Skipped(Exception): pass
diff --git a/qpid/python/qpid/messaging.py b/qpid/python/qpid/messaging.py
index f06ef87709..9b3fecbf9b 100644
--- a/qpid/python/qpid/messaging.py
+++ b/qpid/python/qpid/messaging.py
@@ -34,8 +34,8 @@ import connection, time, socket, sys, traceback
from codec010 import StringCodec
from datatypes import timestamp, uuid4, RangedSet, Message as Message010
from logging import getLogger
+from ops import PRIMITIVE
from session import Client, INCOMPLETE
-from spec import SPEC
from threading import Thread, RLock, Condition
from util import connect
@@ -191,9 +191,12 @@ class Connection(Lockable):
try:
self._socket = connect(self.host, self.port)
except socket.error, e:
- raise ConnectError(*e.args)
+ raise ConnectError(e)
self._conn = connection.Connection(self._socket)
- self._conn.start()
+ try:
+ self._conn.start()
+ except connection.VersionError, e:
+ raise ConnectError(e)
for ssn in self.sessions.values():
ssn._attach()
@@ -263,8 +266,8 @@ FILTER_DEFAULTS = {
def delegate(session):
class Delegate(Client):
- def message_transfer(self, cmd, headers, body):
- session._message_transfer(cmd, headers, body)
+ def message_transfer(self, cmd):
+ session._message_transfer(cmd)
return Delegate
class Session(Lockable):
@@ -314,9 +317,9 @@ class Session(Lockable):
link._disconnected()
@synchronized
- def _message_transfer(self, cmd, headers, body):
- m = Message010(body)
- m.headers = headers
+ def _message_transfer(self, cmd):
+ m = Message010(cmd.payload)
+ m.headers = cmd.headers
m.id = cmd.id
msg = self._decode(m)
rcv = self.receivers[int(cmd.destination)]
@@ -812,16 +815,16 @@ class Receiver(Lockable):
def codec(name):
- type = SPEC.named[name]
+ type = PRIMITIVE[name]
def encode(x):
- sc = StringCodec(SPEC)
- type.encode(sc, x)
+ sc = StringCodec()
+ sc.write_primitive(type, x)
return sc.encoded
def decode(x):
- sc = StringCodec(SPEC, x)
- return type.decode(sc)
+ sc = StringCodec(x)
+ return sc.read_primitive(type)
return encode, decode
diff --git a/qpid/python/qpid/ops.py b/qpid/python/qpid/ops.py
new file mode 100644
index 0000000000..1f82889164
--- /dev/null
+++ b/qpid/python/qpid/ops.py
@@ -0,0 +1,277 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import os, mllib, cPickle as pickle
+from util import fill
+
+class Primitive(object):
+ pass
+
+class Enum(object):
+ pass
+
+class Field:
+
+ def __init__(self, name, type, default=None):
+ self.name = name
+ self.type = type
+ self.default = default
+
+ def __repr__(self):
+ return "%s: %s" % (self.name, self.type)
+
+class Compound(object):
+
+ UNENCODED=[]
+
+ def __init__(self, *args, **kwargs):
+ args = list(args)
+ for f in self.ARGS:
+ if args:
+ a = args.pop(0)
+ else:
+ a = kwargs.pop(f.name, f.default)
+ setattr(self, f.name, a)
+ if args:
+ raise TypeError("%s takes at most %s arguments (%s given))" %
+ (self.__class__.__name__, len(self.ARGS),
+ len(self.ARGS) + len(args)))
+ if kwargs:
+ raise TypeError("got unexpected keyword argument '%s'" % kwargs.keys()[0])
+
+ def fields(self):
+ result = {}
+ for f in self.FIELDS:
+ result[f.name] = getattr(self, f.name)
+ return result
+
+ def args(self):
+ result = {}
+ for f in self.ARGS:
+ result[f.name] = getattr(self, f.name)
+ return result
+
+ def dispatch(self, target, *args):
+ handler = "do_%s" % self.NAME
+ if hasattr(target, handler):
+ getattr(target, handler)(self, *args)
+ else:
+ print "UNHANDLED:", target, args
+
+ def __repr__(self, extras=()):
+ return "%s(%s)" % (self.__class__.__name__,
+ ", ".join(["%s=%r" % (f.name, getattr(self, f.name))
+ for f in self.ARGS
+ if getattr(self, f.name) is not f.default]))
+
+class Command(Compound):
+ UNENCODED=[Field("channel", "uint16", 0),
+ Field("id", "sequence-no", None),
+ Field("sync", "bit", False),
+ Field("headers", None, None),
+ Field("payload", None, None)]
+
+class Control(Compound):
+ UNENCODED=[Field("channel", "uint16", 0)]
+
+def pythonize(st):
+ if st is None:
+ return None
+ else:
+ return str(st.replace("-", "_"))
+
+def pydoc(op, children=()):
+ doc = "\n\n".join([fill(p.text(), 0) for p in op.query["doc"]])
+ for ch in children:
+ doc += "\n\n " + pythonize(ch["@name"]) + " -- " + str(ch["@label"])
+ ch_descs ="\n\n".join([fill(p.text(), 4) for p in ch.query["doc"]])
+ if ch_descs:
+ doc += "\n\n" + ch_descs
+ return doc
+
+def studly(st):
+ return "".join([p.capitalize() for p in st.split("-")])
+
+def klass(nd):
+ while nd.parent is not None:
+ if hasattr(nd.parent, "name") and nd.parent.name == "class":
+ return nd.parent
+ else:
+ nd = nd.parent
+
+def included(nd):
+ cls = klass(nd)
+ if cls is None:
+ return True
+ else:
+ return cls["@name"] not in ("file", "stream")
+
+def num(s):
+ if s: return int(s, 0)
+
+def code(nd):
+ c = num(nd["@code"])
+ if c is None:
+ return None
+ else:
+ cls = klass(nd)
+ if cls is None:
+ return c
+ else:
+ return c | (num(cls["@code"]) << 8)
+
+def default(f):
+ if f["@type"] == "bit":
+ return False
+ else:
+ return None
+
+def make_compound(decl, base):
+ dict = {}
+ fields = decl.query["field"]
+ dict["__doc__"] = pydoc(decl, fields)
+ dict["NAME"] = pythonize(decl["@name"])
+ dict["SIZE"] = num(decl["@size"])
+ dict["CODE"] = code(decl)
+ dict["PACK"] = num(decl["@pack"])
+ dict["FIELDS"] = [Field(pythonize(f["@name"]), resolve(f), default(f)) for f in fields]
+ dict["ARGS"] = dict["FIELDS"] + base.UNENCODED
+ return str(studly(decl["@name"])), (base,), dict
+
+def make_restricted(decl):
+ name = pythonize(decl["@name"])
+ dict = {}
+ choices = decl.query["choice"]
+ dict["__doc__"] = pydoc(decl, choices)
+ dict["NAME"] = name
+ dict["TYPE"] = str(decl.parent["@type"])
+ values = []
+ for ch in choices:
+ val = int(ch["@value"], 0)
+ dict[pythonize(ch["@name"])] = val
+ values.append(val)
+ dict["VALUES"] = values
+ return name, (Enum,), dict
+
+def make_type(decl):
+ name = pythonize(decl["@name"])
+ dict = {}
+ dict["__doc__"] = pydoc(decl)
+ dict["NAME"] = name
+ dict["CODE"] = code(decl)
+ return str(studly(decl["@name"])), (Primitive,), dict
+
+def make_command(decl):
+ decl.set_attr("name", "%s-%s" % (decl.parent["@name"], decl["@name"]))
+ decl.set_attr("size", "0")
+ decl.set_attr("pack", "2")
+ name, bases, dict = make_compound(decl, Command)
+ dict["RESULT"] = pythonize(decl["result/@type"]) or pythonize(decl["result/struct/@name"])
+ return name, bases, dict
+
+def make_control(decl):
+ decl.set_attr("name", "%s-%s" % (decl.parent["@name"], decl["@name"]))
+ decl.set_attr("size", "0")
+ decl.set_attr("pack", "2")
+ return make_compound(decl, Control)
+
+def make_struct(decl):
+ return make_compound(decl, Compound)
+
+def make_enum(decl):
+ decl.set_attr("name", decl.parent["@name"])
+ return make_restricted(decl)
+
+
+vars = globals()
+
+def make(nd):
+ return vars["make_%s" % nd.name](nd)
+
+from qpid_config import amqp_spec as file
+pclfile = "%s.ops.pcl" % file
+
+if False and (os.path.exists(pclfile) and
+ os.path.getmtime(pclfile) > os.path.getmtime(file)):
+ f = open(pclfile, "read")
+ types = pickle.load(f)
+ f.close()
+else:
+ spec = mllib.xml_parse(file)
+
+ def qualify(nd, field="@name"):
+ cls = klass(nd)
+ if cls is None:
+ return pythonize(nd[field])
+ else:
+ return pythonize("%s.%s" % (cls["@name"], nd[field]))
+
+ domains = dict([(qualify(d), pythonize(d["@type"]))
+ for d in spec.query["amqp/domain", included] + \
+ spec.query["amqp/class/domain", included]])
+
+ def resolve(nd):
+ candidates = qualify(nd, "@type"), pythonize(nd["@type"])
+ for c in candidates:
+ if domains.has_key(c):
+ while domains.has_key(c):
+ c = domains[c]
+ return c
+ else:
+ return c
+
+ type_decls = \
+ spec.query["amqp/class/command", included] + \
+ spec.query["amqp/class/control", included] + \
+ spec.query["amqp/class/command/result/struct", included] + \
+ spec.query["amqp/class/struct", included] + \
+ spec.query["amqp/class/domain/enum", included] + \
+ spec.query["amqp/domain/enum", included] + \
+ spec.query["amqp/type"]
+ types = [make(nd) for nd in type_decls]
+
+ if os.access(os.path.dirname(os.path.abspath(pclfile)), os.W_OK):
+ f = open(pclfile, "write")
+ pickle.dump(types, f)
+ f.close()
+
+ENUMS = {}
+PRIMITIVE = {}
+COMPOUND = {}
+COMMANDS = {}
+CONTROLS = {}
+
+for name, bases, dict in types:
+ t = type(name, bases, dict)
+ vars[name] = t
+
+ if issubclass(t, Command):
+ COMMANDS[t.NAME] = t
+ COMMANDS[t.CODE] = t
+ elif issubclass(t, Control):
+ CONTROLS[t.NAME] = t
+ CONTROLS[t.CODE] = t
+ elif issubclass(t, Compound):
+ COMPOUND[t.NAME] = t
+ if t.CODE is not None:
+ COMPOUND[t.CODE] = t
+ elif issubclass(t, Primitive):
+ PRIMITIVE[t.NAME] = t
+ PRIMITIVE[t.CODE] = t
+ elif issubclass(t, Enum):
+ ENUMS[t.NAME] = t
diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py
index 18d7848b8d..2bc9844351 100644
--- a/qpid/python/qpid/peer.py
+++ b/qpid/python/qpid/peer.py
@@ -25,7 +25,7 @@ incoming method frames to a delegate.
"""
import thread, threading, traceback, socket, sys, logging
-from connection08 import EOF, Method, Header, Body, Request, Response
+from connection08 import EOF, Method, Header, Body, Request, Response, VersionError
from message import Message
from queue import Queue, Closed as QueueClosed
from content import Content
@@ -95,6 +95,8 @@ class Peer:
break
ch = self.channel(frame.channel)
ch.receive(frame, self.work)
+ except VersionError, e:
+ self.closed(e)
except:
self.fatal()
diff --git a/qpid/python/qpid/session.py b/qpid/python/qpid/session.py
index 3b8bd18469..4413a22899 100644
--- a/qpid/python/qpid/session.py
+++ b/qpid/python/qpid/session.py
@@ -22,9 +22,9 @@ from spec import SPEC
from generator import command_invoker
from datatypes import RangedSet, Struct, Future
from codec010 import StringCodec
-from assembler import Segment
from queue import Queue
from datatypes import Message, serial
+from ops import Command, MessageTransfer
from util import wait, notify
from exceptions import *
from logging import getLogger
@@ -44,7 +44,7 @@ def server(*args):
INCOMPLETE = object()
-class Session(command_invoker(SPEC)):
+class Session(command_invoker()):
def __init__(self, name, auto_sync=True, timeout=10, delegate=client):
self.name = name
@@ -67,8 +67,6 @@ class Session(command_invoker(SPEC)):
self.results = {}
self.exceptions = []
- self.assembly = None
-
self.delegate = delegate(self)
def incoming(self, destination):
@@ -134,68 +132,51 @@ class Session(command_invoker(SPEC)):
finally:
self.lock.release()
- def invoke(self, type, args, kwargs):
- # XXX
- if not hasattr(type, "track"):
- return type.new(args, kwargs)
-
- self.invoke_lock.acquire()
- try:
- return self.do_invoke(type, args, kwargs)
- finally:
- self.invoke_lock.release()
+ def invoke(self, op, args, kwargs):
+ if issubclass(op, Command):
+ self.invoke_lock.acquire()
+ try:
+ return self.do_invoke(op, args, kwargs)
+ finally:
+ self.invoke_lock.release()
+ else:
+ return op(*args, **kwargs)
- def do_invoke(self, type, args, kwargs):
+ def do_invoke(self, op, args, kwargs):
if self._closing:
raise SessionClosed()
if self.channel == None:
raise SessionDetached()
- if type.segments:
- if len(args) == len(type.fields) + 1:
+ if op == MessageTransfer:
+ if len(args) == len(op.FIELDS) + 1:
message = args[-1]
args = args[:-1]
else:
message = kwargs.pop("message", None)
- else:
- message = None
-
- hdr = Struct(self.spec["session.header"])
- hdr.sync = self.auto_sync or kwargs.pop("sync", False)
- self.need_sync = not hdr.sync
+ if message is not None:
+ kwargs["headers"] = message.headers
+ kwargs["payload"] = message.body
- cmd = type.new(args, kwargs)
- sc = StringCodec(self.spec)
- sc.write_command(hdr, cmd)
+ cmd = op(*args, **kwargs)
+ cmd.sync = self.auto_sync or cmd.sync
+ self.need_sync = not cmd.sync
+ cmd.channel = self.channel.id
- seg = Segment(True, (message == None or
- (message.headers == None and message.body == None)),
- type.segment_type, type.track, self.channel.id, sc.encoded)
-
- if type.result:
+ if op.RESULT:
result = Future(exception=SessionException)
self.results[self.sender.next_id] = result
- self.send(seg)
-
- log.debug("SENT %s %s %s", seg.id, hdr, cmd)
-
- if message != None:
- if message.headers != None:
- sc = StringCodec(self.spec)
- for st in message.headers:
- sc.write_struct32(st)
- seg = Segment(False, message.body == None, self.spec["segment_type.header"].value,
- type.track, self.channel.id, sc.encoded)
- self.send(seg)
- if message.body != None:
- seg = Segment(False, True, self.spec["segment_type.body"].value,
- type.track, self.channel.id, message.body)
- self.send(seg)
- msg.debug("SENT %s", message)
-
- if type.result:
+ log.debug("SENDING %s", cmd)
+
+ self.send(cmd)
+
+ log.debug("SENT %s", cmd)
+ if op == MessageTransfer:
+ msg.debug("SENT %s", cmd)
+
+ if op.RESULT:
if self.auto_sync:
return result.get(self.timeout)
else:
@@ -203,81 +184,47 @@ class Session(command_invoker(SPEC)):
elif self.auto_sync:
self.sync(self.timeout)
- def received(self, seg):
- self.receiver.received(seg)
- if seg.first:
- assert self.assembly == None
- self.assembly = []
- self.assembly.append(seg)
- if seg.last:
- self.dispatch(self.assembly)
- self.assembly = None
-
- def dispatch(self, assembly):
- segments = assembly[:]
-
- hdr, cmd = assembly.pop(0).decode(self.spec)
- log.debug("RECV %s %s %s", cmd.id, hdr, cmd)
-
- args = []
-
- for st in cmd._type.segments:
- if assembly:
- seg = assembly[0]
- if seg.type == st.segment_type:
- args.append(seg.decode(self.spec))
- assembly.pop(0)
- continue
- args.append(None)
-
- assert len(assembly) == 0
+ def received(self, cmd):
+ self.receiver.received(cmd)
+ self.dispatch(cmd)
- attr = cmd._type.qname.replace(".", "_")
- result = getattr(self.delegate, attr)(cmd, *args)
+ def dispatch(self, cmd):
+ log.debug("RECV %s", cmd)
- if cmd._type.result:
+ result = getattr(self.delegate, cmd.NAME)(cmd)
+ if result is INCOMPLETE:
+ return
+ elif result is not None:
self.execution_result(cmd.id, result)
- if result is not INCOMPLETE:
- for seg in segments:
- self.receiver.completed(seg)
- # XXX: don't forget to obey sync for manual completion as well
- if hdr.sync:
- self.channel.session_completed(self.receiver._completed)
+ self.receiver.completed(cmd)
+ # XXX: don't forget to obey sync for manual completion as well
+ if cmd.sync:
+ self.channel.session_completed(self.receiver._completed)
- def send(self, seg):
- self.sender.send(seg)
-
- def __str__(self):
- return '<Session: %s, %s>' % (self.name, self.channel)
+ def send(self, cmd):
+ self.sender.send(cmd)
def __repr__(self):
- return str(self)
+ return '<Session: %s, %s>' % (self.name, self.channel)
class Receiver:
def __init__(self, session):
self.session = session
self.next_id = None
- self.next_offset = None
self._completed = RangedSet()
- def received(self, seg):
- if self.next_id == None or self.next_offset == None:
+ def received(self, cmd):
+ if self.next_id == None:
raise Exception("todo")
- seg.id = self.next_id
- seg.offset = self.next_offset
- if seg.last:
- self.next_id += 1
- self.next_offset = 0
- else:
- self.next_offset += len(seg.payload)
+ cmd.id = self.next_id
+ self.next_id += 1
- def completed(self, seg):
- if seg.id == None:
- raise ValueError("cannot complete unidentified segment")
- if seg.last:
- self._completed.add(seg.id)
+ def completed(self, cmd):
+ if cmd.id == None:
+ raise ValueError("cannot complete unidentified command")
+ self._completed.add(cmd.id)
def known_completed(self, commands):
completed = RangedSet()
@@ -294,30 +241,24 @@ class Sender:
def __init__(self, session):
self.session = session
self.next_id = serial(0)
- self.next_offset = 0
- self.segments = []
+ self.commands = []
self._completed = RangedSet()
- def send(self, seg):
- seg.id = self.next_id
- seg.offset = self.next_offset
- if seg.last:
- self.next_id += 1
- self.next_offset = 0
- else:
- self.next_offset += len(seg.payload)
- self.segments.append(seg)
+ def send(self, cmd):
+ cmd.id = self.next_id
+ self.next_id += 1
if self.session.send_id:
self.session.send_id = False
- self.session.channel.session_command_point(seg.id, seg.offset)
- self.session.channel.connection.write_segment(seg)
+ self.session.channel.session_command_point(cmd.id, 0)
+ self.commands.append(cmd)
+ self.session.channel.connection.write_op(cmd)
def completed(self, commands):
idx = 0
- while idx < len(self.segments):
- seg = self.segments[idx]
- if seg.id in commands:
- del self.segments[idx]
+ while idx < len(self.commands):
+ cmd = self.commands[idx]
+ if cmd.id in commands:
+ del self.commands[idx]
else:
idx += 1
for range in commands.ranges:
@@ -332,7 +273,7 @@ class Incoming(Queue):
def start(self):
self.session.message_set_flow_mode(self.destination, self.session.flow_mode.credit)
- for unit in self.session.credit_unit.values():
+ for unit in self.session.credit_unit.VALUES:
self.session.message_flow(self.destination, unit, 0xFFFFFFFFL)
def stop(self):
@@ -356,9 +297,9 @@ class Delegate:
class Client(Delegate):
- def message_transfer(self, cmd, headers, body):
- m = Message(body)
- m.headers = headers
+ def message_transfer(self, cmd):
+ m = Message(cmd.payload)
+ m.headers = cmd.headers
m.id = cmd.id
messages = self.session.incoming(cmd.destination)
messages.put(m)
diff --git a/qpid/python/qpid/spec.py b/qpid/python/qpid/spec.py
index cd76c70c5c..e9bfef1fa6 100644
--- a/qpid/python/qpid/spec.py
+++ b/qpid/python/qpid/spec.py
@@ -29,7 +29,7 @@ class so that the generated code can be reused in a variety of
situations.
"""
-import os, mllib, spec08, spec010
+import os, mllib, spec08
def default():
try:
@@ -54,7 +54,7 @@ def load(specfile, *errata):
minor = doc["amqp/@minor"]
if major == "0" and minor == "10":
- return spec010.load(specfile, *errata)
+ return None
else:
return spec08.load(specfile, *errata)
diff --git a/qpid/python/qpid/spec010.py b/qpid/python/qpid/spec010.py
deleted file mode 100644
index eabc8e2983..0000000000
--- a/qpid/python/qpid/spec010.py
+++ /dev/null
@@ -1,708 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import os, cPickle, datatypes, datetime
-from codec010 import StringCodec
-from util import mtime, fill
-
-class Node:
-
- def __init__(self, children):
- self.children = children
- self.named = {}
- self.docs = []
- self.rules = []
-
- def register(self):
- for ch in self.children:
- ch.register(self)
-
- def resolve(self):
- for ch in self.children:
- ch.resolve()
-
- def __getitem__(self, name):
- path = name.split(".", 1)
- nd = self.named
- for step in path:
- nd = nd[step]
- return nd
-
- def __iter__(self):
- return iter(self.children)
-
-class Anonymous:
-
- def __init__(self, children):
- self.children = children
-
- def register(self, node):
- for ch in self.children:
- ch.register(node)
-
- def resolve(self):
- for ch in self.children:
- ch.resolve()
-
-class Named:
-
- def __init__(self, name):
- self.name = name
- self.qname = None
-
- def register(self, node):
- self.spec = node.spec
- self.klass = node.klass
- node.named[self.name] = self
- if node.qname:
- self.qname = "%s.%s" % (node.qname, self.name)
- else:
- self.qname = self.name
-
- def __str__(self):
- return self.qname
-
- def __repr__(self):
- return str(self)
-
-class Lookup:
-
- def lookup(self, name):
- value = None
- if self.klass:
- try:
- value = self.klass[name]
- except KeyError:
- pass
- if not value:
- value = self.spec[name]
- return value
-
-class Coded:
-
- def __init__(self, code):
- self.code = code
-
-class Constant(Named, Node):
-
- def __init__(self, name, value, children):
- Named.__init__(self, name)
- Node.__init__(self, children)
- self.value = value
-
- def register(self, node):
- Named.register(self, node)
- node.constants.append(self)
- Node.register(self)
-
-class Type(Named, Node):
-
- def __init__(self, name, children):
- Named.__init__(self, name)
- Node.__init__(self, children)
-
- def is_present(self, value):
- return value != None
-
- def register(self, node):
- Named.register(self, node)
- Node.register(self)
-
-class Primitive(Coded, Type):
-
- def __init__(self, name, code, fixed, variable, children):
- Coded.__init__(self, code)
- Type.__init__(self, name, children)
- self.fixed = fixed
- self.variable = variable
-
- def register(self, node):
- Type.register(self, node)
- if self.code is not None:
- self.spec.types[self.code] = self
-
- def is_present(self, value):
- if self.fixed == 0:
- return value
- else:
- return Type.is_present(self, value)
-
- def encode(self, codec, value):
- getattr(codec, "write_%s" % self.name)(value)
-
- def decode(self, codec):
- return getattr(codec, "read_%s" % self.name)()
-
-class Domain(Type, Lookup):
-
- def __init__(self, name, type, children):
- Type.__init__(self, name, children)
- self.type = type
- self.choices = {}
-
- def resolve(self):
- self.type = self.lookup(self.type)
- Node.resolve(self)
-
- def encode(self, codec, value):
- self.type.encode(codec, value)
-
- def decode(self, codec):
- return self.type.decode(codec)
-
-class Enum:
-
- def __init__(self, name):
- self.name = name
- self._names = ()
- self._values = ()
-
- def values(self):
- return self._values
-
- def __repr__(self):
- return "%s(%s)" % (self.name, ", ".join(self._names))
-
-class Choice(Named, Node):
-
- def __init__(self, name, value, children):
- Named.__init__(self, name)
- Node.__init__(self, children)
- self.value = value
-
- def register(self, node):
- Named.register(self, node)
- node.choices[self.value] = self
- Node.register(self)
- try:
- enum = node.spec.enums[node.name]
- except KeyError:
- enum = Enum(node.name)
- node.spec.enums[node.name] = enum
- setattr(enum, self.name, self.value)
- enum._names += (self.name,)
- enum._values += (self.value,)
-
-class Composite(Type, Coded):
-
- def __init__(self, name, label, code, size, pack, children):
- Coded.__init__(self, code)
- Type.__init__(self, name, children)
- self.label = label
- self.fields = []
- self.size = size
- self.pack = pack
-
- def new(self, args, kwargs):
- return datatypes.Struct(self, *args, **kwargs)
-
- def decode(self, codec):
- codec.read_size(self.size)
- if self.code is not None:
- code = codec.read_uint16()
- assert self.code == code
- return datatypes.Struct(self, **self.decode_fields(codec))
-
- def decode_fields(self, codec):
- flags = 0
- for i in range(self.pack):
- flags |= (codec.read_uint8() << 8*i)
-
- result = {}
-
- for i in range(len(self.fields)):
- f = self.fields[i]
- if flags & (0x1 << i):
- result[f.name] = f.type.decode(codec)
- else:
- result[f.name] = None
- return result
-
- def encode(self, codec, value):
- sc = StringCodec(self.spec)
- if self.code is not None:
- sc.write_uint16(self.code)
- self.encode_fields(sc, value)
- codec.write_size(self.size, len(sc.encoded))
- codec.write(sc.encoded)
-
- def encode_fields(self, codec, values):
- flags = 0
- for i in range(len(self.fields)):
- f = self.fields[i]
- if f.type.is_present(values[f.name]):
- flags |= (0x1 << i)
- for i in range(self.pack):
- codec.write_uint8((flags >> 8*i) & 0xFF)
- for i in range(len(self.fields)):
- f = self.fields[i]
- if flags & (0x1 << i):
- f.type.encode(codec, values[f.name])
-
- def docstring(self):
- docs = []
- if self.label:
- docs.append(self.label)
- docs += [d.text for d in self.docs]
- s = "\n\n".join([fill(t, 2) for t in docs])
- for f in self.fields:
- fdocs = []
- if f.label:
- fdocs.append(f.label)
- else:
- fdocs.append("")
- fdocs += [d.text for d in f.docs]
- s += "\n\n" + "\n\n".join([fill(fdocs[0], 4, f.name)] +
- [fill(t, 4) for t in fdocs[1:]])
- return s
-
-
-class Field(Named, Node, Lookup):
-
- def __init__(self, name, label, type, children):
- Named.__init__(self, name)
- Node.__init__(self, children)
- self.label = label
- self.type = type
- self.exceptions = []
-
- def default(self):
- return None
-
- def register(self, node):
- Named.register(self, node)
- node.fields.append(self)
- Node.register(self)
-
- def resolve(self):
- self.type = self.lookup(self.type)
- Node.resolve(self)
-
- def __str__(self):
- return "%s: %s" % (self.qname, self.type.qname)
-
-class Struct(Composite):
-
- def register(self, node):
- Composite.register(self, node)
- if self.code is not None:
- self.spec.structs[self.code] = self
- self.spec.structs_by_name[self.name] = self
- self.pyname = self.name
- self.pydoc = self.docstring()
-
- def __str__(self):
- fields = ",\n ".join(["%s: %s" % (f.name, f.type.qname)
- for f in self.fields])
- return "%s {\n %s\n}" % (self.qname, fields)
-
-class Segment:
-
- def __init__(self):
- self.segment_type = None
-
- def register(self, node):
- self.spec = node.spec
- self.klass = node.klass
- node.segments.append(self)
- Node.register(self)
-
-class Instruction(Composite, Segment):
-
- def __init__(self, name, label, code, children):
- Composite.__init__(self, name, label, code, 0, 2, children)
- Segment.__init__(self)
- self.track = None
- self.handlers = []
-
- def __str__(self):
- return "%s(%s)" % (self.qname, ", ".join(["%s: %s" % (f.name, f.type.qname)
- for f in self.fields]))
-
- def register(self, node):
- Composite.register(self, node)
- self.pyname = self.qname.replace(".", "_")
- self.pydoc = self.docstring()
- self.spec.instructions[self.pyname] = self
-
-class Control(Instruction):
-
- def __init__(self, name, code, label, children):
- Instruction.__init__(self, name, code, label, children)
- self.response = None
-
- def register(self, node):
- Instruction.register(self, node)
- node.controls.append(self)
- self.spec.controls[self.code] = self
- self.segment_type = self.spec["segment_type.control"].value
- self.track = self.spec["track.control"].value
-
-class Command(Instruction):
-
- def __init__(self, name, label, code, children):
- Instruction.__init__(self, name, label, code, children)
- self.result = None
- self.exceptions = []
- self.segments = []
-
- def register(self, node):
- Instruction.register(self, node)
- node.commands.append(self)
- self.spec.commands[self.code] = self
- self.segment_type = self.spec["segment_type.command"].value
- self.track = self.spec["track.command"].value
-
-class Header(Segment, Node):
-
- def __init__(self, children):
- Segment.__init__(self)
- Node.__init__(self, children)
- self.entries = []
-
- def register(self, node):
- Segment.register(self, node)
- self.segment_type = self.spec["segment_type.header"].value
- Node.register(self)
-
-class Entry(Lookup):
-
- def __init__(self, type):
- self.type = type
-
- def register(self, node):
- self.spec = node.spec
- self.klass = node.klass
- node.entries.append(self)
-
- def resolve(self):
- self.type = self.lookup(self.type)
-
-class Body(Segment, Node):
-
- def __init__(self, children):
- Segment.__init__(self)
- Node.__init__(self, children)
-
- def register(self, node):
- Segment.register(self, node)
- self.segment_type = self.spec["segment_type.body"].value
- Node.register(self)
-
- def resolve(self): pass
-
-class Class(Named, Coded, Node):
-
- def __init__(self, name, code, children):
- Named.__init__(self, name)
- Coded.__init__(self, code)
- Node.__init__(self, children)
- self.controls = []
- self.commands = []
-
- def register(self, node):
- Named.register(self, node)
- self.klass = self
- node.classes.append(self)
- Node.register(self)
-
-class Doc:
-
- def __init__(self, type, title, text):
- self.type = type
- self.title = title
- self.text = text
-
- def register(self, node):
- node.docs.append(self)
-
- def resolve(self): pass
-
-class Role(Named, Node):
-
- def __init__(self, name, children):
- Named.__init__(self, name)
- Node.__init__(self, children)
-
- def register(self, node):
- Named.register(self, node)
- Node.register(self)
-
-class Rule(Named, Node):
-
- def __init__(self, name, children):
- Named.__init__(self, name)
- Node.__init__(self, children)
-
- def register(self, node):
- Named.register(self, node)
- node.rules.append(self)
- Node.register(self)
-
-class Exception(Named, Node):
-
- def __init__(self, name, error_code, children):
- Named.__init__(self, name)
- Node.__init__(self, children)
- self.error_code = error_code
-
- def register(self, node):
- Named.register(self, node)
- node.exceptions.append(self)
- Node.register(self)
-
-def direct(t):
- return lambda x: t
-
-def map_str(s):
- for c in s:
- if ord(c) >= 0x80:
- return "vbin16"
- return "str16"
-
-class Spec(Node):
-
- ENCODINGS = {
- unicode: direct("str16"),
- str: map_str,
- buffer: direct("vbin32"),
- int: direct("int64"),
- long: direct("int64"),
- float: direct("double"),
- None.__class__: direct("void"),
- list: direct("list"),
- tuple: direct("list"),
- dict: direct("map"),
- datatypes.timestamp: direct("datetime"),
- datetime.datetime: direct("datetime"),
- datatypes.UUID: direct("uuid")
- }
-
- def __init__(self, major, minor, port, children):
- Node.__init__(self, children)
- self.major = major
- self.minor = minor
- self.port = port
- self.constants = []
- self.classes = []
- self.types = {}
- self.qname = None
- self.spec = self
- self.klass = None
- self.instructions = {}
- self.controls = {}
- self.commands = {}
- self.structs = {}
- self.structs_by_name = {}
- self.enums = {}
-
- def encoding(self, obj):
- return self._encoding(obj.__class__, obj)
-
- def _encoding(self, klass, obj):
- if Spec.ENCODINGS.has_key(klass):
- return self.named[Spec.ENCODINGS[klass](obj)]
- for base in klass.__bases__:
- result = self._encoding(base, obj)
- if result != None:
- return result
-
-class Implement:
-
- def __init__(self, handle):
- self.handle = handle
-
- def register(self, node):
- node.handlers.append(self.handle)
-
- def resolve(self): pass
-
-class Response(Node):
-
- def __init__(self, name, children):
- Node.__init__(self, children)
- self.name = name
-
- def register(self, node):
- Node.register(self)
-
-class Result(Node, Lookup):
-
- def __init__(self, type, children):
- self.type = type
- Node.__init__(self, children)
-
- def register(self, node):
- node.result = self
- self.qname = node.qname
- self.klass = node.klass
- self.spec = node.spec
- Node.register(self)
-
- def resolve(self):
- self.type = self.lookup(self.type)
- Node.resolve(self)
-
-import mllib
-
-def num(s):
- if s: return int(s, 0)
-
-REPLACE = {" ": "_", "-": "_"}
-KEYWORDS = {"global": "global_",
- "return": "return_"}
-
-def id(name):
- name = str(name)
- for key, val in REPLACE.items():
- name = name.replace(key, val)
- try:
- name = KEYWORDS[name]
- except KeyError:
- pass
- return name
-
-class Loader:
-
- def __init__(self):
- self.class_code = 0
-
- def code(self, nd):
- c = num(nd["@code"])
- if c is None:
- return None
- else:
- return c | (self.class_code << 8)
-
- def list(self, q):
- result = []
- for nd in q:
- result.append(nd.dispatch(self))
- return result
-
- def children(self, n):
- return self.list(n.query["#tag"])
-
- def data(self, d):
- return d.data
-
- def do_amqp(self, a):
- return Spec(num(a["@major"]), num(a["@minor"]), num(a["@port"]),
- self.children(a))
-
- def do_type(self, t):
- return Primitive(id(t["@name"]), self.code(t), num(t["@fixed-width"]),
- num(t["@variable-width"]), self.children(t))
-
- def do_constant(self, c):
- return Constant(id(c["@name"]), num(c["@value"]), self.children(c))
-
- def do_domain(self, d):
- return Domain(id(d["@name"]), id(d["@type"]), self.children(d))
-
- def do_enum(self, e):
- return Anonymous(self.children(e))
-
- def do_choice(self, c):
- return Choice(id(c["@name"]), num(c["@value"]), self.children(c))
-
- def do_class(self, c):
- code = num(c["@code"])
- self.class_code = code
- children = self.children(c)
- children += self.list(c.query["command/result/struct"])
- self.class_code = 0
- return Class(id(c["@name"]), code, children)
-
- def do_doc(self, doc):
- text = reduce(lambda x, y: x + y, self.list(doc.children))
- return Doc(doc["@type"], doc["@title"], text)
-
- def do_xref(self, x):
- return x["@ref"]
-
- def do_role(self, r):
- return Role(id(r["@name"]), self.children(r))
-
- def do_control(self, c):
- return Control(id(c["@name"]), c["@label"], self.code(c), self.children(c))
-
- def do_rule(self, r):
- return Rule(id(r["@name"]), self.children(r))
-
- def do_implement(self, i):
- return Implement(id(i["@handle"]))
-
- def do_response(self, r):
- return Response(id(r["@name"]), self.children(r))
-
- def do_field(self, f):
- return Field(id(f["@name"]), f["@label"], id(f["@type"]), self.children(f))
-
- def do_struct(self, s):
- return Struct(id(s["@name"]), s["@label"], self.code(s), num(s["@size"]),
- num(s["@pack"]), self.children(s))
-
- def do_command(self, c):
- return Command(id(c["@name"]), c["@label"], self.code(c), self.children(c))
-
- def do_segments(self, s):
- return Anonymous(self.children(s))
-
- def do_header(self, h):
- return Header(self.children(h))
-
- def do_entry(self, e):
- return Entry(id(e["@type"]))
-
- def do_body(self, b):
- return Body(self.children(b))
-
- def do_result(self, r):
- type = r["@type"]
- if not type:
- type = r["struct/@name"]
- return Result(id(type), self.list(r.query["#tag", lambda x: x.name != "struct"]))
-
- def do_exception(self, e):
- return Exception(id(e["@name"]), id(e["@error-code"]), self.children(e))
-
-def load(xml):
- fname = xml + ".pcl"
-
- if os.path.exists(fname) and mtime(fname) > mtime(__file__):
- file = open(fname, "r")
- s = cPickle.load(file)
- file.close()
- else:
- doc = mllib.xml_parse(xml)
- s = doc["amqp"].dispatch(Loader())
- s.register()
- s.resolve()
-
- try:
- file = open(fname, "w")
- except IOError:
- file = None
-
- if file:
- cPickle.dump(s, file)
- file.close()
-
- return s
diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py
index 12b2781561..31c376b2f9 100644
--- a/qpid/python/qpid/testlib.py
+++ b/qpid/python/qpid/testlib.py
@@ -21,239 +21,13 @@
# Support library for qpid python tests.
#
-import sys, re, unittest, os, random, logging, traceback
-import qpid.client, qpid.spec, qmf.console
+import unittest, traceback, socket
+import qpid.client, qmf.console
import Queue
-from fnmatch import fnmatch
-from getopt import getopt, GetoptError
from qpid.content import Content
from qpid.message import Message
-
-#0-10 support
-from qpid.connection import Connection
-from qpid.spec010 import load
-from qpid.util import connect, ssl, URL
-
-def findmodules(root):
- """Find potential python modules under directory root"""
- found = []
- for dirpath, subdirs, files in os.walk(root):
- modpath = dirpath.replace(os.sep, '.')
- if not re.match(r'\.svn$', dirpath): # Avoid SVN directories
- for f in files:
- match = re.match(r'(.+)\.py$', f)
- if match and f != '__init__.py':
- found.append('.'.join([modpath, match.group(1)]))
- return found
-
-def default(value, default):
- if (value == None): return default
- else: return value
-
-class TestRunner:
-
- SPEC_FOLDER = "../specs"
- qpidd = os.getenv("QPIDD")
-
- """Runs unit tests.
-
- Parses command line arguments, provides utility functions for tests,
- runs the selected test suite.
- """
-
- def _die(self, message = None):
- if message: print message
- print """
-run-tests [options] [test*]
-The name of a test is package.module.ClassName.testMethod
-Options:
- -?/-h/--help : this message
- -s/--spec <spec.xml> : URL of AMQP XML specification or one of these abbreviations:
- 0-8 - use the default 0-8 specification.
- 0-9 - use the default 0-9 specification.
- 0-10-errata - use the 0-10 specification with qpid errata.
- -e/--errata <errata.xml> : file containing amqp XML errata
- -b/--broker [amqps://][<user>[/<password>]@]<host>[:<port>] : broker to connect to
- -B/--start-broker <broker-args> : start a local broker using broker-args; set QPIDD
- env to point to broker executable. broker-args will be
- prepended with "--daemon --port=0"
- -v/--verbose : verbose - lists tests as they are run.
- -d/--debug : enable debug logging.
- -i/--ignore <test> : ignore the named test.
- -I/--ignore-file : file containing patterns to ignore.
- -S/--skip-self-test : skips the client self tests in the 'tests folder'
- -F/--spec-folder : folder that contains the specs to be loaded
- """
- sys.exit(1)
-
- def startBroker(self, brokerArgs):
- """Start a single broker daemon"""
- if TestRunner.qpidd == None:
- self._die("QPIDD environment var must point to qpidd when using -B/--start-broker")
- cmd = "%s --daemon --port=0 %s" % (TestRunner.qpidd, brokerArgs)
- portStr = os.popen(cmd).read()
- if len(portStr) == 0:
- self._die("%s failed to start" % TestRunner.qpidd)
- port = int(portStr)
- pid = int(os.popen("%s -p %d -c" % (TestRunner.qpidd, port)).read())
- print "Started broker: pid=%d, port=%d" % (pid, port)
- self.brokerTuple = (pid, port)
- self.setBroker("localhost:%d" % port)
-
- def stopBroker(self):
- """Stop the broker using qpidd -q"""
- if self.brokerTuple:
- ret = os.spawnl(os.P_WAIT, TestRunner.qpidd, TestRunner.qpidd, "--port=%d" % self.brokerTuple[1], "-q")
- if ret != 0:
- self._die("stop_node(): pid=%d port=%d: qpidd -q returned %d" % (self.brokerTuple[0], self.brokerTuple[1], ret))
- print "Stopped broker: pid=%d, port=%d" % self.brokerTuple
-
- def killBroker(self):
- """Kill the broker using kill -9 (SIGTERM)"""
- if self.brokerTuple:
- os.kill(self.brokerTuple[0], signal.SIGTERM)
- print "Killed broker: pid=%d, port=%d" % self.brokerTuple
-
- def setBroker(self, broker):
- try:
- self.url = URL(broker)
- except ValueError:
- self._die("'%s' is not a valid broker" % (broker))
- self.user = default(self.url.user, "guest")
- self.password = default(self.url.password, "guest")
- self.host = self.url.host
- if self.url.scheme == URL.AMQPS:
- self.ssl = True
- default_port = 5671
- else:
- self.ssl = False
- default_port = 5672
- self.port = default(self.url.port, default_port)
-
- def ignoreFile(self, filename):
- f = file(filename)
- for line in f.readlines(): self.ignore.append(line.strip())
- f.close()
-
- def use08spec(self):
- "True if we are running with the old 0-8 spec."
- # NB: AMQP 0-8 identifies itself as 8-0 for historical reasons.
- return self.spec.major==8 and self.spec.minor==0
-
- def use09spec(self):
- "True if we are running with the 0-9 (non-wip) spec."
- return self.spec.major==0 and self.spec.minor==9
-
- def _parseargs(self, args):
- # Defaults
- self.setBroker("localhost")
- self.verbose = 1
- self.ignore = []
- self.specfile = "0-8"
- self.errata = []
- self.skip_self_test = False
-
- try:
- opts, self.tests = getopt(args, "s:e:b:B:h?dvSi:I:F:",
- ["help", "spec", "errata=", "broker=",
- "start-broker=", "verbose", "skip-self-test", "ignore",
- "ignore-file", "spec-folder"])
- except GetoptError, e:
- self._die(str(e))
- # check for mutually exclusive options
- if "-B" in opts or "--start-broker" in opts:
- if "-b" in opts or "--broker" in opts:
- self._die("Cannot use -B/--start-broker and -b/broker options together")
- for opt, value in opts:
- if opt in ("-?", "-h", "--help"): self._die()
- if opt in ("-s", "--spec"): self.specfile = value
- if opt in ("-e", "--errata"): self.errata.append(value)
- if opt in ("-b", "--broker"): self.setBroker(value)
- if opt in ("-B", "--start-broker"): self.startBroker(value)
- if opt in ("-v", "--verbose"): self.verbose = 2
- if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG)
- if opt in ("-i", "--ignore"): self.ignore.append(value)
- if opt in ("-I", "--ignore-file"): self.ignoreFile(value)
- if opt in ("-S", "--skip-self-test"): self.skip_self_test = True
- if opt in ("-F", "--spec-folder"): TestRunner.SPEC_FOLDER = value
-
- # Abbreviations for default settings.
- if (self.specfile == "0-10"):
- self.spec = load(self.get_spec_file("amqp.0-10.xml"))
- elif (self.specfile == "0-10-errata"):
- self.spec = load(self.get_spec_file("amqp.0-10-qpid-errata.xml"))
- else:
- if (self.specfile == "0-8"):
- self.specfile = self.get_spec_file("amqp.0-8.xml")
- elif (self.specfile == "0-9"):
- self.specfile = self.get_spec_file("amqp.0-9.xml")
- self.errata.append(self.get_spec_file("amqp-errata.0-9.xml"))
-
- if (self.specfile == None):
- self._die("No XML specification provided")
- print "Using specification from:", self.specfile
-
- self.spec = qpid.spec.load(self.specfile, *self.errata)
-
- if len(self.tests) == 0:
- if not self.skip_self_test:
- self.tests=findmodules("tests")
- if self.use08spec() or self.use09spec():
- self.tests+=findmodules("tests_0-8")
- elif (self.spec.major == 99 and self.spec.minor == 0):
- self.tests+=findmodules("tests_0-10_preview")
- elif (self.spec.major == 0 and self.spec.minor == 10):
- self.tests+=findmodules("tests_0-10")
-
- def testSuite(self):
- class IgnoringTestSuite(unittest.TestSuite):
- def addTest(self, test):
- if isinstance(test, unittest.TestCase):
- for pattern in testrunner.ignore:
- if fnmatch(test.id(), pattern):
- return
- unittest.TestSuite.addTest(self, test)
-
- # Use our IgnoringTestSuite in the test loader.
- unittest.TestLoader.suiteClass = IgnoringTestSuite
- return unittest.defaultTestLoader.loadTestsFromNames(self.tests)
-
- def run(self, args=sys.argv[1:]):
- self.brokerTuple = None
- self._parseargs(args)
- runner = unittest.TextTestRunner(descriptions=False,
- verbosity=self.verbose)
- result = runner.run(self.testSuite())
-
- if (self.ignore):
- print "======================================="
- print "NOTE: the following tests were ignored:"
- for t in self.ignore: print t
- print "======================================="
-
- self.stopBroker()
- return result.wasSuccessful()
-
- def connect(self, host=None, port=None, spec=None, user=None, password=None, tune_params=None):
- """Connect to the broker, returns a qpid.client.Client"""
- host = host or self.host
- port = port or self.port
- spec = spec or self.spec
- user = user or self.user
- password = password or self.password
- client = qpid.client.Client(host, port, spec)
- if self.use08spec():
- client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params)
- else:
- client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params)
- return client
-
- def get_spec_file(self, fname):
- return TestRunner.SPEC_FOLDER + os.sep + fname
-
-# Global instance for tests to call connect.
-testrunner = TestRunner()
-
+from qpid.harness import Skipped
+from qpid.exceptions import VersionError
class TestBase(unittest.TestCase):
"""Base class for Qpid test cases.
@@ -267,6 +41,9 @@ class TestBase(unittest.TestCase):
resources to clean up later.
"""
+ def configure(self, config):
+ self.config = config
+
def setUp(self):
self.queues = []
self.exchanges = []
@@ -293,9 +70,26 @@ class TestBase(unittest.TestCase):
else:
self.client.close()
- def connect(self, *args, **keys):
+ def connect(self, host=None, port=None, user=None, password=None, tune_params=None):
"""Create a new connction, return the Client object"""
- return testrunner.connect(*args, **keys)
+ host = host or self.config.broker.host
+ port = port or self.config.broker.port or 5672
+ user = user or "guest"
+ password = password or "guest"
+ client = qpid.client.Client(host, port)
+ try:
+ if client.spec.major == 8 and client.spec.minor == 0:
+ client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params)
+ else:
+ client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params)
+ except qpid.client.Closed, e:
+ if isinstance(e.args[0], VersionError):
+ raise Skipped(e.args[0])
+ else:
+ raise e
+ except socket.error, e:
+ raise Skipped(e)
+ return client
def queue_declare(self, channel=None, *args, **keys):
channel = channel or self.channel
@@ -319,17 +113,8 @@ class TestBase(unittest.TestCase):
def consume(self, queueName):
"""Consume from named queue returns the Queue object."""
- if testrunner.use08spec() or testrunner.use09spec():
- reply = self.channel.basic_consume(queue=queueName, no_ack=True)
- return self.client.queue(reply.consumer_tag)
- else:
- if not "uniqueTag" in dir(self): self.uniqueTag = 1
- else: self.uniqueTag += 1
- consumer_tag = "tag" + str(self.uniqueTag)
- self.channel.message_subscribe(queue=queueName, destination=consumer_tag)
- self.channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL)
- self.channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL)
- return self.client.queue(consumer_tag)
+ reply = self.channel.basic_consume(queue=queueName, no_ack=True)
+ return self.client.queue(reply.consumer_tag)
def subscribe(self, channel=None, **keys):
channel = channel or self.channel
@@ -350,24 +135,14 @@ class TestBase(unittest.TestCase):
Publish to exchange and assert queue.get() returns the same message.
"""
body = self.uniqueString()
- if testrunner.use08spec() or testrunner.use09spec():
- self.channel.basic_publish(
- exchange=exchange,
- content=Content(body, properties=properties),
- routing_key=routing_key)
- else:
- self.channel.message_transfer(
- destination=exchange,
- content=Content(body, properties={'application_headers':properties,'routing_key':routing_key}))
+ self.channel.basic_publish(
+ exchange=exchange,
+ content=Content(body, properties=properties),
+ routing_key=routing_key)
msg = queue.get(timeout=1)
- if testrunner.use08spec() or testrunner.use09spec():
- self.assertEqual(body, msg.content.body)
- if (properties):
- self.assertEqual(properties, msg.content.properties)
- else:
- self.assertEqual(body, msg.content.body)
- if (properties):
- self.assertEqual(properties, msg.content['application_headers'])
+ self.assertEqual(body, msg.content.body)
+ if (properties):
+ self.assertEqual(properties, msg.content.properties)
def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None):
"""
@@ -394,11 +169,19 @@ class TestBase(unittest.TestCase):
self.assertEqual("close", message.method.name)
self.assertEqual(expectedCode, message.reply_code)
+#0-10 support
+from qpid.connection import Connection
+from qpid.util import connect, ssl, URL
+
class TestBase010(unittest.TestCase):
"""
Base class for Qpid test cases. using the final 0-10 spec
"""
+ def configure(self, config):
+ self.config = config
+ self.broker = config.broker
+
def setUp(self):
self.conn = self.connect()
self.session = self.conn.session("test-session", timeout=10)
@@ -406,15 +189,26 @@ class TestBase010(unittest.TestCase):
def startQmf(self, handler=None):
self.qmf = qmf.console.Session(handler)
- self.qmf_broker = self.qmf.addBroker(str(testrunner.url))
+ self.qmf_broker = self.qmf.addBroker(str(self.broker))
def connect(self, host=None, port=None):
- sock = connect(host or testrunner.host, port or testrunner.port)
- if testrunner.url.scheme == URL.AMQPS:
+ url = self.broker
+ if url.scheme == URL.AMQPS:
+ default_port = 5671
+ else:
+ default_port = 5672
+ try:
+ sock = connect(host or url.host, port or url.port or default_port)
+ except socket.error, e:
+ raise Skipped(e)
+ if url.scheme == URL.AMQPS:
sock = ssl(sock)
- conn = Connection(sock, username=testrunner.user,
- password=testrunner.password)
- conn.start(timeout=10)
+ conn = Connection(sock, username=url.user or "guest",
+ password=url.password or "guest")
+ try:
+ conn.start(timeout=10)
+ except VersionError, e:
+ raise Skipped(e)
return conn
def tearDown(self):
diff --git a/qpid/python/qpid/tests/framing.py b/qpid/python/qpid/tests/framing.py
index 4cd596b583..0b33df8b9a 100644
--- a/qpid/python/qpid/tests/framing.py
+++ b/qpid/python/qpid/tests/framing.py
@@ -40,6 +40,50 @@ class Base(Test):
assert seg1.channel == seg2.channel, "expected: %r, got %r" % (seg1, seg2)
assert seg1.payload == seg2.payload, "expected: %r, got %r" % (seg1, seg2)
+ def cmp_list(self, l1, l2):
+ if l1 is None:
+ assert l2 is None
+ return
+
+ assert len(l1) == len(l2)
+ for v1, v2 in zip(l1, l2):
+ if isinstance(v1, Compound):
+ self.cmp_ops(v1, v2)
+ else:
+ assert v1 == v2
+
+ def cmp_ops(self, op1, op2):
+ if op1 is None:
+ assert op2 is None
+ return
+
+ assert op1.__class__ == op2.__class__
+ cls = op1.__class__
+ assert op1.NAME == op2.NAME
+ assert op1.CODE == op2.CODE
+ assert op1.FIELDS == op2.FIELDS
+ for f in cls.FIELDS:
+ v1 = getattr(op1, f.name)
+ v2 = getattr(op2, f.name)
+ if COMPOUND.has_key(f.type) or f.type == "struct32":
+ self.cmp_ops(v1, v2)
+ elif f.type in ("list", "array"):
+ self.cmp_list(v1, v2)
+ else:
+ assert v1 == v2, "expected: %r, got %r" % (v1, v2)
+
+ if issubclass(cls, Command) or issubclass(cls, Control):
+ assert op1.channel == op2.channel
+
+ if issubclass(cls, Command):
+ assert op1.sync == op2.sync, "expected: %r, got %r" % (op1.sync, op2.sync)
+ assert (op1.headers is None and op2.headers is None) or \
+ (op1.headers is not None and op2.headers is not None)
+ if op1.headers is not None:
+ assert len(op1.headers) == len(op2.headers)
+ for h1, h2 in zip(op1.headers, op2.headers):
+ self.cmp_ops(h1, h2)
+
class FrameTest(Base):
def enc_dec(self, frames, encoded=None):
@@ -171,3 +215,75 @@ class SegmentTest(Base):
for i in range(0, 8, 2)]
self.enc_dec([Segment(False, False, 0, 0, 0, "abcdefgh")], frames, ilvd, max_payload=2)
+
+from qpid.ops import *
+
+class OpTest(Base):
+
+ def enc_dec(self, ops):
+ enc = OpEncoder()
+ dec = OpDecoder()
+ enc.write(*ops)
+ segs = enc.read()
+ dec.write(*segs)
+ dops = dec.read()
+ assert len(ops) == len(dops)
+ for op1, op2 in zip(ops, dops):
+ self.cmp_ops(op1, op2)
+
+ def testEmtpyMT(self):
+ self.enc_dec([MessageTransfer()])
+
+ def testEmptyMTSync(self):
+ self.enc_dec([MessageTransfer(sync=True)])
+
+ def testMT(self):
+ self.enc_dec([MessageTransfer(destination="asdf")])
+
+ def testSyncMT(self):
+ self.enc_dec([MessageTransfer(destination="asdf", sync=True)])
+
+ def testEmptyPayloadMT(self):
+ self.enc_dec([MessageTransfer(payload="")])
+
+ def testPayloadMT(self):
+ self.enc_dec([MessageTransfer(payload="test payload")])
+
+ def testHeadersEmptyPayloadMT(self):
+ self.enc_dec([MessageTransfer(headers=[DeliveryProperties()])])
+
+ def testHeadersPayloadMT(self):
+ self.enc_dec([MessageTransfer(headers=[DeliveryProperties()], payload="test payload")])
+
+ def testMultiHeadersEmptyPayloadMT(self):
+ self.enc_dec([MessageTransfer(headers=[DeliveryProperties(), MessageProperties()])])
+
+ def testMultiHeadersPayloadMT(self):
+ self.enc_dec([MessageTransfer(headers=[MessageProperties(), DeliveryProperties()], payload="test payload")])
+
+ def testContentTypeHeadersPayloadMT(self):
+ self.enc_dec([MessageTransfer(headers=[MessageProperties(content_type="text/plain")], payload="test payload")])
+
+ def testMulti(self):
+ self.enc_dec([MessageTransfer(),
+ MessageTransfer(sync=True),
+ MessageTransfer(destination="one"),
+ MessageTransfer(destination="two", sync=True),
+ MessageTransfer(destination="three", payload="test payload")])
+
+ def testControl(self):
+ self.enc_dec([SessionAttach(name="asdf")])
+
+ def testMixed(self):
+ self.enc_dec([SessionAttach(name="fdsa"), MessageTransfer(destination="test")])
+
+ def testChannel(self):
+ self.enc_dec([SessionAttach(name="asdf", channel=3), MessageTransfer(destination="test", channel=1)])
+
+ def testCompound(self):
+ self.enc_dec([MessageTransfer(headers=[MessageProperties(reply_to=ReplyTo(exchange="exch", routing_key="rk"))])])
+
+ def testListCompound(self):
+ self.enc_dec([ExecutionResult(value=RecoverResult(in_doubt=[Xid(global_id="one"),
+ Xid(global_id="two"),
+ Xid(global_id="three")]))])
diff --git a/qpid/python/qpid/tests/messaging.py b/qpid/python/qpid/tests/messaging.py
index 8a142d6c96..7706ebbabe 100644
--- a/qpid/python/qpid/tests/messaging.py
+++ b/qpid/python/qpid/tests/messaging.py
@@ -22,6 +22,7 @@
import time
from qpid.tests import Test
+from qpid.harness import Skipped
from qpid.messaging import Connection, ConnectError, Disconnected, Empty, Message, UNLIMITED, uuid4
from Queue import Queue, Empty as QueueEmpty
@@ -42,7 +43,10 @@ class Base(Test):
def setup(self):
self.test_id = uuid4()
self.broker = self.config.broker
- self.conn = self.setup_connection()
+ try:
+ self.conn = self.setup_connection()
+ except ConnectError, e:
+ raise Skipped(e)
self.ssn = self.setup_session()
self.snd = self.setup_sender()
self.rcv = self.setup_receiver()
@@ -65,7 +69,7 @@ class Base(Test):
receiver = ssn.receiver("ping-queue")
msg = receiver.fetch(0)
ssn.acknowledge()
- assert msg.content == content
+ assert msg.content == content, "expected %r, got %r" % (content, msg.content)
def drain(self, rcv, limit=None):
contents = []
diff --git a/qpid/python/qpid_config.py b/qpid/python/qpid_config.py
index 8f987e9962..3cf6b69b7e 100644
--- a/qpid/python/qpid_config.py
+++ b/qpid/python/qpid_config.py
@@ -21,3 +21,5 @@ import os
qpid_home = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
amqp_spec = os.path.join(qpid_home, "specs", "amqp.0-10-qpid-errata.xml")
+amqp_spec_0_8 = os.path.join(qpid_home, "specs", "amqp.0-8.xml")
+amqp_spec_0_9 = os.path.join(qpid_home, "specs", "amqp.0-9.xml")
diff --git a/qpid/python/tests/__init__.py b/qpid/python/tests/__init__.py
index 8ad514fc2f..1e495f3af3 100644
--- a/qpid/python/tests/__init__.py
+++ b/qpid/python/tests/__init__.py
@@ -19,12 +19,4 @@
# under the License.
#
-from codec import *
-from queue import *
-from spec import *
-from framer import *
-from assembler import *
-from datatypes import *
-from connection import *
-from spec010 import *
-from codec010 import *
+import codec, queue, datatypes, connection, spec010, codec010
diff --git a/qpid/python/tests/assembler.py b/qpid/python/tests/assembler.py
deleted file mode 100644
index f4e37084b6..0000000000
--- a/qpid/python/tests/assembler.py
+++ /dev/null
@@ -1,78 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from threading import *
-from unittest import TestCase
-from qpid.util import connect, listen
-from qpid.assembler import *
-
-PORT = 1234
-
-class AssemblerTest(TestCase):
-
- def setUp(self):
- started = Event()
- self.running = True
-
- def run():
- running = True
- for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()):
- asm = Assembler(s)
- try:
- asm.write_header(*asm.read_header()[-2:])
- while True:
- seg = asm.read_segment()
- asm.write_segment(seg)
- except Closed:
- pass
-
- self.server = Thread(target=run)
- self.server.setDaemon(True)
- self.server.start()
-
- started.wait(3)
- assert started.isSet()
-
- def tearDown(self):
- self.running = False
- self.server.join()
-
- def test(self):
- asm = Assembler(connect("0.0.0.0", PORT), max_payload = 1)
- asm.write_header(0, 10)
- asm.write_segment(Segment(True, False, 1, 2, 3, "TEST"))
- asm.write_segment(Segment(False, True, 1, 2, 3, "ING"))
-
- assert asm.read_header() == ("AMQP", 1, 1, 0, 10)
-
- seg = asm.read_segment()
- assert seg.first == True
- assert seg.last == False
- assert seg.type == 1
- assert seg.track == 2
- assert seg.channel == 3
- assert seg.payload == "TEST"
-
- seg = asm.read_segment()
- assert seg.first == False
- assert seg.last == True
- assert seg.type == 1
- assert seg.track == 2
- assert seg.channel == 3
- assert seg.payload == "ING"
diff --git a/qpid/python/tests/codec.py b/qpid/python/tests/codec.py
index 4bd3675af8..9b51b4713c 100644
--- a/qpid/python/tests/codec.py
+++ b/qpid/python/tests/codec.py
@@ -23,7 +23,6 @@ from qpid.codec import Codec
from qpid.spec import load
from cStringIO import StringIO
from qpid.reference import ReferenceId
-from qpid.testlib import testrunner
__doc__ = """
@@ -54,13 +53,8 @@ __doc__ = """
"""
-SPEC = None
-
-def spec():
- global SPEC
- if SPEC == None:
- SPEC = load(testrunner.get_spec_file("amqp.0-8.xml"))
- return SPEC
+from qpid_config import amqp_spec_0_8
+SPEC = load(amqp_spec_0_8)
# --------------------------------------
# --------------------------------------
@@ -76,7 +70,7 @@ class BaseDataTypes(unittest.TestCase):
"""
standard setUp for unitetest (refer unittest documentation for details)
"""
- self.codec = Codec(StringIO(), spec())
+ self.codec = Codec(StringIO(), SPEC)
# ------------------
def tearDown(self):
@@ -507,7 +501,7 @@ def test(type, value):
else:
values = [value]
stream = StringIO()
- codec = Codec(stream, spec())
+ codec = Codec(stream, SPEC)
for v in values:
codec.encode(type, v)
codec.flush()
diff --git a/qpid/python/tests/codec010.py b/qpid/python/tests/codec010.py
index a1f89dc3f4..787ebc146f 100644
--- a/qpid/python/tests/codec010.py
+++ b/qpid/python/tests/codec010.py
@@ -20,21 +20,17 @@
import time
from unittest import TestCase
-from qpid.spec010 import load
from qpid.codec010 import StringCodec
-from qpid.testlib import testrunner
from qpid.datatypes import timestamp, uuid4
+from qpid.ops import PRIMITIVE
class CodecTest(TestCase):
- def setUp(self):
- self.spec = load(testrunner.get_spec_file("amqp.0-10.xml"))
-
def check(self, type, value, compare=True):
- t = self.spec[type]
- sc = StringCodec(self.spec)
- t.encode(sc, value)
- decoded = t.decode(sc)
+ t = PRIMITIVE[type]
+ sc = StringCodec()
+ sc.write_primitive(t, value)
+ decoded = sc.read_primitive(t)
if compare:
assert decoded == value, "%s, %s" % (decoded, value)
return decoded
diff --git a/qpid/python/tests/connection.py b/qpid/python/tests/connection.py
index 19cdad9f97..d340e4e9c1 100644
--- a/qpid/python/tests/connection.py
+++ b/qpid/python/tests/connection.py
@@ -22,10 +22,10 @@ from unittest import TestCase
from qpid.util import connect, listen
from qpid.connection import *
from qpid.datatypes import Message
-from qpid.testlib import testrunner
from qpid.delegates import Server
from qpid.queue import Queue
from qpid.session import Delegate
+from qpid.ops import QueueQueryResult
PORT = 1234
@@ -51,12 +51,12 @@ class TestSession(Delegate):
pass
def queue_query(self, qq):
- return qq._type.result.type.new((qq.queue,), {})
+ return QueueQueryResult(qq.queue)
- def message_transfer(self, cmd, headers, body):
+ def message_transfer(self, cmd):
if cmd.destination == "echo":
- m = Message(body)
- m.headers = headers
+ m = Message(cmd.payload)
+ m.headers = cmd.headers
self.session.message_transfer(cmd.destination, cmd.accept_mode,
cmd.acquire_mode, m)
elif cmd.destination == "abort":
@@ -64,7 +64,7 @@ class TestSession(Delegate):
elif cmd.destination == "heartbeat":
self.session.channel.connection_heartbeat()
else:
- self.queue.put((cmd, headers, body))
+ self.queue.put(cmd)
class ConnectionTest(TestCase):
@@ -134,17 +134,17 @@ class ConnectionTest(TestCase):
ssn.message_transfer(d)
for d in destinations:
- cmd, header, body = self.queue.get(10)
+ cmd = self.queue.get(10)
assert cmd.destination == d
- assert header == None
- assert body == None
+ assert cmd.headers == None
+ assert cmd.payload == None
msg = Message("this is a test")
ssn.message_transfer("four", message=msg)
- cmd, header, body = self.queue.get(10)
+ cmd = self.queue.get(10)
assert cmd.destination == "four"
- assert header == None
- assert body == msg.body
+ assert cmd.headers == None
+ assert cmd.payload == msg.body
qq = ssn.queue_query("asdf")
assert qq.queue == "asdf"
diff --git a/qpid/python/tests/datatypes.py b/qpid/python/tests/datatypes.py
index e9e09094fa..1a60bb4107 100644
--- a/qpid/python/tests/datatypes.py
+++ b/qpid/python/tests/datatypes.py
@@ -18,9 +18,8 @@
#
from unittest import TestCase
-from qpid.testlib import testrunner
-from qpid.spec010 import load
from qpid.datatypes import *
+from qpid.ops import DeliveryProperties, FragmentProperties, MessageProperties
class SerialTest(TestCase):
@@ -176,10 +175,9 @@ class UUIDTest(TestCase):
class MessageTest(TestCase):
def setUp(self):
- self.spec = load(testrunner.get_spec_file("amqp.0-10-qpid-errata.xml"))
- self.mp = Struct(self.spec["message.message_properties"])
- self.dp = Struct(self.spec["message.delivery_properties"])
- self.fp = Struct(self.spec["message.fragment_properties"])
+ self.mp = MessageProperties()
+ self.dp = DeliveryProperties()
+ self.fp = FragmentProperties()
def testHas(self):
m = Message(self.mp, self.dp, self.fp, "body")
@@ -207,7 +205,7 @@ class MessageTest(TestCase):
def testSetReplace(self):
m = Message(self.mp, self.dp, self.fp, "body")
- dp = Struct(self.spec["message.delivery_properties"])
+ dp = DeliveryProperties()
assert m.get("delivery_properties") == self.dp
assert m.get("delivery_properties") != dp
m.set(dp)
diff --git a/qpid/python/tests/framer.py b/qpid/python/tests/framer.py
deleted file mode 100644
index e99166721c..0000000000
--- a/qpid/python/tests/framer.py
+++ /dev/null
@@ -1,95 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from threading import *
-from unittest import TestCase
-from qpid.util import connect, listen
-from qpid.framer import *
-
-PORT = 1234
-
-class FramerTest(TestCase):
-
- def setUp(self):
- self.running = True
- started = Event()
- def run():
- for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()):
- conn = Framer(s)
- try:
- conn.write_header(*conn.read_header()[-2:])
- while True:
- frame = conn.read_frame()
- conn.write_frame(frame)
- conn.flush()
- except Closed:
- pass
-
- self.server = Thread(target=run)
- self.server.setDaemon(True)
- self.server.start()
-
- started.wait(3)
- assert started.isSet()
-
- def tearDown(self):
- self.running = False
- self.server.join(3)
-
- def test(self):
- c = Framer(connect("0.0.0.0", PORT))
-
- c.write_header(0, 10)
- assert c.read_header() == ("AMQP", 1, 1, 0, 10)
-
- c.write_frame(Frame(FIRST_FRM, 1, 2, 3, "THIS"))
- c.write_frame(Frame(0, 1, 2, 3, "IS"))
- c.write_frame(Frame(0, 1, 2, 3, "A"))
- c.write_frame(Frame(LAST_FRM, 1, 2, 3, "TEST"))
- c.flush()
-
- f = c.read_frame()
- assert f.flags & FIRST_FRM
- assert not (f.flags & LAST_FRM)
- assert f.type == 1
- assert f.track == 2
- assert f.channel == 3
- assert f.payload == "THIS"
-
- f = c.read_frame()
- assert f.flags == 0
- assert f.type == 1
- assert f.track == 2
- assert f.channel == 3
- assert f.payload == "IS"
-
- f = c.read_frame()
- assert f.flags == 0
- assert f.type == 1
- assert f.track == 2
- assert f.channel == 3
- assert f.payload == "A"
-
- f = c.read_frame()
- assert f.flags & LAST_FRM
- assert not (f.flags & FIRST_FRM)
- assert f.type == 1
- assert f.track == 2
- assert f.channel == 3
- assert f.payload == "TEST"
diff --git a/qpid/python/tests/spec.py b/qpid/python/tests/spec.py
deleted file mode 100644
index d5ea1d682a..0000000000
--- a/qpid/python/tests/spec.py
+++ /dev/null
@@ -1,74 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from unittest import TestCase
-from qpid.spec import load
-from qpid.testlib import testrunner
-
-class SpecTest(TestCase):
-
- def check_load(self, *urls):
- spec = load(*map(testrunner.get_spec_file, urls))
- qdecl = spec.method("queue_declare")
- assert qdecl != None
- assert not qdecl.content
-
- queue = qdecl.fields.byname["queue"]
- assert queue != None
- assert queue.domain.name == "queue_name"
- assert queue.type == "shortstr"
-
- qdecl_ok = spec.method("queue_declare_ok")
-
- # 0-8 is actually 8-0
- if (spec.major == 8 and spec.minor == 0 or
- spec.major == 0 and spec.minor == 9):
- assert qdecl_ok != None
-
- assert len(qdecl.responses) == 1
- assert qdecl_ok in qdecl.responses
-
- publish = spec.method("basic_publish")
- assert publish != None
- assert publish.content
-
- if (spec.major == 0 and spec.minor == 10):
- assert qdecl_ok == None
- reply_to = spec.domains.byname["reply_to"]
- assert reply_to.type.size == 2
- assert reply_to.type.pack == 2
- assert len(reply_to.type.fields) == 2
-
- qq = spec.method("queue_query")
- assert qq != None
- assert qq.result.size == 4
- assert qq.result.type != None
- args = qq.result.fields.byname["arguments"]
- assert args.type == "table"
-
- def test_load_0_8(self):
- self.check_load("amqp.0-8.xml")
-
- def test_load_0_9(self):
- self.check_load("amqp.0-9.xml")
-
- def test_load_0_9_errata(self):
- self.check_load("amqp.0-9.xml", "amqp-errata.0-9.xml")
-
- def test_load_0_10(self):
- self.check_load("amqp.0-10-preview.xml")
diff --git a/qpid/python/tests/spec010.py b/qpid/python/tests/spec010.py
index df9cb9590a..ac04e1ee02 100644
--- a/qpid/python/tests/spec010.py
+++ b/qpid/python/tests/spec010.py
@@ -19,66 +19,56 @@
import os, tempfile, shutil, stat
from unittest import TestCase
-from qpid.spec010 import load
from qpid.codec010 import Codec, StringCodec
-from qpid.testlib import testrunner
-from qpid.datatypes import Struct
+from qpid.ops import *
class SpecTest(TestCase):
- def setUp(self):
- self.spec = load(testrunner.get_spec_file("amqp.0-10-qpid-errata.xml"))
-
def testSessionHeader(self):
- hdr = self.spec["session.header"]
- sc = StringCodec(self.spec)
- hdr.encode(sc, Struct(hdr, sync=True))
+ sc = StringCodec()
+ sc.write_compound(Header(sync=True))
assert sc.encoded == "\x01\x01"
- sc = StringCodec(self.spec)
- hdr.encode(sc, Struct(hdr, sync=False))
+ sc = StringCodec()
+ sc.write_compound(Header(sync=False))
assert sc.encoded == "\x01\x00"
- def encdec(self, type, value):
- sc = StringCodec(self.spec)
- type.encode(sc, value)
- decoded = type.decode(sc)
+ def encdec(self, value):
+ sc = StringCodec()
+ sc.write_compound(value)
+ decoded = sc.read_compound(value.__class__)
return decoded
def testMessageProperties(self):
- mp = self.spec["message.message_properties"]
- rt = self.spec["message.reply_to"]
-
- props = Struct(mp, content_length=3735928559L,
- reply_to=Struct(rt, exchange="the exchange name",
- routing_key="the routing key"))
- dec = self.encdec(mp, props)
+ props = MessageProperties(content_length=3735928559L,
+ reply_to=ReplyTo(exchange="the exchange name",
+ routing_key="the routing key"))
+ dec = self.encdec(props)
assert props.content_length == dec.content_length
assert props.reply_to.exchange == dec.reply_to.exchange
assert props.reply_to.routing_key == dec.reply_to.routing_key
def testMessageSubscribe(self):
- ms = self.spec["message.subscribe"]
- cmd = Struct(ms, exclusive=True, destination="this is a test")
- dec = self.encdec(self.spec["message.subscribe"], cmd)
+ cmd = MessageSubscribe(exclusive=True, destination="this is a test")
+ dec = self.encdec(cmd)
assert cmd.exclusive == dec.exclusive
assert cmd.destination == dec.destination
def testXid(self):
- xid = self.spec["dtx.xid"]
- sc = StringCodec(self.spec)
- st = Struct(xid, format=0, global_id="gid", branch_id="bid")
- xid.encode(sc, st)
+ sc = StringCodec()
+ xid = Xid(format=0, global_id="gid", branch_id="bid")
+ sc.write_compound(xid)
assert sc.encoded == '\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid'
- assert xid.decode(sc).__dict__ == st.__dict__
+ dec = sc.read_compound(Xid)
+ assert xid.__dict__ == dec.__dict__
- def testLoadReadOnly(self):
- spec = "amqp.0-10-qpid-errata.xml"
- f = testrunner.get_spec_file(spec)
- dest = tempfile.mkdtemp()
- shutil.copy(f, dest)
- shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest)
- os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR)
- fname = os.path.join(dest, spec)
- load(fname)
- assert not os.path.exists("%s.pcl" % fname)
+# def testLoadReadOnly(self):
+# spec = "amqp.0-10-qpid-errata.xml"
+# f = testrunner.get_spec_file(spec)
+# dest = tempfile.mkdtemp()
+# shutil.copy(f, dest)
+# shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest)
+# os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR)
+# fname = os.path.join(dest, spec)
+# load(fname)
+# assert not os.path.exists("%s.pcl" % fname)
diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py
index f80eca6363..d40d5e811e 100644
--- a/qpid/python/tests_0-10/message.py
+++ b/qpid/python/tests_0-10/message.py
@@ -477,7 +477,7 @@ class MessageTests(TestBase010):
#send message A
ssn.message_transfer(message=Message(ssn.delivery_properties(routing_key="q"), "A"))
- for unit in ssn.credit_unit.values():
+ for unit in ssn.credit_unit.VALUES:
ssn.message_flow("c", unit, 0xFFFFFFFFL)
q = ssn.incoming("c")
@@ -490,7 +490,7 @@ class MessageTests(TestBase010):
ssn.channel.session_completed(ssn.receiver._completed)
ssn.message_accept(RangedSet(msgA.id))
- for unit in ssn.credit_unit.values():
+ for unit in ssn.credit_unit.VALUES:
ssn.message_flow("c", unit, 0xFFFFFFFFL)
#send message B
diff --git a/qpid/python/tests_0-10/tx.py b/qpid/python/tests_0-10/tx.py
index 463fbcb888..8cdc539a08 100644
--- a/qpid/python/tests_0-10/tx.py
+++ b/qpid/python/tests_0-10/tx.py
@@ -19,7 +19,7 @@
from qpid.client import Client, Closed
from qpid.queue import Empty
from qpid.datatypes import Message, RangedSet
-from qpid.testlib import testrunner, TestBase010
+from qpid.testlib import TestBase010
class TxTests(TestBase010):
"""
diff --git a/qpid/python/tests_0-8/__init__.py b/qpid/python/tests_0-8/__init__.py
index 9a09d2d04f..526f2452f8 100644
--- a/qpid/python/tests_0-8/__init__.py
+++ b/qpid/python/tests_0-8/__init__.py
@@ -18,3 +18,5 @@
# specific language governing permissions and limitations
# under the License.
#
+
+import basic, broker, example, exchange, queue, testlib, tx
diff --git a/qpid/python/tests_0-8/basic.py b/qpid/python/tests_0-8/basic.py
index 95ca0d7287..d5837fc19c 100644
--- a/qpid/python/tests_0-8/basic.py
+++ b/qpid/python/tests_0-8/basic.py
@@ -19,7 +19,7 @@
from qpid.client import Client, Closed
from qpid.queue import Empty
from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
+from qpid.testlib import TestBase
class BasicTests(TestBase):
"""Tests for 'methods' on the amqp basic 'class'"""
@@ -219,10 +219,11 @@ class BasicTests(TestBase):
channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
channel.basic_cancel(consumer_tag=subscription.consumer_tag)
- subscription2 = channel.basic_consume(queue="test-requeue")
- queue2 = self.client.queue(subscription2.consumer_tag)
channel.basic_recover(requeue=True)
+
+ subscription2 = channel.basic_consume(queue="test-requeue")
+ queue2 = self.client.queue(subscription2.consumer_tag)
msg3b = queue2.get(timeout=1)
msg5b = queue2.get(timeout=1)
diff --git a/qpid/python/tests_0-8/broker.py b/qpid/python/tests_0-8/broker.py
index d9ac69c5e3..7f3fe7530e 100644
--- a/qpid/python/tests_0-8/broker.py
+++ b/qpid/python/tests_0-8/broker.py
@@ -19,15 +19,15 @@
from qpid.client import Closed
from qpid.queue import Empty
from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
+from qpid.testlib import TestBase
class BrokerTests(TestBase):
"""Tests for basic Broker functionality"""
- def test_amqp_basic_13(self):
+ def test_ack_and_no_ack(self):
"""
First, this test tries to receive a message with a no-ack
- consumer. Second, this test tries to explicitely receive and
+ consumer. Second, this test tries to explicitly receive and
acknowledge a message with an acknowledging consumer.
"""
ch = self.channel
@@ -40,7 +40,7 @@ class BrokerTests(TestBase):
msg = self.client.queue(ctag).get(timeout = 5)
self.assert_(msg.content.body == body)
- # Acknowleding consumer
+ # Acknowledging consumer
self.queue_declare(ch, queue = "otherqueue")
ctag = ch.basic_consume(queue = "otherqueue", no_ack = False).consumer_tag
body = "test ack"
@@ -102,3 +102,19 @@ class BrokerTests(TestBase):
except Closed, e:
self.assertConnectionException(504, e.args[0])
+ def test_channel_flow(self):
+ channel = self.channel
+ channel.queue_declare(queue="flow_test_queue", exclusive=True)
+ ctag = channel.basic_consume(queue="flow_test_queue", no_ack=True).consumer_tag
+ incoming = self.client.queue(ctag)
+
+ channel.channel_flow(active=False)
+ channel.basic_publish(routing_key="flow_test_queue", content=Content("abcdefghijklmnopqrstuvwxyz"))
+ try:
+ incoming.get(timeout=1)
+ self.fail("Received message when flow turned off.")
+ except Empty: None
+
+ channel.channel_flow(active=True)
+ msg = incoming.get(timeout=1)
+ self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.content.body)
diff --git a/qpid/python/tests_0-8/example.py b/qpid/python/tests_0-8/example.py
index a1949ccb9f..d82bad1f61 100644
--- a/qpid/python/tests_0-8/example.py
+++ b/qpid/python/tests_0-8/example.py
@@ -18,7 +18,7 @@
#
from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
+from qpid.testlib import TestBase
class ExampleTest (TestBase):
"""
diff --git a/qpid/python/tests_0-8/queue.py b/qpid/python/tests_0-8/queue.py
index 60ac4c3dfb..b7a41736ab 100644
--- a/qpid/python/tests_0-8/queue.py
+++ b/qpid/python/tests_0-8/queue.py
@@ -19,7 +19,7 @@
from qpid.client import Client, Closed
from qpid.queue import Empty
from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
+from qpid.testlib import TestBase
class QueueTests(TestBase):
"""Tests for 'methods' on the amqp queue 'class'"""
diff --git a/qpid/python/tests_0-8/testlib.py b/qpid/python/tests_0-8/testlib.py
index cab07cc4ac..76f7e964a2 100644
--- a/qpid/python/tests_0-8/testlib.py
+++ b/qpid/python/tests_0-8/testlib.py
@@ -22,7 +22,7 @@
#
from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
+from qpid.testlib import TestBase
from Queue import Empty
import sys
diff --git a/qpid/python/tests_0-8/tx.py b/qpid/python/tests_0-8/tx.py
index 054fb8d8b7..9faddb1110 100644
--- a/qpid/python/tests_0-8/tx.py
+++ b/qpid/python/tests_0-8/tx.py
@@ -19,7 +19,7 @@
from qpid.client import Client, Closed
from qpid.queue import Empty
from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
+from qpid.testlib import TestBase
class TxTests(TestBase):
"""
diff --git a/qpid/python/tests_0-9/__init__.py b/qpid/python/tests_0-9/__init__.py
index 9a09d2d04f..d9f2ed7dbb 100644
--- a/qpid/python/tests_0-9/__init__.py
+++ b/qpid/python/tests_0-9/__init__.py
@@ -18,3 +18,5 @@
# specific language governing permissions and limitations
# under the License.
#
+
+import query, queue
diff --git a/qpid/python/tests_0-9/basic.py b/qpid/python/tests_0-9/basic.py
deleted file mode 100644
index 607ba26343..0000000000
--- a/qpid/python/tests_0-9/basic.py
+++ /dev/null
@@ -1,396 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class BasicTests(TestBase):
- """Tests for 'methods' on the amqp basic 'class'"""
-
- def test_consume_no_local(self):
- """
- Test that the no_local flag is honoured in the consume method
- """
- channel = self.channel
- #setup, declare two queues:
- channel.queue_declare(queue="test-queue-1a", exclusive=True)
- channel.queue_declare(queue="test-queue-1b", exclusive=True)
- #establish two consumers one of which excludes delivery of locally sent messages
- channel.basic_consume(consumer_tag="local_included", queue="test-queue-1a")
- channel.basic_consume(consumer_tag="local_excluded", queue="test-queue-1b", no_local=True)
-
- #send a message
- channel.basic_publish(routing_key="test-queue-1a", content=Content("consume_no_local"))
- channel.basic_publish(routing_key="test-queue-1b", content=Content("consume_no_local"))
-
- #check the queues of the two consumers
- excluded = self.client.queue("local_excluded")
- included = self.client.queue("local_included")
- msg = included.get(timeout=1)
- self.assertEqual("consume_no_local", msg.content.body)
- try:
- excluded.get(timeout=1)
- self.fail("Received locally published message though no_local=true")
- except Empty: None
-
-
- def test_consume_exclusive(self):
- """
- Test that the exclusive flag is honoured in the consume method
- """
- channel = self.channel
- #setup, declare a queue:
- channel.queue_declare(queue="test-queue-2", exclusive=True)
-
- #check that an exclusive consumer prevents other consumer being created:
- channel.basic_consume(consumer_tag="first", queue="test-queue-2", exclusive=True)
- try:
- channel.basic_consume(consumer_tag="second", queue="test-queue-2")
- self.fail("Expected consume request to fail due to previous exclusive consumer")
- except Closed, e:
- self.assertChannelException(403, e.args[0])
-
- #open new channel and cleanup last consumer:
- channel = self.client.channel(2)
- channel.channel_open()
-
- #check that an exclusive consumer cannot be created if a consumer already exists:
- channel.basic_consume(consumer_tag="first", queue="test-queue-2")
- try:
- channel.basic_consume(consumer_tag="second", queue="test-queue-2", exclusive=True)
- self.fail("Expected exclusive consume request to fail due to previous consumer")
- except Closed, e:
- self.assertChannelException(403, e.args[0])
-
- def test_consume_queue_errors(self):
- """
- Test error conditions associated with the queue field of the consume method:
- """
- channel = self.channel
- try:
- #queue specified but doesn't exist:
- channel.basic_consume(queue="invalid-queue")
- self.fail("Expected failure when consuming from non-existent queue")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- channel = self.client.channel(2)
- channel.channel_open()
- try:
- #queue not specified and none previously declared for channel:
- channel.basic_consume(queue="")
- self.fail("Expected failure when consuming from unspecified queue")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
-
- def test_consume_unique_consumers(self):
- """
- Ensure unique consumer tags are enforced
- """
- channel = self.channel
- #setup, declare a queue:
- channel.queue_declare(queue="test-queue-3", exclusive=True)
-
- #check that attempts to use duplicate tags are detected and prevented:
- channel.basic_consume(consumer_tag="first", queue="test-queue-3")
- try:
- channel.basic_consume(consumer_tag="first", queue="test-queue-3")
- self.fail("Expected consume request to fail due to non-unique tag")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
-
- def test_cancel(self):
- """
- Test compliance of the basic.cancel method
- """
- channel = self.channel
- #setup, declare a queue:
- channel.queue_declare(queue="test-queue-4", exclusive=True)
- channel.basic_consume(consumer_tag="my-consumer", queue="test-queue-4")
- channel.basic_publish(routing_key="test-queue-4", content=Content("One"))
-
- myqueue = self.client.queue("my-consumer")
- msg = myqueue.get(timeout=1)
- self.assertEqual("One", msg.content.body)
-
- #cancel should stop messages being delivered
- channel.basic_cancel(consumer_tag="my-consumer")
- channel.basic_publish(routing_key="test-queue-4", content=Content("Two"))
- try:
- msg = myqueue.get(timeout=1)
- self.fail("Got message after cancellation: " + msg)
- except Empty: None
-
- #cancellation of non-existant consumers should be handled without error
- channel.basic_cancel(consumer_tag="my-consumer")
- channel.basic_cancel(consumer_tag="this-never-existed")
-
-
- def test_ack(self):
- """
- Test basic ack/recover behaviour
- """
- channel = self.channel
- channel.queue_declare(queue="test-ack-queue", exclusive=True)
-
- reply = channel.basic_consume(queue="test-ack-queue", no_ack=False)
- queue = self.client.queue(reply.consumer_tag)
-
- channel.basic_publish(routing_key="test-ack-queue", content=Content("One"))
- channel.basic_publish(routing_key="test-ack-queue", content=Content("Two"))
- channel.basic_publish(routing_key="test-ack-queue", content=Content("Three"))
- channel.basic_publish(routing_key="test-ack-queue", content=Content("Four"))
- channel.basic_publish(routing_key="test-ack-queue", content=Content("Five"))
-
- msg1 = queue.get(timeout=1)
- msg2 = queue.get(timeout=1)
- msg3 = queue.get(timeout=1)
- msg4 = queue.get(timeout=1)
- msg5 = queue.get(timeout=1)
-
- self.assertEqual("One", msg1.content.body)
- self.assertEqual("Two", msg2.content.body)
- self.assertEqual("Three", msg3.content.body)
- self.assertEqual("Four", msg4.content.body)
- self.assertEqual("Five", msg5.content.body)
-
- channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two
- channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
-
- channel.basic_recover(requeue=False)
-
- msg3b = queue.get(timeout=1)
- msg5b = queue.get(timeout=1)
-
- self.assertEqual("Three", msg3b.content.body)
- self.assertEqual("Five", msg5b.content.body)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
- except Empty: None
-
- def test_recover_requeue(self):
- """
- Test requeing on recovery
- """
- channel = self.channel
- channel.queue_declare(queue="test-requeue", exclusive=True)
-
- subscription = channel.basic_consume(queue="test-requeue", no_ack=False)
- queue = self.client.queue(subscription.consumer_tag)
-
- channel.basic_publish(routing_key="test-requeue", content=Content("One"))
- channel.basic_publish(routing_key="test-requeue", content=Content("Two"))
- channel.basic_publish(routing_key="test-requeue", content=Content("Three"))
- channel.basic_publish(routing_key="test-requeue", content=Content("Four"))
- channel.basic_publish(routing_key="test-requeue", content=Content("Five"))
-
- msg1 = queue.get(timeout=1)
- msg2 = queue.get(timeout=1)
- msg3 = queue.get(timeout=1)
- msg4 = queue.get(timeout=1)
- msg5 = queue.get(timeout=1)
-
- self.assertEqual("One", msg1.content.body)
- self.assertEqual("Two", msg2.content.body)
- self.assertEqual("Three", msg3.content.body)
- self.assertEqual("Four", msg4.content.body)
- self.assertEqual("Five", msg5.content.body)
-
- channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two
- channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
-
- channel.basic_cancel(consumer_tag=subscription.consumer_tag)
-
- channel.basic_recover(requeue=True)
-
- subscription2 = channel.basic_consume(queue="test-requeue")
- queue2 = self.client.queue(subscription2.consumer_tag)
-
- msg3b = queue2.get(timeout=1)
- msg5b = queue2.get(timeout=1)
-
- self.assertEqual("Three", msg3b.content.body)
- self.assertEqual("Five", msg5b.content.body)
-
- self.assertEqual(True, msg3b.redelivered)
- self.assertEqual(True, msg5b.redelivered)
-
- try:
- extra = queue2.get(timeout=1)
- self.fail("Got unexpected message in second queue: " + extra.content.body)
- except Empty: None
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected message in original queue: " + extra.content.body)
- except Empty: None
-
-
- def test_qos_prefetch_count(self):
- """
- Test that the prefetch count specified is honoured
- """
- #setup: declare queue and subscribe
- channel = self.channel
- channel.queue_declare(queue="test-prefetch-count", exclusive=True)
- subscription = channel.basic_consume(queue="test-prefetch-count", no_ack=False)
- queue = self.client.queue(subscription.consumer_tag)
-
- #set prefetch to 5:
- channel.basic_qos(prefetch_count=5)
-
- #publish 10 messages:
- for i in range(1, 11):
- channel.basic_publish(routing_key="test-prefetch-count", content=Content("Message %d" % i))
-
- #only 5 messages should have been delivered:
- for i in range(1, 6):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
- except Empty: None
-
- #ack messages and check that the next set arrive ok:
- channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
-
- for i in range(6, 11):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
-
- channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
- except Empty: None
-
-
-
- def test_qos_prefetch_size(self):
- """
- Test that the prefetch size specified is honoured
- """
- #setup: declare queue and subscribe
- channel = self.channel
- channel.queue_declare(queue="test-prefetch-size", exclusive=True)
- subscription = channel.basic_consume(queue="test-prefetch-size", no_ack=False)
- queue = self.client.queue(subscription.consumer_tag)
-
- #set prefetch to 50 bytes (each message is 9 or 10 bytes):
- channel.basic_qos(prefetch_size=50)
-
- #publish 10 messages:
- for i in range(1, 11):
- channel.basic_publish(routing_key="test-prefetch-size", content=Content("Message %d" % i))
-
- #only 5 messages should have been delivered (i.e. 45 bytes worth):
- for i in range(1, 6):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
- except Empty: None
-
- #ack messages and check that the next set arrive ok:
- channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
-
- for i in range(6, 11):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
-
- channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
- except Empty: None
-
- #make sure that a single oversized message still gets delivered
- large = "abcdefghijklmnopqrstuvwxyz"
- large = large + "-" + large;
- channel.basic_publish(routing_key="test-prefetch-size", content=Content(large))
- msg = queue.get(timeout=1)
- self.assertEqual(large, msg.content.body)
-
- def test_get(self):
- """
- Test basic_get method
- """
- channel = self.channel
- channel.queue_declare(queue="test-get", exclusive=True)
-
- #publish some messages (no_ack=True)
- for i in range(1, 11):
- channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
-
- #use basic_get to read back the messages, and check that we get an empty at the end
- for i in range(1, 11):
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get_ok")
- self.assertEqual("Message %d" % i, reply.content.body)
-
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get_empty")
-
- #repeat for no_ack=False
- for i in range(11, 21):
- channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
-
- for i in range(11, 21):
- reply = channel.basic_get(no_ack=False)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get_ok")
- self.assertEqual("Message %d" % i, reply.content.body)
- if(i == 13):
- channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True)
- if(i in [15, 17, 19]):
- channel.basic_ack(delivery_tag=reply.delivery_tag)
-
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get_empty")
-
- #recover(requeue=True)
- channel.basic_recover(requeue=True)
-
- #get the unacked messages again (14, 16, 18, 20)
- for i in [14, 16, 18, 20]:
- reply = channel.basic_get(no_ack=False)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get_ok")
- self.assertEqual("Message %d" % i, reply.content.body)
- channel.basic_ack(delivery_tag=reply.delivery_tag)
-
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get_empty")
-
- channel.basic_recover(requeue=True)
-
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get_empty")
diff --git a/qpid/python/tests_0-9/broker.py b/qpid/python/tests_0-9/broker.py
deleted file mode 100644
index 03b4132d3e..0000000000
--- a/qpid/python/tests_0-9/broker.py
+++ /dev/null
@@ -1,133 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class BrokerTests(TestBase):
- """Tests for basic Broker functionality"""
-
- def test_ack_and_no_ack(self):
- """
- First, this test tries to receive a message with a no-ack
- consumer. Second, this test tries to explicitly receive and
- acknowledge a message with an acknowledging consumer.
- """
- ch = self.channel
- self.queue_declare(ch, queue = "myqueue")
-
- # No ack consumer
- ctag = "tag1"
- ch.message_consume(queue = "myqueue", destination = ctag, no_ack = True)
- body = "test no-ack"
- ch.message_transfer(routing_key = "myqueue", body = body)
- msg = self.client.queue(ctag).get(timeout = 5)
- self.assert_(msg.body == body)
-
- # Acknowledging consumer
- self.queue_declare(ch, queue = "otherqueue")
- ctag = "tag2"
- ch.message_consume(queue = "otherqueue", destination = ctag, no_ack = False)
- body = "test ack"
- ch.message_transfer(routing_key = "otherqueue", body = body)
- msg = self.client.queue(ctag).get(timeout = 5)
- msg.ok()
- self.assert_(msg.body == body)
-
- def test_simple_delivery_immediate(self):
- """
- Test simple message delivery where consume is issued before publish
- """
- channel = self.channel
- self.exchange_declare(channel, exchange="test-exchange", type="direct")
- self.queue_declare(channel, queue="test-queue")
- channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
- consumer_tag = "tag1"
- channel.message_consume(queue="test-queue", destination=consumer_tag, no_ack=True)
- queue = self.client.queue(consumer_tag)
-
- body = "Immediate Delivery"
- channel.message_transfer(destination="test-exchange", routing_key="key", body=body, immediate=True)
- msg = queue.get(timeout=5)
- self.assert_(msg.body == body)
-
- # TODO: Ensure we fail if immediate=True and there's no consumer.
-
-
- def test_simple_delivery_queued(self):
- """
- Test basic message delivery where publish is issued before consume
- (i.e. requires queueing of the message)
- """
- channel = self.channel
- self.exchange_declare(channel, exchange="test-exchange", type="direct")
- self.queue_declare(channel, queue="test-queue")
- channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
- body = "Queued Delivery"
- channel.message_transfer(destination="test-exchange", routing_key="key", body=body)
-
- consumer_tag = "tag1"
- channel.message_consume(queue="test-queue", destination=consumer_tag, no_ack=True)
- queue = self.client.queue(consumer_tag)
- msg = queue.get(timeout=5)
- self.assert_(msg.body == body)
-
- def test_invalid_channel(self):
- channel = self.client.channel(200)
- try:
- channel.queue_declare(exclusive=True)
- self.fail("Expected error on queue_declare for invalid channel")
- except Closed, e:
- self.assertConnectionException(504, e.args[0])
-
- def test_closed_channel(self):
- channel = self.client.channel(200)
- channel.channel_open()
- channel.channel_close()
- try:
- channel.queue_declare(exclusive=True)
- self.fail("Expected error on queue_declare for closed channel")
- except Closed, e:
- if isinstance(e.args[0], str): self.fail(e)
- self.assertConnectionException(504, e.args[0])
-
- def test_ping_pong(self):
- channel = self.channel
- reply = channel.channel_ping()
- self.assertEqual(reply.method.klass.name, "channel")
- self.assertEqual(reply.method.name, "ok")
- #todo: provide a way to get notified of incoming pongs...
-
- def test_channel_flow(self):
- channel = self.channel
- channel.queue_declare(queue="flow_test_queue", exclusive=True)
- channel.message_consume(destination="my-tag", queue="flow_test_queue")
- incoming = self.client.queue("my-tag")
-
- channel.channel_flow(active=False)
- channel.message_transfer(routing_key="flow_test_queue", body="abcdefghijklmnopqrstuvwxyz")
- try:
- incoming.get(timeout=1)
- self.fail("Received message when flow turned off.")
- except Empty: None
-
- channel.channel_flow(active=True)
- msg = incoming.get(timeout=1)
- self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.body)
diff --git a/qpid/python/tests_0-9/dtx.py b/qpid/python/tests_0-9/dtx.py
deleted file mode 100644
index bc268f4129..0000000000
--- a/qpid/python/tests_0-9/dtx.py
+++ /dev/null
@@ -1,587 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-from struct import pack, unpack
-from time import sleep
-
-class DtxTests(TestBase):
- """
- Tests for the amqp dtx related classes.
-
- Tests of the form test_simple_xxx test the basic transactional
- behaviour. The approach here is to 'swap' a message from one queue
- to another by consuming and re-publishing in the same
- transaction. That transaction is then completed in different ways
- and the appropriate result verified.
-
- The other tests enforce more specific rules and behaviour on a
- per-method or per-field basis.
- """
-
- XA_RBROLLBACK = 1
- XA_RBTIMEOUT = 2
- XA_OK = 8
-
- def test_simple_commit(self):
- """
- Test basic one-phase commit behaviour.
- """
- channel = self.channel
- tx = self.xid("my-xid")
- self.txswap(tx, "commit")
-
- #neither queue should have any messages accessible
- self.assertMessageCount(0, "queue-a")
- self.assertMessageCount(0, "queue-b")
-
- #commit
- self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).flags)
-
- #check result
- self.assertMessageCount(0, "queue-a")
- self.assertMessageCount(1, "queue-b")
- self.assertMessageId("commit", "queue-b")
-
- def test_simple_prepare_commit(self):
- """
- Test basic two-phase commit behaviour.
- """
- channel = self.channel
- tx = self.xid("my-xid")
- self.txswap(tx, "prepare-commit")
-
- #prepare
- self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).flags)
-
- #neither queue should have any messages accessible
- self.assertMessageCount(0, "queue-a")
- self.assertMessageCount(0, "queue-b")
-
- #commit
- self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).flags)
-
- #check result
- self.assertMessageCount(0, "queue-a")
- self.assertMessageCount(1, "queue-b")
- self.assertMessageId("prepare-commit", "queue-b")
-
-
- def test_simple_rollback(self):
- """
- Test basic rollback behaviour.
- """
- channel = self.channel
- tx = self.xid("my-xid")
- self.txswap(tx, "rollback")
-
- #neither queue should have any messages accessible
- self.assertMessageCount(0, "queue-a")
- self.assertMessageCount(0, "queue-b")
-
- #rollback
- self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags)
-
- #check result
- self.assertMessageCount(1, "queue-a")
- self.assertMessageCount(0, "queue-b")
- self.assertMessageId("rollback", "queue-a")
-
- def test_simple_prepare_rollback(self):
- """
- Test basic rollback behaviour after the transaction has been prepared.
- """
- channel = self.channel
- tx = self.xid("my-xid")
- self.txswap(tx, "prepare-rollback")
-
- #prepare
- self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).flags)
-
- #neither queue should have any messages accessible
- self.assertMessageCount(0, "queue-a")
- self.assertMessageCount(0, "queue-b")
-
- #rollback
- self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags)
-
- #check result
- self.assertMessageCount(1, "queue-a")
- self.assertMessageCount(0, "queue-b")
- self.assertMessageId("prepare-rollback", "queue-a")
-
- def test_select_required(self):
- """
- check that an error is flagged if select is not issued before
- start or end
- """
- channel = self.channel
- tx = self.xid("dummy")
- try:
- channel.dtx_demarcation_start(xid=tx)
-
- #if we get here we have failed, but need to do some cleanup:
- channel.dtx_demarcation_end(xid=tx)
- channel.dtx_coordination_rollback(xid=tx)
- self.fail("Channel not selected for use with dtx, expected exception!")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def test_start_already_known(self):
- """
- Verify that an attempt to start an association with a
- transaction that is already known is not allowed (unless the
- join flag is set).
- """
- #create two channels on different connection & select them for use with dtx:
- channel1 = self.channel
- channel1.dtx_demarcation_select()
-
- other = self.connect()
- channel2 = other.channel(1)
- channel2.channel_open()
- channel2.dtx_demarcation_select()
-
- #create a xid
- tx = self.xid("dummy")
- #start work on one channel under that xid:
- channel1.dtx_demarcation_start(xid=tx)
- #then start on the other without the join set
- failed = False
- try:
- channel2.dtx_demarcation_start(xid=tx)
- except Closed, e:
- failed = True
- error = e
-
- #cleanup:
- if not failed:
- channel2.dtx_demarcation_end(xid=tx)
- other.close()
- channel1.dtx_demarcation_end(xid=tx)
- channel1.dtx_coordination_rollback(xid=tx)
-
- #verification:
- if failed: self.assertConnectionException(503, e.args[0])
- else: self.fail("Xid already known, expected exception!")
-
- def test_forget_xid_on_completion(self):
- """
- Verify that a xid is 'forgotten' - and can therefore be used
- again - once it is completed.
- """
- channel = self.channel
- #do some transactional work & complete the transaction
- self.test_simple_commit()
-
- #start association for the same xid as the previously completed txn
- tx = self.xid("my-xid")
- channel.dtx_demarcation_start(xid=tx)
- channel.dtx_demarcation_end(xid=tx)
- channel.dtx_coordination_rollback(xid=tx)
-
- def test_start_join_and_resume(self):
- """
- Ensure the correct error is signalled when both the join and
- resume flags are set on starting an association between a
- channel and a transcation.
- """
- channel = self.channel
- channel.dtx_demarcation_select()
- tx = self.xid("dummy")
- try:
- channel.dtx_demarcation_start(xid=tx, join=True, resume=True)
- #failed, but need some cleanup:
- channel.dtx_demarcation_end(xid=tx)
- channel.dtx_coordination_rollback(xid=tx)
- self.fail("Join and resume both set, expected exception!")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def test_start_join(self):
- """
- Verify 'join' behaviour, where a channel is associated with a
- transaction that is already associated with another channel.
- """
- #create two channels & select them for use with dtx:
- channel1 = self.channel
- channel1.dtx_demarcation_select()
-
- channel2 = self.client.channel(2)
- channel2.channel_open()
- channel2.dtx_demarcation_select()
-
- #setup
- channel1.queue_declare(queue="one", exclusive=True)
- channel1.queue_declare(queue="two", exclusive=True)
- channel1.message_transfer(routing_key="one", message_id="a", body="DtxMessage")
- channel1.message_transfer(routing_key="two", message_id="b", body="DtxMessage")
-
- #create a xid
- tx = self.xid("dummy")
- #start work on one channel under that xid:
- channel1.dtx_demarcation_start(xid=tx)
- #then start on the other with the join flag set
- channel2.dtx_demarcation_start(xid=tx, join=True)
-
- #do work through each channel
- self.swap(channel1, "one", "two")#swap 'a' from 'one' to 'two'
- self.swap(channel2, "two", "one")#swap 'b' from 'two' to 'one'
-
- #mark end on both channels
- channel1.dtx_demarcation_end(xid=tx)
- channel2.dtx_demarcation_end(xid=tx)
-
- #commit and check
- channel1.dtx_coordination_commit(xid=tx, one_phase=True)
- self.assertMessageCount(1, "one")
- self.assertMessageCount(1, "two")
- self.assertMessageId("a", "two")
- self.assertMessageId("b", "one")
-
-
- def test_suspend_resume(self):
- """
- Test suspension and resumption of an association
- """
- channel = self.channel
- channel.dtx_demarcation_select()
-
- #setup
- channel.queue_declare(queue="one", exclusive=True)
- channel.queue_declare(queue="two", exclusive=True)
- channel.message_transfer(routing_key="one", message_id="a", body="DtxMessage")
- channel.message_transfer(routing_key="two", message_id="b", body="DtxMessage")
-
- tx = self.xid("dummy")
-
- channel.dtx_demarcation_start(xid=tx)
- self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two'
- channel.dtx_demarcation_end(xid=tx, suspend=True)
-
- channel.dtx_demarcation_start(xid=tx, resume=True)
- self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one'
- channel.dtx_demarcation_end(xid=tx)
-
- #commit and check
- channel.dtx_coordination_commit(xid=tx, one_phase=True)
- self.assertMessageCount(1, "one")
- self.assertMessageCount(1, "two")
- self.assertMessageId("a", "two")
- self.assertMessageId("b", "one")
-
- def test_end_suspend_and_fail(self):
- """
- Verify that the correct error is signalled if the suspend and
- fail flag are both set when disassociating a transaction from
- the channel
- """
- channel = self.channel
- channel.dtx_demarcation_select()
- tx = self.xid("suspend_and_fail")
- channel.dtx_demarcation_start(xid=tx)
- try:
- channel.dtx_demarcation_end(xid=tx, suspend=True, fail=True)
- self.fail("Suspend and fail both set, expected exception!")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- #cleanup
- other = self.connect()
- channel = other.channel(1)
- channel.channel_open()
- channel.dtx_coordination_rollback(xid=tx)
- channel.channel_close()
- other.close()
-
-
- def test_end_unknown_xid(self):
- """
- Verifies that the correct exception is thrown when an attempt
- is made to end the association for a xid not previously
- associated with the channel
- """
- channel = self.channel
- channel.dtx_demarcation_select()
- tx = self.xid("unknown-xid")
- try:
- channel.dtx_demarcation_end(xid=tx)
- self.fail("Attempted to end association with unknown xid, expected exception!")
- except Closed, e:
- #FYI: this is currently *not* the exception specified, but I think the spec is wrong! Confirming...
- self.assertConnectionException(503, e.args[0])
-
- def test_end(self):
- """
- Verify that the association is terminated by end and subsequent
- operations are non-transactional
- """
- channel = self.client.channel(2)
- channel.channel_open()
- channel.queue_declare(queue="tx-queue", exclusive=True)
-
- #publish a message under a transaction
- channel.dtx_demarcation_select()
- tx = self.xid("dummy")
- channel.dtx_demarcation_start(xid=tx)
- channel.message_transfer(routing_key="tx-queue", message_id="one", body="DtxMessage")
- channel.dtx_demarcation_end(xid=tx)
-
- #now that association with txn is ended, publish another message
- channel.message_transfer(routing_key="tx-queue", message_id="two", body="DtxMessage")
-
- #check the second message is available, but not the first
- self.assertMessageCount(1, "tx-queue")
- channel.message_consume(queue="tx-queue", destination="results", no_ack=False)
- msg = self.client.queue("results").get(timeout=1)
- self.assertEqual("two", msg.message_id)
- channel.message_cancel(destination="results")
- #ack the message then close the channel
- msg.ok()
- channel.channel_close()
-
- channel = self.channel
- #commit the transaction and check that the first message (and
- #only the first message) is then delivered
- channel.dtx_coordination_commit(xid=tx, one_phase=True)
- self.assertMessageCount(1, "tx-queue")
- self.assertMessageId("one", "tx-queue")
-
- def test_invalid_commit_one_phase_true(self):
- """
- Test that a commit with one_phase = True is rejected if the
- transaction in question has already been prepared.
- """
- other = self.connect()
- tester = other.channel(1)
- tester.channel_open()
- tester.queue_declare(queue="dummy", exclusive=True)
- tester.dtx_demarcation_select()
- tx = self.xid("dummy")
- tester.dtx_demarcation_start(xid=tx)
- tester.message_transfer(routing_key="dummy", body="whatever")
- tester.dtx_demarcation_end(xid=tx)
- tester.dtx_coordination_prepare(xid=tx)
- failed = False
- try:
- tester.dtx_coordination_commit(xid=tx, one_phase=True)
- except Closed, e:
- failed = True
- error = e
-
- if failed:
- self.channel.dtx_coordination_rollback(xid=tx)
- self.assertConnectionException(503, e.args[0])
- else:
- tester.channel_close()
- other.close()
- self.fail("Invalid use of one_phase=True, expected exception!")
-
- def test_invalid_commit_one_phase_false(self):
- """
- Test that a commit with one_phase = False is rejected if the
- transaction in question has not yet been prepared.
- """
- """
- Test that a commit with one_phase = True is rejected if the
- transaction in question has already been prepared.
- """
- other = self.connect()
- tester = other.channel(1)
- tester.channel_open()
- tester.queue_declare(queue="dummy", exclusive=True)
- tester.dtx_demarcation_select()
- tx = self.xid("dummy")
- tester.dtx_demarcation_start(xid=tx)
- tester.message_transfer(routing_key="dummy", body="whatever")
- tester.dtx_demarcation_end(xid=tx)
- failed = False
- try:
- tester.dtx_coordination_commit(xid=tx, one_phase=False)
- except Closed, e:
- failed = True
- error = e
-
- if failed:
- self.channel.dtx_coordination_rollback(xid=tx)
- self.assertConnectionException(503, e.args[0])
- else:
- tester.channel_close()
- other.close()
- self.fail("Invalid use of one_phase=False, expected exception!")
-
- def test_implicit_end(self):
- """
- Test that an association is implicitly ended when the channel
- is closed (whether by exception or explicit client request)
- and the transaction in question is marked as rollback only.
- """
- channel1 = self.channel
- channel2 = self.client.channel(2)
- channel2.channel_open()
-
- #setup:
- channel2.queue_declare(queue="dummy", exclusive=True)
- channel2.message_transfer(routing_key="dummy", body="whatever")
- tx = self.xid("dummy")
-
- channel2.dtx_demarcation_select()
- channel2.dtx_demarcation_start(xid=tx)
- channel2.message_get(queue="dummy", destination="dummy")
- self.client.queue("dummy").get(timeout=1).ok()
- channel2.message_transfer(routing_key="dummy", body="whatever")
- channel2.channel_close()
-
- self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).flags)
- channel1.dtx_coordination_rollback(xid=tx)
-
- def test_get_timeout(self):
- """
- Check that get-timeout returns the correct value, (and that a
- transaction with a timeout can complete normally)
- """
- channel = self.channel
- tx = self.xid("dummy")
-
- channel.dtx_demarcation_select()
- channel.dtx_demarcation_start(xid=tx)
- self.assertEqual(0, channel.dtx_coordination_get_timeout(xid=tx).timeout)
- channel.dtx_coordination_set_timeout(xid=tx, timeout=60)
- self.assertEqual(60, channel.dtx_coordination_get_timeout(xid=tx).timeout)
- self.assertEqual(self.XA_OK, channel.dtx_demarcation_end(xid=tx).flags)
- self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags)
-
- def test_set_timeout(self):
- """
- Test the timeout of a transaction results in the expected
- behaviour
- """
- #open new channel to allow self.channel to be used in checking te queue
- channel = self.client.channel(2)
- channel.channel_open()
- #setup:
- tx = self.xid("dummy")
- channel.queue_declare(queue="queue-a", exclusive=True)
- channel.queue_declare(queue="queue-b", exclusive=True)
- channel.message_transfer(routing_key="queue-a", message_id="timeout", body="DtxMessage")
-
- channel.dtx_demarcation_select()
- channel.dtx_demarcation_start(xid=tx)
- self.swap(channel, "queue-a", "queue-b")
- channel.dtx_coordination_set_timeout(xid=tx, timeout=2)
- sleep(3)
- #check that the work has been rolled back already
- self.assertMessageCount(1, "queue-a")
- self.assertMessageCount(0, "queue-b")
- self.assertMessageId("timeout", "queue-a")
- #check the correct codes are returned when we try to complete the txn
- self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_demarcation_end(xid=tx).flags)
- self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_coordination_rollback(xid=tx).flags)
-
-
-
- def test_recover(self):
- """
- Test basic recover behaviour
- """
- channel = self.channel
-
- channel.dtx_demarcation_select()
- channel.queue_declare(queue="dummy", exclusive=True)
-
- prepared = []
- for i in range(1, 10):
- tx = self.xid("tx%s" % (i))
- channel.dtx_demarcation_start(xid=tx)
- channel.message_transfer(routing_key="dummy", body="message%s" % (i))
- channel.dtx_demarcation_end(xid=tx)
- if i in [2, 5, 6, 8]:
- channel.dtx_coordination_prepare(xid=tx)
- prepared.append(tx)
- else:
- channel.dtx_coordination_rollback(xid=tx)
-
- indoubt = channel.dtx_coordination_recover().xids
- #convert indoubt table to a list of xids (note: this will change for 0-10)
- data = indoubt["xids"]
- xids = []
- pos = 0
- while pos < len(data):
- size = unpack("!B", data[pos])[0]
- start = pos + 1
- end = start + size
- xid = data[start:end]
- xids.append(xid)
- pos = end
-
- #rollback the prepared transactions returned by recover
- for x in xids:
- channel.dtx_coordination_rollback(xid=x)
-
- #validate against the expected list of prepared transactions
- actual = set(xids)
- expected = set(prepared)
- intersection = actual.intersection(expected)
-
- if intersection != expected:
- missing = expected.difference(actual)
- extra = actual.difference(expected)
- for x in missing:
- channel.dtx_coordination_rollback(xid=x)
- self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra))
-
- def xid(self, txid, branchqual = ''):
- return pack('LBB', 0, len(txid), len(branchqual)) + txid + branchqual
-
- def txswap(self, tx, id):
- channel = self.channel
- #declare two queues:
- channel.queue_declare(queue="queue-a", exclusive=True)
- channel.queue_declare(queue="queue-b", exclusive=True)
- #put message with specified id on one queue:
- channel.message_transfer(routing_key="queue-a", message_id=id, body="DtxMessage")
-
- #start the transaction:
- channel.dtx_demarcation_select()
- self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).flags)
-
- #'swap' the message from one queue to the other, under that transaction:
- self.swap(self.channel, "queue-a", "queue-b")
-
- #mark the end of the transactional work:
- self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_end(xid=tx).flags)
-
- def swap(self, channel, src, dest):
- #consume from src:
- channel.message_get(destination="temp-swap", queue=src)
- msg = self.client.queue("temp-swap").get(timeout=1)
- msg.ok();
-
- #re-publish to dest
- channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body)
-
- def assertMessageCount(self, expected, queue):
- self.assertEqual(expected, self.channel.queue_declare(queue=queue, passive=True).message_count)
-
- def assertMessageId(self, expected, queue):
- self.channel.message_consume(queue=queue, destination="results", no_ack=True)
- self.assertEqual(expected, self.client.queue("results").get(timeout=1).message_id)
- self.channel.message_cancel(destination="results")
diff --git a/qpid/python/tests_0-9/example.py b/qpid/python/tests_0-9/example.py
deleted file mode 100644
index 7ab4cc7d0a..0000000000
--- a/qpid/python/tests_0-9/example.py
+++ /dev/null
@@ -1,94 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class ExampleTest (TestBase):
- """
- An example Qpid test, illustrating the unittest frameowkr and the
- python Qpid client. The test class must inherit TestCase. The
- test code uses the Qpid client to interact with a qpid broker and
- verify it behaves as expected.
- """
-
- def test_example(self):
- """
- An example test. Note that test functions must start with 'test_'
- to be recognized by the test framework.
- """
-
- # By inheriting TestBase, self.client is automatically connected
- # and self.channel is automatically opened as channel(1)
- # Other channel methods mimic the protocol.
- channel = self.channel
-
- # Now we can send regular commands. If you want to see what the method
- # arguments mean or what other commands are available, you can use the
- # python builtin help() method. For example:
- #help(chan)
- #help(chan.exchange_declare)
-
- # If you want browse the available protocol methods without being
- # connected to a live server you can use the amqp-doc utility:
- #
- # Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>]
- #
- # Options:
- # -e, --regexp use regex instead of glob when matching
-
- # Now that we know what commands are available we can use them to
- # interact with the server.
-
- # Here we use ordinal arguments.
- self.exchange_declare(channel, 0, "test", "direct")
-
- # Here we use keyword arguments.
- self.queue_declare(channel, queue="test-queue")
- channel.queue_bind(queue="test-queue", exchange="test", routing_key="key")
-
- # Call Channel.basic_consume to register as a consumer.
- # All the protocol methods return a message object. The message object
- # has fields corresponding to the reply method fields, plus a content
- # field that is filled if the reply includes content. In this case the
- # interesting field is the consumer_tag.
- channel.message_consume(queue="test-queue", destination="consumer_tag")
-
- # We can use the Client.queue(...) method to access the queue
- # corresponding to our consumer_tag.
- queue = self.client.queue("consumer_tag")
-
- # Now lets publish a message and see if our consumer gets it. To do
- # this we need to import the Content class.
- body = "Hello World!"
- channel.message_transfer(destination="test",
- routing_key="key",
- body = body)
-
- # Now we'll wait for the message to arrive. We can use the timeout
- # argument in case the server hangs. By default queue.get() will wait
- # until a message arrives or the connection to the server dies.
- msg = queue.get(timeout=10)
-
- # And check that we got the right response with assertEqual
- self.assertEqual(body, msg.body)
-
- # Now acknowledge the message.
- msg.ok()
-
diff --git a/qpid/python/tests_0-9/exchange.py b/qpid/python/tests_0-9/exchange.py
deleted file mode 100644
index 3a47ffff8c..0000000000
--- a/qpid/python/tests_0-9/exchange.py
+++ /dev/null
@@ -1,327 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-Tests for exchange behaviour.
-
-Test classes ending in 'RuleTests' are derived from rules in amqp.xml.
-"""
-
-import Queue, logging
-from qpid.testlib import TestBase
-from qpid.content import Content
-from qpid.client import Closed
-
-
-class StandardExchangeVerifier:
- """Verifies standard exchange behavior.
-
- Used as base class for classes that test standard exchanges."""
-
- def verifyDirectExchange(self, ex):
- """Verify that ex behaves like a direct exchange."""
- self.queue_declare(queue="q")
- self.channel.queue_bind(queue="q", exchange=ex, routing_key="k")
- self.assertPublishConsume(exchange=ex, queue="q", routing_key="k")
- try:
- self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk")
- self.fail("Expected Empty exception")
- except Queue.Empty: None # Expected
-
- def verifyFanOutExchange(self, ex):
- """Verify that ex behaves like a fanout exchange."""
- self.queue_declare(queue="q")
- self.channel.queue_bind(queue="q", exchange=ex)
- self.queue_declare(queue="p")
- self.channel.queue_bind(queue="p", exchange=ex)
- for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex)
-
- def verifyTopicExchange(self, ex):
- """Verify that ex behaves like a topic exchange"""
- self.queue_declare(queue="a")
- self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*")
- q = self.consume("a")
- self.assertPublishGet(q, ex, "a.b.x")
- self.assertPublishGet(q, ex, "a.x.b.x")
- self.assertPublishGet(q, ex, "a.x.x.b.x")
- # Shouldn't match
- self.channel.message_transfer(destination=ex, routing_key="a.b", body="")
- self.channel.message_transfer(destination=ex, routing_key="a.b.x.y", body="")
- self.channel.message_transfer(destination=ex, routing_key="x.a.b.x", body="")
- self.channel.message_transfer(destination=ex, routing_key="a.b", body="")
- self.assert_(q.empty())
-
- def verifyHeadersExchange(self, ex):
- """Verify that ex is a headers exchange"""
- self.queue_declare(queue="q")
- self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} )
- q = self.consume("q")
- headers = {"name":"fred", "age":3}
- self.assertPublishGet(q, exchange=ex, properties=headers)
- self.channel.message_transfer(destination=ex, body="") # No headers, won't deliver
- self.assertEmpty(q);
-
-
-class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
- """
- The server SHOULD implement these standard exchange types: topic, headers.
-
- Client attempts to declare an exchange with each of these standard types.
- """
-
- def testDirect(self):
- """Declare and test a direct exchange"""
- self.exchange_declare(0, exchange="d", type="direct")
- self.verifyDirectExchange("d")
-
- def testFanout(self):
- """Declare and test a fanout exchange"""
- self.exchange_declare(0, exchange="f", type="fanout")
- self.verifyFanOutExchange("f")
-
- def testTopic(self):
- """Declare and test a topic exchange"""
- self.exchange_declare(0, exchange="t", type="topic")
- self.verifyTopicExchange("t")
-
- def testHeaders(self):
- """Declare and test a headers exchange"""
- self.exchange_declare(0, exchange="h", type="headers")
- self.verifyHeadersExchange("h")
-
-
-class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
- """
- The server MUST, in each virtual host, pre-declare an exchange instance
- for each standard exchange type that it implements, where the name of the
- exchange instance is amq. followed by the exchange type name.
-
- Client creates a temporary queue and attempts to bind to each required
- exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if
- those types are defined).
- """
- def testAmqDirect(self): self.verifyDirectExchange("amq.direct")
-
- def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout")
-
- def testAmqTopic(self): self.verifyTopicExchange("amq.topic")
-
- def testAmqMatch(self): self.verifyHeadersExchange("amq.match")
-
-class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier):
- """
- The server MUST predeclare a direct exchange to act as the default exchange
- for content Publish methods and for default queue bindings.
-
- Client checks that the default exchange is active by specifying a queue
- binding with no exchange name, and publishing a message with a suitable
- routing key but without specifying the exchange name, then ensuring that
- the message arrives in the queue correctly.
- """
- def testDefaultExchange(self):
- # Test automatic binding by queue name.
- self.queue_declare(queue="d")
- self.assertPublishConsume(queue="d", routing_key="d")
- # Test explicit bind to default queue
- self.verifyDirectExchange("")
-
-
-# TODO aconway 2006-09-27: Fill in empty tests:
-
-class DefaultAccessRuleTests(TestBase):
- """
- The server MUST NOT allow clients to access the default exchange except
- by specifying an empty exchange name in the Queue.Bind and content Publish
- methods.
- """
-
-class ExtensionsRuleTests(TestBase):
- """
- The server MAY implement other exchange types as wanted.
- """
-
-
-class DeclareMethodMinimumRuleTests(TestBase):
- """
- The server SHOULD support a minimum of 16 exchanges per virtual host and
- ideally, impose no limit except as defined by available resources.
-
- The client creates as many exchanges as it can until the server reports
- an error; the number of exchanges successfuly created must be at least
- sixteen.
- """
-
-
-class DeclareMethodTicketFieldValidityRuleTests(TestBase):
- """
- The client MUST provide a valid access ticket giving "active" access to
- the realm in which the exchange exists or will be created, or "passive"
- access if the if-exists flag is set.
-
- Client creates access ticket with wrong access rights and attempts to use
- in this method.
- """
-
-
-class DeclareMethodExchangeFieldReservedRuleTests(TestBase):
- """
- Exchange names starting with "amq." are reserved for predeclared and
- standardised exchanges. The client MUST NOT attempt to create an exchange
- starting with "amq.".
-
-
- """
-
-
-class DeclareMethodTypeFieldTypedRuleTests(TestBase):
- """
- Exchanges cannot be redeclared with different types. The client MUST not
- attempt to redeclare an existing exchange with a different type than used
- in the original Exchange.Declare method.
-
-
- """
-
-
-class DeclareMethodTypeFieldSupportRuleTests(TestBase):
- """
- The client MUST NOT attempt to create an exchange with a type that the
- server does not support.
-
-
- """
-
-
-class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase):
- """
- If set, and the exchange does not already exist, the server MUST raise a
- channel exception with reply code 404 (not found).
- """
- def test(self):
- try:
- self.channel.exchange_declare(exchange="humpty_dumpty", passive=True)
- self.fail("Expected 404 for passive declaration of unknown exchange.")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
-
-class DeclareMethodDurableFieldSupportRuleTests(TestBase):
- """
- The server MUST support both durable and transient exchanges.
-
-
- """
-
-
-class DeclareMethodDurableFieldStickyRuleTests(TestBase):
- """
- The server MUST ignore the durable field if the exchange already exists.
-
-
- """
-
-
-class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase):
- """
- The server MUST ignore the auto-delete field if the exchange already
- exists.
-
-
- """
-
-
-class DeleteMethodTicketFieldValidityRuleTests(TestBase):
- """
- The client MUST provide a valid access ticket giving "active" access
- rights to the exchange's access realm.
-
- Client creates access ticket with wrong access rights and attempts to use
- in this method.
- """
-
-
-class DeleteMethodExchangeFieldExistsRuleTests(TestBase):
- """
- The client MUST NOT attempt to delete an exchange that does not exist.
- """
-
-
-class HeadersExchangeTests(TestBase):
- """
- Tests for headers exchange functionality.
- """
- def setUp(self):
- TestBase.setUp(self)
- self.queue_declare(queue="q")
- self.q = self.consume("q")
-
- def myAssertPublishGet(self, headers):
- self.assertPublishGet(self.q, exchange="amq.match", properties=headers)
-
- def myBasicPublish(self, headers):
- self.channel.message_transfer(destination="amq.match", body="foobar", application_headers=headers)
-
- def testMatchAll(self):
- self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3})
- self.myAssertPublishGet({"name":"fred", "age":3})
- self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"})
-
- # None of these should match
- self.myBasicPublish({})
- self.myBasicPublish({"name":"barney"})
- self.myBasicPublish({"name":10})
- self.myBasicPublish({"name":"fred", "age":2})
- self.assertEmpty(self.q)
-
- def testMatchAny(self):
- self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3})
- self.myAssertPublishGet({"name":"fred"})
- self.myAssertPublishGet({"name":"fred", "ignoreme":10})
- self.myAssertPublishGet({"ignoreme":10, "age":3})
-
- # Wont match
- self.myBasicPublish({})
- self.myBasicPublish({"irrelevant":0})
- self.assertEmpty(self.q)
-
-
-class MiscellaneousErrorsTests(TestBase):
- """
- Test some miscellaneous error conditions
- """
- def testTypeNotKnown(self):
- try:
- self.channel.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type")
- self.fail("Expected 503 for declaration of unknown exchange type.")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def testDifferentDeclaredType(self):
- self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="direct")
- try:
- self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic")
- self.fail("Expected 530 for redeclaration of exchange with different type.")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
- #cleanup
- other = self.connect()
- c2 = other.channel(1)
- c2.channel_open()
- c2.exchange_delete(exchange="test_different_declared_type_exchange")
-
diff --git a/qpid/python/tests_0-9/execution.py b/qpid/python/tests_0-9/execution.py
deleted file mode 100644
index f2facfe42b..0000000000
--- a/qpid/python/tests_0-9/execution.py
+++ /dev/null
@@ -1,29 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class ExecutionTests (TestBase):
- def test_flush(self):
- channel = self.channel
- for i in [1, 2, 3]:
- channel.basic_publish()
- channel.execution_flush()
- assert(channel.completion.wait(channel.completion.command_id, timeout=1))
diff --git a/qpid/python/tests_0-9/message.py b/qpid/python/tests_0-9/message.py
deleted file mode 100644
index b25016e680..0000000000
--- a/qpid/python/tests_0-9/message.py
+++ /dev/null
@@ -1,657 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-from qpid.reference import Reference, ReferenceId
-
-class MessageTests(TestBase):
- """Tests for 'methods' on the amqp message 'class'"""
-
- def test_consume_no_local(self):
- """
- Test that the no_local flag is honoured in the consume method
- """
- channel = self.channel
- #setup, declare two queues:
- channel.queue_declare(queue="test-queue-1a", exclusive=True)
- channel.queue_declare(queue="test-queue-1b", exclusive=True)
- #establish two consumers one of which excludes delivery of locally sent messages
- channel.message_consume(destination="local_included", queue="test-queue-1a")
- channel.message_consume(destination="local_excluded", queue="test-queue-1b", no_local=True)
-
- #send a message
- channel.message_transfer(routing_key="test-queue-1a", body="consume_no_local")
- channel.message_transfer(routing_key="test-queue-1b", body="consume_no_local")
-
- #check the queues of the two consumers
- excluded = self.client.queue("local_excluded")
- included = self.client.queue("local_included")
- msg = included.get(timeout=1)
- self.assertEqual("consume_no_local", msg.body)
- try:
- excluded.get(timeout=1)
- self.fail("Received locally published message though no_local=true")
- except Empty: None
-
-
- def test_consume_exclusive(self):
- """
- Test that the exclusive flag is honoured in the consume method
- """
- channel = self.channel
- #setup, declare a queue:
- channel.queue_declare(queue="test-queue-2", exclusive=True)
-
- #check that an exclusive consumer prevents other consumer being created:
- channel.message_consume(destination="first", queue="test-queue-2", exclusive=True)
- try:
- channel.message_consume(destination="second", queue="test-queue-2")
- self.fail("Expected consume request to fail due to previous exclusive consumer")
- except Closed, e:
- self.assertChannelException(403, e.args[0])
-
- #open new channel and cleanup last consumer:
- channel = self.client.channel(2)
- channel.channel_open()
-
- #check that an exclusive consumer cannot be created if a consumer already exists:
- channel.message_consume(destination="first", queue="test-queue-2")
- try:
- channel.message_consume(destination="second", queue="test-queue-2", exclusive=True)
- self.fail("Expected exclusive consume request to fail due to previous consumer")
- except Closed, e:
- self.assertChannelException(403, e.args[0])
-
- def test_consume_queue_errors(self):
- """
- Test error conditions associated with the queue field of the consume method:
- """
- channel = self.channel
- try:
- #queue specified but doesn't exist:
- channel.message_consume(queue="invalid-queue")
- self.fail("Expected failure when consuming from non-existent queue")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- channel = self.client.channel(2)
- channel.channel_open()
- try:
- #queue not specified and none previously declared for channel:
- channel.message_consume(queue="")
- self.fail("Expected failure when consuming from unspecified queue")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
-
- def test_consume_unique_consumers(self):
- """
- Ensure unique consumer tags are enforced
- """
- channel = self.channel
- #setup, declare a queue:
- channel.queue_declare(queue="test-queue-3", exclusive=True)
-
- #check that attempts to use duplicate tags are detected and prevented:
- channel.message_consume(destination="first", queue="test-queue-3")
- try:
- channel.message_consume(destination="first", queue="test-queue-3")
- self.fail("Expected consume request to fail due to non-unique tag")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
-
- def test_cancel(self):
- """
- Test compliance of the basic.cancel method
- """
- channel = self.channel
- #setup, declare a queue:
- channel.queue_declare(queue="test-queue-4", exclusive=True)
- channel.message_consume(destination="my-consumer", queue="test-queue-4")
- channel.message_transfer(routing_key="test-queue-4", body="One")
-
- #cancel should stop messages being delivered
- channel.message_cancel(destination="my-consumer")
- channel.message_transfer(routing_key="test-queue-4", body="Two")
- myqueue = self.client.queue("my-consumer")
- msg = myqueue.get(timeout=1)
- self.assertEqual("One", msg.body)
- try:
- msg = myqueue.get(timeout=1)
- self.fail("Got message after cancellation: " + msg)
- except Empty: None
-
- #cancellation of non-existant consumers should be handled without error
- channel.message_cancel(destination="my-consumer")
- channel.message_cancel(destination="this-never-existed")
-
-
- def test_ack(self):
- """
- Test basic ack/recover behaviour
- """
- channel = self.channel
- channel.queue_declare(queue="test-ack-queue", exclusive=True)
-
- channel.message_consume(queue="test-ack-queue", destination="consumer_tag", no_ack=False)
- queue = self.client.queue("consumer_tag")
-
- channel.message_transfer(routing_key="test-ack-queue", body="One")
- channel.message_transfer(routing_key="test-ack-queue", body="Two")
- channel.message_transfer(routing_key="test-ack-queue", body="Three")
- channel.message_transfer(routing_key="test-ack-queue", body="Four")
- channel.message_transfer(routing_key="test-ack-queue", body="Five")
-
- msg1 = queue.get(timeout=1)
- msg2 = queue.get(timeout=1)
- msg3 = queue.get(timeout=1)
- msg4 = queue.get(timeout=1)
- msg5 = queue.get(timeout=1)
-
- self.assertEqual("One", msg1.body)
- self.assertEqual("Two", msg2.body)
- self.assertEqual("Three", msg3.body)
- self.assertEqual("Four", msg4.body)
- self.assertEqual("Five", msg5.body)
-
- msg1.ok(batchoffset=1)#One and Two
- msg4.ok()
-
- channel.message_recover(requeue=False)
-
- msg3b = queue.get(timeout=1)
- msg5b = queue.get(timeout=1)
-
- self.assertEqual("Three", msg3b.body)
- self.assertEqual("Five", msg5b.body)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected message: " + extra.body)
- except Empty: None
-
- def test_recover_requeue(self):
- """
- Test requeing on recovery
- """
- channel = self.channel
- channel.queue_declare(queue="test-requeue", exclusive=True)
-
- channel.message_consume(queue="test-requeue", destination="consumer_tag", no_ack=False)
- queue = self.client.queue("consumer_tag")
-
- channel.message_transfer(routing_key="test-requeue", body="One")
- channel.message_transfer(routing_key="test-requeue", body="Two")
- channel.message_transfer(routing_key="test-requeue", body="Three")
- channel.message_transfer(routing_key="test-requeue", body="Four")
- channel.message_transfer(routing_key="test-requeue", body="Five")
-
- msg1 = queue.get(timeout=1)
- msg2 = queue.get(timeout=1)
- msg3 = queue.get(timeout=1)
- msg4 = queue.get(timeout=1)
- msg5 = queue.get(timeout=1)
-
- self.assertEqual("One", msg1.body)
- self.assertEqual("Two", msg2.body)
- self.assertEqual("Three", msg3.body)
- self.assertEqual("Four", msg4.body)
- self.assertEqual("Five", msg5.body)
-
- msg1.ok(batchoffset=1) #One and Two
- msg4.ok() #Four
-
- channel.message_cancel(destination="consumer_tag")
-
- #publish a new message
- channel.message_transfer(routing_key="test-requeue", body="Six")
- #requeue unacked messages (Three and Five)
- channel.message_recover(requeue=True)
-
- channel.message_consume(queue="test-requeue", destination="consumer_tag")
- queue2 = self.client.queue("consumer_tag")
-
- msg3b = queue2.get(timeout=1)
- msg5b = queue2.get(timeout=1)
-
- self.assertEqual("Three", msg3b.body)
- self.assertEqual("Five", msg5b.body)
-
- self.assertEqual(True, msg3b.redelivered)
- self.assertEqual(True, msg5b.redelivered)
-
- self.assertEqual("Six", queue2.get(timeout=1).body)
-
- try:
- extra = queue2.get(timeout=1)
- self.fail("Got unexpected message in second queue: " + extra.body)
- except Empty: None
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected message in original queue: " + extra.body)
- except Empty: None
-
-
- def test_qos_prefetch_count(self):
- """
- Test that the prefetch count specified is honoured
- """
- #setup: declare queue and subscribe
- channel = self.channel
- channel.queue_declare(queue="test-prefetch-count", exclusive=True)
- subscription = channel.message_consume(queue="test-prefetch-count", destination="consumer_tag", no_ack=False)
- queue = self.client.queue("consumer_tag")
-
- #set prefetch to 5:
- channel.message_qos(prefetch_count=5)
-
- #publish 10 messages:
- for i in range(1, 11):
- channel.message_transfer(routing_key="test-prefetch-count", body="Message %d" % i)
-
- #only 5 messages should have been delivered:
- for i in range(1, 6):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 6th message in original queue: " + extra.body)
- except Empty: None
-
- #ack messages and check that the next set arrive ok:
- #todo: once batching is implmented, send a single response for all messages
- msg.ok(batchoffset=-4)#1-5
-
- for i in range(6, 11):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
-
- msg.ok(batchoffset=-4)#6-10
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 11th message in original queue: " + extra.body)
- except Empty: None
-
-
-
- def test_qos_prefetch_size(self):
- """
- Test that the prefetch size specified is honoured
- """
- #setup: declare queue and subscribe
- channel = self.channel
- channel.queue_declare(queue="test-prefetch-size", exclusive=True)
- subscription = channel.message_consume(queue="test-prefetch-size", destination="consumer_tag", no_ack=False)
- queue = self.client.queue("consumer_tag")
-
- #set prefetch to 50 bytes (each message is 9 or 10 bytes):
- channel.message_qos(prefetch_size=50)
-
- #publish 10 messages:
- for i in range(1, 11):
- channel.message_transfer(routing_key="test-prefetch-size", body="Message %d" % i)
-
- #only 5 messages should have been delivered (i.e. 45 bytes worth):
- for i in range(1, 6):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 6th message in original queue: " + extra.body)
- except Empty: None
-
- #ack messages and check that the next set arrive ok:
- msg.ok(batchoffset=-4)#1-5
-
- for i in range(6, 11):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
-
- msg.ok(batchoffset=-4)#6-10
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 11th message in original queue: " + extra.body)
- except Empty: None
-
- #make sure that a single oversized message still gets delivered
- large = "abcdefghijklmnopqrstuvwxyz"
- large = large + "-" + large;
- channel.message_transfer(routing_key="test-prefetch-size", body=large)
- msg = queue.get(timeout=1)
- self.assertEqual(large, msg.body)
-
- def test_get(self):
- """
- Test message_get method
- """
- channel = self.channel
- channel.queue_declare(queue="test-get", exclusive=True)
-
- #publish some messages (no_ack=True)
- for i in range(1, 11):
- channel.message_transfer(routing_key="test-get", body="Message %d" % i)
-
- #use message_get to read back the messages, and check that we get an empty at the end
- for i in range(1, 11):
- tag = "queue %d" % i
- reply = channel.message_get(no_ack=True, queue="test-get", destination=tag)
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "ok")
- self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body)
-
- reply = channel.message_get(no_ack=True, queue="test-get")
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "empty")
-
- #repeat for no_ack=False
- for i in range(11, 21):
- channel.message_transfer(routing_key="test-get", body="Message %d" % i)
-
- for i in range(11, 21):
- tag = "queue %d" % i
- reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "ok")
- msg = self.client.queue(tag).get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
-
- if (i==13):
- msg.ok(batchoffset=-2)#11, 12 & 13
- if(i in [15, 17, 19]):
- msg.ok()
-
- reply = channel.message_get(no_ack=True, queue="test-get")
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "empty")
-
- #recover(requeue=True)
- channel.message_recover(requeue=True)
-
- #get the unacked messages again (14, 16, 18, 20)
- for i in [14, 16, 18, 20]:
- tag = "queue %d" % i
- reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "ok")
- msg = self.client.queue(tag).get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
- msg.ok()
- #channel.message_ack(delivery_tag=reply.delivery_tag)
-
- reply = channel.message_get(no_ack=True, queue="test-get")
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "empty")
-
- channel.message_recover(requeue=True)
-
- reply = channel.message_get(no_ack=True, queue="test-get")
- self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "empty")
-
- def test_reference_simple(self):
- """
- Test basic ability to handle references
- """
- channel = self.channel
- channel.queue_declare(queue="ref_queue", exclusive=True)
- channel.message_consume(queue="ref_queue", destination="c1")
- queue = self.client.queue("c1")
-
- refId = "myref"
- channel.message_open(reference=refId)
- channel.message_append(reference=refId, bytes="abcd")
- channel.synchronous = False
- ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId))
- channel.synchronous = True
-
- channel.message_append(reference=refId, bytes="efgh")
- channel.message_append(reference=refId, bytes="ijkl")
- channel.message_close(reference=refId)
-
- #first, wait for the ok for the transfer
- ack.get_response(timeout=1)
-
- self.assertDataEquals(channel, queue.get(timeout=1), "abcdefghijkl")
-
-
- def test_reference_large(self):
- """
- Test basic ability to handle references whose content exceeds max frame size
- """
- channel = self.channel
- self.queue_declare(queue="ref_queue")
-
- #generate a big data string (> max frame size of consumer):
- data = "0123456789"
- for i in range(0, 10):
- data += data
- #send it inline
- channel.synchronous = False
- ack = channel.message_transfer(routing_key="ref_queue", body=data)
- channel.synchronous = True
- #first, wait for the ok for the transfer
- ack.get_response(timeout=1)
-
- #create a new connection for consumer, with specific max frame size (< data)
- other = self.connect(tune_params={"channel_max":10, "frame_max":5120, "heartbeat":0})
- ch2 = other.channel(1)
- ch2.channel_open()
- ch2.message_consume(queue="ref_queue", destination="c1")
- queue = other.queue("c1")
-
- msg = queue.get(timeout=1)
- self.assertTrue(isinstance(msg.body, ReferenceId))
- self.assertTrue(msg.reference)
- self.assertEquals(data, msg.reference.get_complete())
-
- def test_reference_completion(self):
- """
- Test that reference transfer are not deemed complete until
- closed (therefore are not acked or routed until that point)
- """
- channel = self.channel
- channel.queue_declare(queue="ref_queue", exclusive=True)
- channel.message_consume(queue="ref_queue", destination="c1")
- queue = self.client.queue("c1")
-
- refId = "myref"
- channel.message_open(reference=refId)
- channel.message_append(reference=refId, bytes="abcd")
- channel.synchronous = False
- ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId))
- channel.synchronous = True
-
- try:
- msg = queue.get(timeout=1)
- self.fail("Got unexpected message on queue: " + msg)
- except Empty: None
-
- self.assertTrue(not ack.is_complete())
-
- channel.message_close(reference=refId)
-
- #first, wait for the ok for the transfer
- ack.get_response(timeout=1)
-
- self.assertDataEquals(channel, queue.get(timeout=1), "abcd")
-
- def test_reference_multi_transfer(self):
- """
- Test that multiple transfer requests for the same reference are
- correctly handled.
- """
- channel = self.channel
- #declare and consume from two queues
- channel.queue_declare(queue="q-one", exclusive=True)
- channel.queue_declare(queue="q-two", exclusive=True)
- channel.message_consume(queue="q-one", destination="q-one")
- channel.message_consume(queue="q-two", destination="q-two")
- queue1 = self.client.queue("q-one")
- queue2 = self.client.queue("q-two")
-
- #transfer a single ref to both queues (in separate commands)
- channel.message_open(reference="my-ref")
- channel.synchronous = False
- ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref"))
- channel.message_append(reference="my-ref", bytes="my data")
- ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref"))
- channel.synchronous = True
- channel.message_close(reference="my-ref")
-
- #check that both queues have the message
- self.assertDataEquals(channel, queue1.get(timeout=1), "my data")
- self.assertDataEquals(channel, queue2.get(timeout=1), "my data")
- self.assertEmpty(queue1)
- self.assertEmpty(queue2)
-
- #transfer a single ref to the same queue twice (in separate commands)
- channel.message_open(reference="my-ref")
- channel.synchronous = False
- ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref"))
- channel.message_append(reference="my-ref", bytes="second message")
- ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref"))
- channel.synchronous = True
- channel.message_close(reference="my-ref")
-
- msg1 = queue1.get(timeout=1)
- msg2 = queue1.get(timeout=1)
- #order is undefined
- if msg1.message_id == "abc":
- self.assertEquals(msg2.message_id, "xyz")
- else:
- self.assertEquals(msg1.message_id, "xyz")
- self.assertEquals(msg2.message_id, "abc")
-
- #would be legal for the incoming messages to be transfered
- #inline or by reference in any combination
-
- if isinstance(msg1.body, ReferenceId):
- self.assertEquals("second message", msg1.reference.get_complete())
- if isinstance(msg2.body, ReferenceId):
- if msg1.body != msg2.body:
- self.assertEquals("second message", msg2.reference.get_complete())
- #else ok, as same ref as msg1
- else:
- self.assertEquals("second message", msg1.body)
- if isinstance(msg2.body, ReferenceId):
- self.assertEquals("second message", msg2.reference.get_complete())
- else:
- self.assertEquals("second message", msg2.body)
-
- self.assertEmpty(queue1)
-
- def test_reference_unopened_on_append_error(self):
- channel = self.channel
- try:
- channel.message_append(reference="unopened")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def test_reference_unopened_on_close_error(self):
- channel = self.channel
- try:
- channel.message_close(reference="unopened")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def test_reference_unopened_on_transfer_error(self):
- channel = self.channel
- try:
- channel.message_transfer(body=ReferenceId("unopened"))
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def test_reference_already_opened_error(self):
- channel = self.channel
- channel.message_open(reference="a")
- try:
- channel.message_open(reference="a")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def test_empty_reference(self):
- channel = self.channel
- channel.queue_declare(queue="ref_queue", exclusive=True)
- channel.message_consume(queue="ref_queue", destination="c1")
- queue = self.client.queue("c1")
-
- refId = "myref"
- channel.message_open(reference=refId)
- channel.synchronous = False
- ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId))
- channel.synchronous = True
- channel.message_close(reference=refId)
-
- #first, wait for the ok for the transfer
- ack.get_response(timeout=1)
-
- msg = queue.get(timeout=1)
- self.assertEquals(msg.message_id, "empty-msg")
- self.assertDataEquals(channel, msg, "")
-
- def test_reject(self):
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True)
-
- channel.message_consume(queue = "q", destination = "consumer")
- channel.message_transfer(routing_key = "q", body="blah, blah")
- msg = self.client.queue("consumer").get(timeout = 1)
- self.assertEquals(msg.body, "blah, blah")
- channel.message_cancel(destination = "consumer")
- msg.reject()
-
- channel.message_consume(queue = "q", destination = "checker")
- msg = self.client.queue("checker").get(timeout = 1)
- self.assertEquals(msg.body, "blah, blah")
-
- def test_checkpoint(self):
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True)
-
- channel.message_open(reference="my-ref")
- channel.message_append(reference="my-ref", bytes="abcdefgh")
- channel.message_append(reference="my-ref", bytes="ijklmnop")
- channel.message_checkpoint(reference="my-ref", identifier="my-checkpoint")
- channel.channel_close()
-
- channel = self.client.channel(2)
- channel.channel_open()
- channel.message_consume(queue = "q", destination = "consumer")
- offset = channel.message_resume(reference="my-ref", identifier="my-checkpoint").value
- self.assertTrue(offset<=16)
- channel.message_append(reference="my-ref", bytes="qrstuvwxyz")
- channel.synchronous = False
- channel.message_transfer(routing_key="q-one", message_id="abcd", body=ReferenceId("my-ref"))
- channel.synchronous = True
- channel.message_close(reference="my-ref")
-
- self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz")
- self.assertEmpty(self.client.queue("consumer"))
-
-
- def assertDataEquals(self, channel, msg, expected):
- if isinstance(msg.body, ReferenceId):
- data = msg.reference.get_complete()
- else:
- data = msg.body
- self.assertEquals(expected, data)
diff --git a/qpid/python/tests_0-9/query.py b/qpid/python/tests_0-9/query.py
index c2e08c003c..cb66d079e5 100644
--- a/qpid/python/tests_0-9/query.py
+++ b/qpid/python/tests_0-9/query.py
@@ -19,7 +19,7 @@
from qpid.client import Client, Closed
from qpid.queue import Empty
from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
+from qpid.testlib import TestBase
class QueryTests(TestBase):
"""Tests for various query methods introduced in 0-10 and available in 0-9 for preview"""
diff --git a/qpid/python/tests_0-9/queue.py b/qpid/python/tests_0-9/queue.py
index e7fe0b3ed4..de1153307c 100644
--- a/qpid/python/tests_0-9/queue.py
+++ b/qpid/python/tests_0-9/queue.py
@@ -6,9 +6,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,137 +19,11 @@
from qpid.client import Client, Closed
from qpid.queue import Empty
from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
+from qpid.testlib import TestBase
class QueueTests(TestBase):
"""Tests for 'methods' on the amqp queue 'class'"""
- def test_purge(self):
- """
- Test that the purge method removes messages from the queue
- """
- channel = self.channel
- #setup, declare a queue and add some messages to it:
- channel.exchange_declare(exchange="test-exchange", type="direct")
- channel.queue_declare(queue="test-queue", exclusive=True)
- channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
- channel.message_transfer(destination="test-exchange", routing_key="key", body="one")
- channel.message_transfer(destination="test-exchange", routing_key="key", body="two")
- channel.message_transfer(destination="test-exchange", routing_key="key", body="three")
-
- #check that the queue now reports 3 messages:
- reply = channel.queue_declare(queue="test-queue")
- self.assertEqual(3, reply.message_count)
-
- #now do the purge, then test that three messages are purged and the count drops to 0
- reply = channel.queue_purge(queue="test-queue");
- self.assertEqual(3, reply.message_count)
- reply = channel.queue_declare(queue="test-queue")
- self.assertEqual(0, reply.message_count)
-
- #send a further message and consume it, ensuring that the other messages are really gone
- channel.message_transfer(destination="test-exchange", routing_key="key", body="four")
- channel.message_consume(queue="test-queue", destination="tag", no_ack=True)
- queue = self.client.queue("tag")
- msg = queue.get(timeout=1)
- self.assertEqual("four", msg.body)
-
- #check error conditions (use new channels):
- channel = self.client.channel(2)
- channel.channel_open()
- try:
- #queue specified but doesn't exist:
- channel.queue_purge(queue="invalid-queue")
- self.fail("Expected failure when purging non-existent queue")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- channel = self.client.channel(3)
- channel.channel_open()
- try:
- #queue not specified and none previously declared for channel:
- channel.queue_purge()
- self.fail("Expected failure when purging unspecified queue")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
-
- #cleanup
- other = self.connect()
- channel = other.channel(1)
- channel.channel_open()
- channel.exchange_delete(exchange="test-exchange")
-
- def test_declare_exclusive(self):
- """
- Test that the exclusive field is honoured in queue.declare
- """
- # TestBase.setUp has already opened channel(1)
- c1 = self.channel
- # Here we open a second separate connection:
- other = self.connect()
- c2 = other.channel(1)
- c2.channel_open()
-
- #declare an exclusive queue:
- c1.queue_declare(queue="exclusive-queue", exclusive="True")
- try:
- #other connection should not be allowed to declare this:
- c2.queue_declare(queue="exclusive-queue", exclusive="True")
- self.fail("Expected second exclusive queue_declare to raise a channel exception")
- except Closed, e:
- self.assertChannelException(405, e.args[0])
-
-
- def test_declare_passive(self):
- """
- Test that the passive field is honoured in queue.declare
- """
- channel = self.channel
- #declare an exclusive queue:
- channel.queue_declare(queue="passive-queue-1", exclusive="True")
- channel.queue_declare(queue="passive-queue-1", passive="True")
- try:
- #other connection should not be allowed to declare this:
- channel.queue_declare(queue="passive-queue-2", passive="True")
- self.fail("Expected passive declaration of non-existant queue to raise a channel exception")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
-
- def test_bind(self):
- """
- Test various permutations of the queue.bind method
- """
- channel = self.channel
- channel.queue_declare(queue="queue-1", exclusive="True")
-
- #straightforward case, both exchange & queue exist so no errors expected:
- channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1")
-
- #bind the default queue for the channel (i.e. last one declared):
- channel.queue_bind(exchange="amq.direct", routing_key="key2")
-
- #use the queue name where neither routing key nor queue are specified:
- channel.queue_bind(exchange="amq.direct")
-
- #try and bind to non-existant exchange
- try:
- channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1")
- self.fail("Expected bind to non-existant exchange to fail")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- #need to reopen a channel:
- channel = self.client.channel(2)
- channel.channel_open()
-
- #try and bind non-existant queue:
- try:
- channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1")
- self.fail("Expected bind of non-existant queue to fail")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
def test_unbind_direct(self):
self.unbind_test(exchange="amq.direct", routing_key="key")
@@ -165,12 +39,12 @@ class QueueTests(TestBase):
def unbind_test(self, exchange, routing_key="", args=None, headers={}):
#bind two queues and consume from them
channel = self.channel
-
+
channel.queue_declare(queue="queue-1", exclusive="True")
channel.queue_declare(queue="queue-2", exclusive="True")
- channel.message_consume(queue="queue-1", destination="queue-1", no_ack=True)
- channel.message_consume(queue="queue-2", destination="queue-2", no_ack=True)
+ channel.basic_consume(queue="queue-1", consumer_tag="queue-1", no_ack=True)
+ channel.basic_consume(queue="queue-2", consumer_tag="queue-2", no_ack=True)
queue1 = self.client.queue("queue-1")
queue2 = self.client.queue("queue-2")
@@ -179,130 +53,29 @@ class QueueTests(TestBase):
channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args)
#send a message that will match both bindings
- channel.message_transfer(destination=exchange, routing_key=routing_key, application_headers=headers, body="one")
-
+ channel.basic_publish(exchange=exchange, routing_key=routing_key,
+ content=Content("one", properties={"headers": headers}))
+
#unbind first queue
channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
-
+
#send another message
- channel.message_transfer(destination=exchange, routing_key=routing_key, application_headers=headers, body="two")
+ channel.basic_publish(exchange=exchange, routing_key=routing_key,
+ content=Content("two", properties={"headers": headers}))
#check one queue has both messages and the other has only one
- self.assertEquals("one", queue1.get(timeout=1).body)
+ self.assertEquals("one", queue1.get(timeout=1).content.body)
try:
msg = queue1.get(timeout=1)
self.fail("Got extra message: %s" % msg.body)
except Empty: pass
- self.assertEquals("one", queue2.get(timeout=1).body)
- self.assertEquals("two", queue2.get(timeout=1).body)
+ self.assertEquals("one", queue2.get(timeout=1).content.body)
+ self.assertEquals("two", queue2.get(timeout=1).content.body)
try:
msg = queue2.get(timeout=1)
self.fail("Got extra message: " + msg)
- except Empty: pass
-
-
- def test_delete_simple(self):
- """
- Test core queue deletion behaviour
- """
- channel = self.channel
-
- #straight-forward case:
- channel.queue_declare(queue="delete-me")
- channel.message_transfer(routing_key="delete-me", body="a")
- channel.message_transfer(routing_key="delete-me", body="b")
- channel.message_transfer(routing_key="delete-me", body="c")
- reply = channel.queue_delete(queue="delete-me")
- self.assertEqual(3, reply.message_count)
- #check that it has gone be declaring passively
- try:
- channel.queue_declare(queue="delete-me", passive="True")
- self.fail("Queue has not been deleted")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- #check attempted deletion of non-existant queue is handled correctly:
- channel = self.client.channel(2)
- channel.channel_open()
- try:
- channel.queue_delete(queue="i-dont-exist", if_empty="True")
- self.fail("Expected delete of non-existant queue to fail")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
-
-
- def test_delete_ifempty(self):
- """
- Test that if_empty field of queue_delete is honoured
- """
- channel = self.channel
-
- #create a queue and add a message to it (use default binding):
- channel.queue_declare(queue="delete-me-2")
- channel.queue_declare(queue="delete-me-2", passive="True")
- channel.message_transfer(routing_key="delete-me-2", body="message")
-
- #try to delete, but only if empty:
- try:
- channel.queue_delete(queue="delete-me-2", if_empty="True")
- self.fail("Expected delete if_empty to fail for non-empty queue")
- except Closed, e:
- self.assertChannelException(406, e.args[0])
-
- #need new channel now:
- channel = self.client.channel(2)
- channel.channel_open()
-
- #empty queue:
- channel.message_consume(destination="consumer_tag", queue="delete-me-2", no_ack=True)
- queue = self.client.queue("consumer_tag")
- msg = queue.get(timeout=1)
- self.assertEqual("message", msg.body)
- channel.message_cancel(destination="consumer_tag")
-
- #retry deletion on empty queue:
- channel.queue_delete(queue="delete-me-2", if_empty="True")
-
- #check that it has gone by declaring passively:
- try:
- channel.queue_declare(queue="delete-me-2", passive="True")
- self.fail("Queue has not been deleted")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- def test_delete_ifunused(self):
- """
- Test that if_unused field of queue_delete is honoured
- """
- channel = self.channel
-
- #create a queue and register a consumer:
- channel.queue_declare(queue="delete-me-3")
- channel.queue_declare(queue="delete-me-3", passive="True")
- channel.message_consume(destination="consumer_tag", queue="delete-me-3", no_ack=True)
-
- #need new channel now:
- channel2 = self.client.channel(2)
- channel2.channel_open()
- #try to delete, but only if empty:
- try:
- channel2.queue_delete(queue="delete-me-3", if_unused="True")
- self.fail("Expected delete if_unused to fail for queue with existing consumer")
- except Closed, e:
- self.assertChannelException(406, e.args[0])
-
-
- channel.message_cancel(destination="consumer_tag")
- channel.queue_delete(queue="delete-me-3", if_unused="True")
- #check that it has gone by declaring passively:
- try:
- channel.queue_declare(queue="delete-me-3", passive="True")
- self.fail("Queue has not been deleted")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
+ except Empty: pass
def test_autodelete_shared(self):
"""
@@ -336,5 +109,3 @@ class QueueTests(TestBase):
self.fail("Expected queue to have been deleted")
except Closed, e:
self.assertChannelException(404, e.args[0])
-
-
diff --git a/qpid/python/tests_0-9/testlib.py b/qpid/python/tests_0-9/testlib.py
deleted file mode 100644
index f345fbbd80..0000000000
--- a/qpid/python/tests_0-9/testlib.py
+++ /dev/null
@@ -1,66 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-#
-# Tests for the testlib itself.
-#
-
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-from Queue import Empty
-
-import sys
-from traceback import *
-
-def mytrace(frame, event, arg):
- print_stack(frame);
- print "===="
- return mytrace
-
-class TestBaseTest(TestBase):
- """Verify TestBase functions work as expected"""
-
- def testAssertEmptyPass(self):
- """Test assert empty works"""
- self.queue_declare(queue="empty")
- q = self.consume("empty")
- self.assertEmpty(q)
- try:
- q.get(timeout=1)
- self.fail("Queue is not empty.")
- except Empty: None # Ignore
-
- def testAssertEmptyFail(self):
- self.queue_declare(queue="full")
- q = self.consume("full")
- self.channel.message_transfer(routing_key="full", body="")
- try:
- self.assertEmpty(q);
- self.fail("assertEmpty did not assert on non-empty queue")
- except AssertionError: None # Ignore
-
- def testMessageProperties(self):
- """Verify properties are passed with message"""
- props={"x":1, "y":2}
- self.queue_declare(queue="q")
- q = self.consume("q")
- self.assertPublishGet(q, routing_key="q", properties=props)
-
-
-
diff --git a/qpid/python/tests_0-9/tx.py b/qpid/python/tests_0-9/tx.py
deleted file mode 100644
index 0f6b4f5cd1..0000000000
--- a/qpid/python/tests_0-9/tx.py
+++ /dev/null
@@ -1,188 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class TxTests(TestBase):
- """
- Tests for 'methods' on the amqp tx 'class'
- """
-
- def test_commit(self):
- """
- Test that commited publishes are delivered and commited acks are not re-delivered
- """
- channel = self.channel
- queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-commit-a", "tx-commit-b", "tx-commit-c")
- channel.tx_commit()
-
- #check results
- for i in range(1, 5):
- msg = queue_c.get(timeout=1)
- self.assertEqual("TxMessage %d" % i, msg.body)
- msg.ok()
-
- msg = queue_b.get(timeout=1)
- self.assertEqual("TxMessage 6", msg.body)
- msg.ok()
-
- msg = queue_a.get(timeout=1)
- self.assertEqual("TxMessage 7", msg.body)
- msg.ok()
-
- for q in [queue_a, queue_b, queue_c]:
- try:
- extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.body)
- except Empty: None
-
- #cleanup
- channel.tx_commit()
-
- def test_auto_rollback(self):
- """
- Test that a channel closed with an open transaction is effectively rolled back
- """
- channel = self.channel
- queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
-
- for q in [queue_a, queue_b, queue_c]:
- try:
- extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.body)
- except Empty: None
-
- channel.tx_rollback()
-
- #check results
- for i in range(1, 5):
- msg = queue_a.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
- msg.ok()
-
- msg = queue_b.get(timeout=1)
- self.assertEqual("Message 6", msg.body)
- msg.ok()
-
- msg = queue_c.get(timeout=1)
- self.assertEqual("Message 7", msg.body)
- msg.ok()
-
- for q in [queue_a, queue_b, queue_c]:
- try:
- extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.body)
- except Empty: None
-
- #cleanup
- channel.tx_commit()
-
- def test_rollback(self):
- """
- Test that rolled back publishes are not delivered and rolled back acks are re-delivered
- """
- channel = self.channel
- queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c")
-
- for q in [queue_a, queue_b, queue_c]:
- try:
- extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.body)
- except Empty: None
-
- channel.tx_rollback()
-
- #check results
- for i in range(1, 5):
- msg = queue_a.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
- msg.ok()
-
- msg = queue_b.get(timeout=1)
- self.assertEqual("Message 6", msg.body)
- msg.ok()
-
- msg = queue_c.get(timeout=1)
- self.assertEqual("Message 7", msg.body)
- msg.ok()
-
- for q in [queue_a, queue_b, queue_c]:
- try:
- extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.body)
- except Empty: None
-
- #cleanup
- channel.tx_commit()
-
- def perform_txn_work(self, channel, name_a, name_b, name_c):
- """
- Utility method that does some setup and some work under a transaction. Used for testing both
- commit and rollback
- """
- #setup:
- channel.queue_declare(queue=name_a, exclusive=True)
- channel.queue_declare(queue=name_b, exclusive=True)
- channel.queue_declare(queue=name_c, exclusive=True)
-
- key = "my_key_" + name_b
- topic = "my_topic_" + name_c
-
- channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key)
- channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic)
-
- for i in range(1, 5):
- channel.message_transfer(routing_key=name_a, body="Message %d" % i)
-
- channel.message_transfer(routing_key=key, destination="amq.direct", body="Message 6")
- channel.message_transfer(routing_key=topic, destination="amq.topic", body="Message 7")
-
- channel.tx_select()
-
- #consume and ack messages
- channel.message_consume(queue=name_a, destination="sub_a", no_ack=False)
- queue_a = self.client.queue("sub_a")
- for i in range(1, 5):
- msg = queue_a.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.body)
-
- msg.ok(batchoffset=-3)
-
- channel.message_consume(queue=name_b, destination="sub_b", no_ack=False)
- queue_b = self.client.queue("sub_b")
- msg = queue_b.get(timeout=1)
- self.assertEqual("Message 6", msg.body)
- msg.ok()
-
- sub_c = channel.message_consume(queue=name_c, destination="sub_c", no_ack=False)
- queue_c = self.client.queue("sub_c")
- msg = queue_c.get(timeout=1)
- self.assertEqual("Message 7", msg.body)
- msg.ok()
-
- #publish messages
- for i in range(1, 5):
- channel.message_transfer(routing_key=topic, destination="amq.topic", body="TxMessage %d" % i)
-
- channel.message_transfer(routing_key=key, destination="amq.direct", body="TxMessage 6")
- channel.message_transfer(routing_key=name_a, body="TxMessage 7")
-
- return queue_a, queue_b, queue_c