summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-03-19 20:34:32 +0000
committerAlan Conway <aconway@apache.org>2007-03-19 20:34:32 +0000
commit9975a3e9875d6be093a010071e9e7f3bbd9045f4 (patch)
tree7ec6be0b7a450f1389776dfb17ebe79e9add8121
parente99f2ccaab3d3f2ec4733b57c51bce52df8fc790 (diff)
downloadqpid-python-9975a3e9875d6be093a010071e9e7f3bbd9045f4.tar.gz
Merged revisions 507491-507559,507561-507601,507603-507621,507623-507671,507673-507959,507961-507992,507994-508097,508099-508149,508151-508155,508157-508232,508234-508378,508380-508390,508392-508459,508461-508704,508707-509615,509617-509737,509739-509753,509756-509833,509835-510106,510108-510160,510162-510179,510181-510552,510554-510704,510706-510911,510913-510985,510987-511003,511005-514750,514752-515720,515722-516156,516158-516458,516461-516484,516486-516488,516490-517823,517825,517827,517829,517831-517832,517834-517848,517850,517852-517854,517856-517858,517860-517877,517879-517886,517888-517891,517893-517903,517905,517907-517928,517930,517932-518197,518201-518206,518208-518230,518232,518235,518237,518239-518240,518243-518245,518247-518255,518257,518259-518260,518262,518264,518266-518292,518294-518707 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9 ........ r507491 | gsim | 2007-02-14 06:39:26 -0500 (Wed, 14 Feb 2007) | 3 lines Expanded the use of batched acks to a few other places in tests. ........ r508377 | gsim | 2007-02-16 07:03:37 -0500 (Fri, 16 Feb 2007) | 4 lines Updated failing list for java (some of these result in test suite blocking) Added better error handling when connection closes without close method ........ r508396 | gsim | 2007-02-16 08:54:54 -0500 (Fri, 16 Feb 2007) | 3 lines Fix: use message_resume not channel_resume ........ r509611 | gsim | 2007-02-20 10:39:14 -0500 (Tue, 20 Feb 2007) | 3 lines Fixed bug where response id rather than request id was being used to get listener for response. ........ r509617 | gsim | 2007-02-20 10:52:31 -0500 (Tue, 20 Feb 2007) | 3 lines Updated list of failing tests for java broker on this branch. ........ r510096 | gsim | 2007-02-21 11:49:27 -0500 (Wed, 21 Feb 2007) | 5 lines Fixed bug in references where map wasn't qualified in close Attach reference to transfer, as it will be deleted on close Altered tests to get reference from the message on the queue rather than looking them up from channel as they are already gone there ........ r510114 | astitcher | 2007-02-21 12:37:36 -0500 (Wed, 21 Feb 2007) | 3 lines r1224@fuschia: andrew | 2007-02-21 17:20:59 +0000 Updated expected cpp broker test failures ........ r510128 | gsim | 2007-02-21 13:06:02 -0500 (Wed, 21 Feb 2007) | 3 lines Ensure socket is closed in tearDown ........ r510913 | gsim | 2007-02-23 06:37:08 -0500 (Fri, 23 Feb 2007) | 3 lines Revised list of failing tests for java broker on this branch ........ r515363 | aconway | 2007-03-06 18:35:08 -0500 (Tue, 06 Mar 2007) | 6 lines * python/qpid/peer.py (Channel.__init__): use reliable framing if version >= 0-8. * python/qpid/spec.py (Spec.__init__): Remove unused parameter. * python/qpid/testlib.py (TestRunner._parseargs): Add --errata option, set default errata only if --spec is not present. ........ r518707 | aconway | 2007-03-15 13:49:44 -0400 (Thu, 15 Mar 2007) | 6 lines * python/qpid/peer.py (Peer.close): Close delegate *before* channels. Otherwise we get a race: closing a channel can wake a client thread, which may see client.closed as still false. Was causing bogus exceptions in some tests. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@520094 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/cpp_failing.txt4
-rw-r--r--qpid/python/java_failing.txt12
-rw-r--r--qpid/python/qpid/client.py11
-rw-r--r--qpid/python/qpid/connection.py3
-rw-r--r--qpid/python/qpid/peer.py11
-rw-r--r--qpid/python/qpid/reference.py2
-rw-r--r--qpid/python/qpid/spec.py5
-rw-r--r--qpid/python/qpid/testlib.py53
-rw-r--r--qpid/python/tests/message.py214
-rw-r--r--qpid/python/tests/tx.py3
10 files changed, 243 insertions, 75 deletions
diff --git a/qpid/python/cpp_failing.txt b/qpid/python/cpp_failing.txt
index e69de29bb2..69c7cf3f5a 100644
--- a/qpid/python/cpp_failing.txt
+++ b/qpid/python/cpp_failing.txt
@@ -0,0 +1,4 @@
+tests.message.MessageTests.test_checkpoint
+tests.message.MessageTests.test_reference_large
+tests.message.MessageTests.test_reject
+tests.basic.BasicTests.test_get
diff --git a/qpid/python/java_failing.txt b/qpid/python/java_failing.txt
index b671f86064..7252d0f496 100644
--- a/qpid/python/java_failing.txt
+++ b/qpid/python/java_failing.txt
@@ -1,4 +1,4 @@
-tests.basic.BasicTests.test_qos_prefetch_count
+ntests.basic.BasicTests.test_qos_prefetch_count
tests.basic.BasicTests.test_ack
tests.basic.BasicTests.test_cancel
tests.basic.BasicTests.test_consume_exclusive
@@ -8,11 +8,11 @@ tests.basic.BasicTests.test_consume_unique_consumers
tests.basic.BasicTests.test_get
tests.basic.BasicTests.test_qos_prefetch_size
tests.basic.BasicTests.test_recover_requeue
+
tests.exchange.RecommendedTypesRuleTests.testTopic
tests.exchange.RequiredInstancesRuleTests.testAmqTopic
-tests.message.MessageTests.test_qos_prefetch_count
-tests.message.MessageTests.test_ack
-tests.message.MessageTests.test_get
-tests.message.MessageTests.test_qos_prefetch_size
-tests.message.MessageTests.test_recover_requeue
+tests.message.MessageTests.test_checkpoint
+tests.message.MessageTests.test_reject
+
+tests.broker.BrokerTests.test_ping_pong
diff --git a/qpid/python/qpid/client.py b/qpid/python/qpid/client.py
index ea6aa7901a..e548ef0e99 100644
--- a/qpid/python/qpid/client.py
+++ b/qpid/python/qpid/client.py
@@ -76,7 +76,8 @@ class Client:
self.locale = locale
self.tune_params = tune_params
- self.conn = Connection(connect(self.host, self.port), self.spec)
+ self.socket = connect(self.host, self.port)
+ self.conn = Connection(self.socket, self.spec)
self.peer = Peer(self.conn, ClientDelegate(self), self.opened)
self.conn.init()
@@ -90,6 +91,9 @@ class Client:
def opened(self, ch):
ch.references = References()
+ def close(self):
+ self.socket.close()
+
class ClientDelegate(Delegate):
def __init__(self, client):
@@ -112,9 +116,8 @@ class ClientDelegate(Delegate):
def message_transfer(self, ch, msg):
if isinstance(msg.body, ReferenceId):
- self.client.queue(msg.destination).put(ch.references.get(msg.body.id))
- else:
- self.client.queue(msg.destination).put(msg)
+ msg.reference = ch.references.get(msg.body.id)
+ self.client.queue(msg.destination).put(msg)
def message_open(self, ch, msg):
ch.references.open(msg.reference)
diff --git a/qpid/python/qpid/connection.py b/qpid/python/qpid/connection.py
index 0785fe8774..cdfa2c2dc0 100644
--- a/qpid/python/qpid/connection.py
+++ b/qpid/python/qpid/connection.py
@@ -53,6 +53,9 @@ class SockIO:
def flush(self):
pass
+ def close(self):
+ self.sock.shutdown(socket.SHUT_RDWR)
+
def connect(host, port):
sock = socket.socket()
sock.connect((host, port))
diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py
index 6c8c6647c9..28db20f187 100644
--- a/qpid/python/qpid/peer.py
+++ b/qpid/python/qpid/peer.py
@@ -97,9 +97,12 @@ class Peer:
self.fatal()
def close(self, reason):
+ # We must close the delegate first because closing channels
+ # may wake up waiting threads and we don't want them to see
+ # the delegate as open.
+ self.delegate.close(reason)
for ch in self.channels.values():
ch.close(reason)
- self.delegate.close(reason)
def writer(self):
try:
@@ -144,7 +147,7 @@ class Requester:
self.write(frame, content)
def receive(self, channel, frame):
- listener = self.outstanding.pop(frame.id)
+ listener = self.outstanding.pop(frame.request_id)
listener(channel, frame)
class Responder:
@@ -178,8 +181,8 @@ class Channel:
self.requester = Requester(self.write)
self.responder = Responder(self.write)
- # XXX: better switch
- self.reliable = False
+ # Use reliable framing if version == 0-9.
+ self.reliable = (spec.major == 0 and spec.minor == 9)
self.synchronous = True
def close(self, reason):
diff --git a/qpid/python/qpid/reference.py b/qpid/python/qpid/reference.py
index d357560390..48ecb67656 100644
--- a/qpid/python/qpid/reference.py
+++ b/qpid/python/qpid/reference.py
@@ -111,7 +111,7 @@ class References:
self.get(id).close()
self.lock.acquire()
try:
- del map[id]
+ self.map.pop(id)
finally:
self.lock.release()
diff --git a/qpid/python/qpid/spec.py b/qpid/python/qpid/spec.py
index e430c45b96..4f0661bcbc 100644
--- a/qpid/python/qpid/spec.py
+++ b/qpid/python/qpid/spec.py
@@ -79,12 +79,11 @@ class Spec(Metadata):
PRINT=["major", "minor", "file"]
- def __init__(self, major, minor, file, errata):
+ def __init__(self, major, minor, file):
Metadata.__init__(self)
self.major = major
self.minor = minor
self.file = file
- self.errata = errata
self.constants = SpecContainer()
self.classes = SpecContainer()
# methods indexed by classname_methname
@@ -275,7 +274,7 @@ def load_fields(nd, l, domains):
def load(specfile, *errata):
doc = xmlutil.parse(specfile)
spec_root = doc["amqp"][0]
- spec = Spec(int(spec_root["@major"]), int(spec_root["@minor"]), specfile, errata)
+ spec = Spec(int(spec_root["@major"]), int(spec_root["@minor"]), specfile)
for root in [spec_root] + map(lambda x: xmlutil.parse(x)["amqp"][0], errata):
# constants
diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py
index dcbf0ed91c..05641bce7e 100644
--- a/qpid/python/qpid/testlib.py
+++ b/qpid/python/qpid/testlib.py
@@ -26,6 +26,7 @@ import qpid.client, qpid.spec
import Queue
from getopt import getopt, GetoptError
from qpid.content import Content
+from qpid.message import Message
def findmodules(root):
"""Find potential python modules under directory root"""
@@ -56,15 +57,16 @@ class TestRunner:
run-tests [options] [test*]
The name of a test is package.module.ClassName.testMethod
Options:
- -?/-h/--help : this message
+ -?/-h/--help : this message
-s/--spec <spec.xml> : URL of AMQP XML specification or one of these abbreviations:
0-8 - use the default 0-8 specification.
0-9 - use the default 0-9 specification.
+ -e/--errata <errata.xml> : file containing amqp XML errata
-b/--broker [<user>[/<password>]@]<host>[:<port>] : broker to connect to
- -v/--verbose : verbose - lists tests as they are run.
- -d/--debug : enable debug logging.
- -i/--ignore <test> : ignore the named test.
- -I/--ignore-file : file containing patterns to ignore.
+ -v/--verbose : verbose - lists tests as they are run.
+ -d/--debug : enable debug logging.
+ -i/--ignore <test> : ignore the named test.
+ -I/--ignore-file : file containing patterns to ignore.
"""
sys.exit(1)
@@ -103,24 +105,27 @@ Options:
for opt, value in opts:
if opt in ("-?", "-h", "--help"): self._die()
if opt in ("-s", "--spec"): self.specfile = value
+ if opt in ("-e", "--errata"): self.errata.append(value)
if opt in ("-b", "--broker"): self.setBroker(value)
if opt in ("-v", "--verbose"): self.verbose = 2
if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG)
if opt in ("-i", "--ignore"): self.ignore.append(value)
if opt in ("-I", "--ignore-file"): self.ignoreFile(value)
+ # Abbreviations for default settings.
if (self.specfile == "0-8"):
- self.specfile = "../specs/amqp.0-8.xml"
+ self.specfile = "../specs/amqp.0-8.xml"
if (self.specfile == "0-9"):
- self.specfile = "../specs/amqp.0-9.xml"
- self.errata = ["../specs/amqp-errata.0-9.xml"]
+ self.specfile = "../specs/amqp.0-9.xml"
+ self.errata.append("../specs/amqp-errata.0-9.xml")
+ if (self.specfile == None):
+ self._die("No XML specification provided")
print "Using specification from:", self.specfile
self.spec = qpid.spec.load(self.specfile, *self.errata)
if len(self.tests) == 0:
if self.use08spec():
- testdir="tests_0-8"
+ self.tests=findmodules("tests_0-8")
else:
- testdir="tests"
- self.tests=findmodules(testdir)
+ self.tests=findmodules("tests")
def testSuite(self):
class IgnoringTestSuite(unittest.TestSuite):
@@ -137,7 +142,11 @@ Options:
self._parseargs(args)
runner = unittest.TextTestRunner(descriptions=False,
verbosity=self.verbose)
- result = runner.run(self.testSuite())
+ try:
+ result = runner.run(self.testSuite())
+ except:
+ print "Unhandled error in test:", sys.exc_info()
+
if (self.ignore):
print "======================================="
print "NOTE: the following tests were ignored:"
@@ -181,10 +190,18 @@ class TestBase(unittest.TestCase):
self.channel.channel_open()
def tearDown(self):
- for ch, q in self.queues:
- ch.queue_delete(queue=q)
- for ch, ex in self.exchanges:
- ch.exchange_delete(exchange=ex)
+ try:
+ for ch, q in self.queues:
+ ch.queue_delete(queue=q)
+ for ch, ex in self.exchanges:
+ ch.exchange_delete(exchange=ex)
+ except:
+ print "Error on tearDown:", sys.exc_info()
+
+ if not self.client.closed:
+ self.client.channel(0).connection_close(reply_code=200)
+ else:
+ self.client.close()
def connect(self, *args, **keys):
"""Create a new connction, return the Client object"""
@@ -261,13 +278,15 @@ class TestBase(unittest.TestCase):
"""
self.assertPublishGet(self.consume(queue), exchange, routing_key, properties)
- def assertChannelException(self, expectedCode, message):
+ def assertChannelException(self, expectedCode, message):
+ if not isinstance(message, Message): self.fail("expected channel_close method")
self.assertEqual("channel", message.method.klass.name)
self.assertEqual("close", message.method.name)
self.assertEqual(expectedCode, message.reply_code)
def assertConnectionException(self, expectedCode, message):
+ if not isinstance(message, Message): self.fail("expected connection_close method")
self.assertEqual("connection", message.method.klass.name)
self.assertEqual("close", message.method.name)
self.assertEqual(expectedCode, message.reply_code)
diff --git a/qpid/python/tests/message.py b/qpid/python/tests/message.py
index 916a9825bd..d5f5d4dbc2 100644
--- a/qpid/python/tests/message.py
+++ b/qpid/python/tests/message.py
@@ -171,8 +171,7 @@ class MessageTests(TestBase):
self.assertEqual("Four", msg4.body)
self.assertEqual("Five", msg5.body)
- msg1.ok()
- msg2.ok()
+ msg1.ok(batchoffset=1)#One and Two
msg4.ok()
channel.message_recover(requeue=False)
@@ -216,9 +215,8 @@ class MessageTests(TestBase):
self.assertEqual("Four", msg4.body)
self.assertEqual("Five", msg5.body)
- msg1.ok() #One
- msg2.ok() #Two
- msg4.ok() #Two
+ msg1.ok(batchoffset=1) #One and Two
+ msg4.ok() #Four
channel.message_cancel(destination="consumer_tag")
channel.message_consume(queue="test-requeue", destination="consumer_tag")
@@ -263,11 +261,9 @@ class MessageTests(TestBase):
channel.message_transfer(routing_key="test-prefetch-count", body="Message %d" % i)
#only 5 messages should have been delivered:
- msgs = []
for i in range(1, 6):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msgs.append(msg)
try:
extra = queue.get(timeout=1)
self.fail("Got unexpected 6th message in original queue: " + extra.body)
@@ -275,18 +271,13 @@ class MessageTests(TestBase):
#ack messages and check that the next set arrive ok:
#todo: once batching is implmented, send a single response for all messages
- for msg in msgs:
- msg.ok()
- del msgs
+ msg.ok(batchoffset=-4)#1-5
for i in range(6, 11):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msgs.append(msg)
- for msg in msgs:
- msg.ok()
- del msgs
+ msg.ok(batchoffset=-4)#6-10
try:
extra = queue.get(timeout=1)
@@ -313,12 +304,9 @@ class MessageTests(TestBase):
channel.message_transfer(routing_key="test-prefetch-size", body="Message %d" % i)
#only 5 messages should have been delivered (i.e. 45 bytes worth):
- msgs = []
for i in range(1, 6):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- print "Got Message %d" % i
- msgs.append(msg)
try:
extra = queue.get(timeout=1)
@@ -326,18 +314,13 @@ class MessageTests(TestBase):
except Empty: None
#ack messages and check that the next set arrive ok:
- for msg in msgs:
- msg.ok()
- del msgs
+ msg.ok(batchoffset=-4)#1-5
for i in range(6, 11):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msgs.append(msg)
- for msg in msgs:
- msg.ok()
- del msgs
+ msg.ok(batchoffset=-4)#6-10
try:
extra = queue.get(timeout=1)
@@ -383,14 +366,13 @@ class MessageTests(TestBase):
reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "ok")
- self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body)
- reply.ok()
-
- #todo: when batching is available, test ack multiple
- #if(i == 13):
- # channel.message_ack(delivery_tag=reply.delivery_tag, multiple=True)
- #if(i in [15, 17, 19]):
- # channel.message_ack(delivery_tag=reply.delivery_tag)
+ msg = self.client.queue(tag).get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.body)
+
+ if (i==13):
+ msg.ok(batchoffset=-2)#11, 12 & 13
+ if(i in [15, 17, 19]):
+ msg.ok()
reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
@@ -479,8 +461,9 @@ class MessageTests(TestBase):
queue = other.queue("c1")
msg = queue.get(timeout=1)
- self.assertTrue(isinstance(msg, Reference))
- self.assertEquals(data, msg.get_complete())
+ self.assertTrue(isinstance(msg.body, ReferenceId))
+ self.assertTrue(msg.reference)
+ self.assertEquals(data, msg.reference.get_complete())
def test_reference_completion(self):
"""
@@ -511,12 +494,165 @@ class MessageTests(TestBase):
#first, wait for the ok for the transfer
ack.get_response(timeout=1)
+ self.assertDataEquals(channel, queue.get(timeout=1), "abcd")
+
+ def test_reference_multi_transfer(self):
+ """
+ Test that multiple transfer requests for the same reference are
+ correctly handled.
+ """
+ channel = self.channel
+ #declare and consume from two queues
+ channel.queue_declare(queue="q-one", exclusive=True)
+ channel.queue_declare(queue="q-two", exclusive=True)
+ channel.message_consume(queue="q-one", destination="q-one")
+ channel.message_consume(queue="q-two", destination="q-two")
+ queue1 = self.client.queue("q-one")
+ queue2 = self.client.queue("q-two")
+
+ #transfer a single ref to both queues (in separate commands)
+ channel.message_open(reference="my-ref")
+ channel.synchronous = False
+ ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref"))
+ channel.message_append(reference="my-ref", bytes="my data")
+ ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref"))
+ channel.synchronous = True
+ channel.message_close(reference="my-ref")
+
+ #check that both queues have the message
+ self.assertDataEquals(channel, queue1.get(timeout=1), "my data")
+ self.assertDataEquals(channel, queue2.get(timeout=1), "my data")
+ self.assertEmpty(queue1)
+ self.assertEmpty(queue2)
+
+ #transfer a single ref to the same queue twice (in separate commands)
+ channel.message_open(reference="my-ref")
+ channel.synchronous = False
+ ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref"))
+ channel.message_append(reference="my-ref", bytes="second message")
+ ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref"))
+ channel.synchronous = True
+ channel.message_close(reference="my-ref")
+
+ msg1 = queue1.get(timeout=1)
+ msg2 = queue1.get(timeout=1)
+ #order is undefined
+ if msg1.message_id == "abc":
+ self.assertEquals(msg2.message_id, "xyz")
+ else:
+ self.assertEquals(msg1.message_id, "xyz")
+ self.assertEquals(msg2.message_id, "abc")
+
+ #would be legal for the incoming messages to be transfered
+ #inline or by reference in any combination
+
+ if isinstance(msg1.body, ReferenceId):
+ self.assertEquals("second message", msg1.reference.get_complete())
+ if isinstance(msg2.body, ReferenceId):
+ if msg1.body != msg2.body:
+ self.assertEquals("second message", msg2.reference.get_complete())
+ #else ok, as same ref as msg1
+ else:
+ self.assertEquals("second message", msg1.body)
+ if isinstance(msg2.body, ReferenceId):
+ self.assertEquals("second message", msg2.reference.get_complete())
+ else:
+ self.assertEquals("second message", msg2.body)
+
+ self.assertEmpty(queue1)
+
+ def test_reference_unopened_on_append_error(self):
+ channel = self.channel
+ try:
+ channel.message_append(reference="unopened")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_reference_unopened_on_close_error(self):
+ channel = self.channel
+ try:
+ channel.message_close(reference="unopened")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_reference_unopened_on_transfer_error(self):
+ channel = self.channel
+ try:
+ channel.message_transfer(body=ReferenceId("unopened"))
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_reference_already_opened_error(self):
+ channel = self.channel
+ channel.message_open(reference="a")
+ try:
+ channel.message_open(reference="a")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_empty_reference(self):
+ channel = self.channel
+ channel.queue_declare(queue="ref_queue", exclusive=True)
+ channel.message_consume(queue="ref_queue", destination="c1")
+ queue = self.client.queue("c1")
+
+ refId = "myref"
+ channel.message_open(reference=refId)
+ channel.synchronous = False
+ ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId))
+ channel.synchronous = True
+ channel.message_close(reference=refId)
+
+ #first, wait for the ok for the transfer
+ ack.get_response(timeout=1)
+
msg = queue.get(timeout=1)
- if isinstance(msg, Reference):
- #should we force broker to deliver as reference by frame
- #size limit? or test that separately? for compliance,
- #allowing either seems best for now...
- data = msg.get_complete()
+ self.assertEquals(msg.message_id, "empty-msg")
+ self.assertDataEquals(channel, msg, "")
+
+ def test_reject(self):
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+
+ channel.message_consume(queue = "q", destination = "consumer")
+ channel.message_transfer(routing_key = "q", body="blah, blah")
+ msg = self.client.queue("consumer").get(timeout = 1)
+ self.assertEquals(msg.body, "blah, blah")
+ channel.message_cancel(destination = "consumer")
+ msg.reject()
+
+ channel.message_consume(queue = "q", destination = "checker")
+ msg = self.client.queue("checker").get(timeout = 1)
+ self.assertEquals(msg.body, "blah, blah")
+
+ def test_checkpoint(self):
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+
+ channel.message_open(reference="my-ref")
+ channel.message_append(reference="my-ref", bytes="abcdefgh")
+ channel.message_append(reference="my-ref", bytes="ijklmnop")
+ channel.message_checkpoint(reference="my-ref", identifier="my-checkpoint")
+ channel.channel_close()
+
+ channel = self.client.channel(2)
+ channel.channel_open()
+ channel.message_consume(queue = "q", destination = "consumer")
+ offset = channel.message_resume(reference="my-ref", identifier="my-checkpoint").value
+ self.assertEquals(offset, 16)
+ channel.message_append(reference="my-ref", bytes="qrstuvwxyz")
+ channel.synchronous = False
+ channel.message_transfer(routing_key="q-one", message_id="abcd", body=ReferenceId("my-ref"))
+ channel.synchronous = True
+ channel.message_close(reference="my-ref")
+
+ self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz")
+ self.assertEmpty(self.client.queue("consumer"))
+
+
+ def assertDataEquals(self, channel, msg, expected):
+ if isinstance(msg.body, ReferenceId):
+ data = msg.reference.get_complete()
else:
data = msg.body
self.assertEquals("abcdefghijkl", data)
diff --git a/qpid/python/tests/tx.py b/qpid/python/tests/tx.py
index 55a5eaeade..0f6b4f5cd1 100644
--- a/qpid/python/tests/tx.py
+++ b/qpid/python/tests/tx.py
@@ -163,7 +163,8 @@ class TxTests(TestBase):
for i in range(1, 5):
msg = queue_a.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msg.ok()
+
+ msg.ok(batchoffset=-3)
channel.message_consume(queue=name_b, destination="sub_b", no_ack=False)
queue_b = self.client.queue("sub_b")