summaryrefslogtreecommitdiff
path: root/python/tests/message.py
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-02-12 15:15:25 +0000
committerGordon Sim <gsim@apache.org>2007-02-12 15:15:25 +0000
commitcec1521f9e022bf695dbeddf8ea9a1769dca9f94 (patch)
tree7556aa256be882bf98a79afb6b39999a3a5040d9 /python/tests/message.py
parent84a9b8a8f8b68b73db988943441410a51f6bc860 (diff)
downloadqpid-python-cec1521f9e022bf695dbeddf8ea9a1769dca9f94.tar.gz
* qpid/client.py - altered handling of transfer for references,
add the transfer to queue as before and require client to look up the reference themselves. this lets application access the other transfer fields * tests/message.py - added some more tests for references git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@506477 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/tests/message.py')
-rw-r--r--python/tests/message.py141
1 files changed, 123 insertions, 18 deletions
diff --git a/python/tests/message.py b/python/tests/message.py
index df621ff295..c22a0396ef 100644
--- a/python/tests/message.py
+++ b/python/tests/message.py
@@ -445,15 +445,7 @@ class MessageTests(TestBase):
#first, wait for the ok for the transfer
ack.get_response(timeout=1)
- msg = queue.get(timeout=1)
- if isinstance(msg, Reference):
- #should we force broker to deliver as reference by frame
- #size limit? or test that separately? for compliance,
- #allowing either seems best for now...
- data = msg.get_complete()
- else:
- data = msg.body
- self.assertEquals("abcdefghijkl", data)
+ self.assertDataEquals(channel, queue.get(timeout=1), "abcdefghijkl")
def test_reference_large(self):
@@ -482,8 +474,8 @@ class MessageTests(TestBase):
queue = other.queue("c1")
msg = queue.get(timeout=1)
- self.assertTrue(isinstance(msg, Reference))
- self.assertEquals(data, msg.get_complete())
+ self.assertTrue(isinstance(msg.body, ReferenceId))
+ self.assertEquals(data, ch2.references.get(msg.body.id).get_complete())
def test_reference_completion(self):
"""
@@ -514,13 +506,126 @@ class MessageTests(TestBase):
#first, wait for the ok for the transfer
ack.get_response(timeout=1)
+ self.assertDataEquals(channel, queue.get(timeout=1), "abcd")
+
+ def test_reference_multi_transfer(self):
+ """
+ Test that multiple transfer requests for the same reference are
+ correctly handled.
+ """
+ channel = self.channel
+ #declare and consume from two queues
+ channel.queue_declare(queue="q-one", exclusive=True)
+ channel.queue_declare(queue="q-two", exclusive=True)
+ channel.message_consume(queue="q-one", destination="q-one")
+ channel.message_consume(queue="q-two", destination="q-two")
+ queue1 = self.client.queue("q-one")
+ queue2 = self.client.queue("q-two")
+
+ #transfer a single ref to both queues (in separate commands)
+ channel.message_open(reference="my-ref")
+ channel.synchronous = False
+ ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref"))
+ channel.message_append(reference="my-ref", bytes="my data")
+ ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref"))
+ channel.synchronous = True
+ channel.message_close(reference="my-ref")
+
+ #check that both queues have the message
+ self.assertDataEquals(channel, queue1.get(timeout=1), "my data")
+ self.assertDataEquals(channel, queue2.get(timeout=1), "my data")
+ self.assertEmpty(queue1)
+ self.assertEmpty(queue2)
+
+ #transfer a single ref to the same queue twice (in separate commands)
+ channel.message_open(reference="my-ref")
+ channel.synchronous = False
+ ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref"))
+ channel.message_append(reference="my-ref", bytes="second message")
+ ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref"))
+ channel.synchronous = True
+ channel.message_close(reference="my-ref")
+
+ msg1 = queue1.get(timeout=1)
+ msg2 = queue1.get(timeout=1)
+ #order is undefined
+ if msg1.message_id == "abc":
+ self.assertEquals(msg2.message_id, "xyz")
+ else:
+ self.assertEquals(msg1.message_id, "xyz")
+ self.assertEquals(msg2.message_id, "abc")
+
+ #would be legal for the incoming messages to be transfered
+ #inline or by reference in any combination
+
+ if isinstance(msg1.body, ReferenceId):
+ self.assertEquals("second message", channel.references.get(msg1.body.id).get_complete())
+ if isinstance(msg2.body, ReferenceId):
+ if msg1.body != msg2.body:
+ self.assertEquals("second message", channel.references.get(msg2.body.id).get_complete())
+ #else ok, as same ref as msg1
+ else:
+ self.assertEquals("second message", msg1.body)
+ if isinstance(msg2.body, ReferenceId):
+ self.assertEquals("second message", channel.references.get(msg2.body.id).get_complete())
+ else:
+ self.assertEquals("second message", msg2.body)
+
+ self.assertEmpty(queue1)
+
+ def test_reference_unopened_on_append_error(self):
+ channel = self.channel
+ try:
+ channel.message_append(reference="unopened")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_reference_unopened_on_close_error(self):
+ channel = self.channel
+ try:
+ channel.message_close(reference="unopened")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_reference_unopened_on_transfer_error(self):
+ channel = self.channel
+ try:
+ channel.message_transfer(body=ReferenceId("unopened"))
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_reference_already_opened_error(self):
+ channel = self.channel
+ channel.message_open(reference="a")
+ try:
+ channel.message_open(reference="a")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_empty_reference(self):
+ channel = self.channel
+ channel.queue_declare(queue="ref_queue", exclusive=True)
+ channel.message_consume(queue="ref_queue", destination="c1")
+ queue = self.client.queue("c1")
+
+ refId = "myref"
+ channel.message_open(reference=refId)
+ channel.synchronous = False
+ ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId))
+ channel.synchronous = True
+ channel.message_close(reference=refId)
+
+ #first, wait for the ok for the transfer
+ ack.get_response(timeout=1)
+
msg = queue.get(timeout=1)
- if isinstance(msg, Reference):
- #should we force broker to deliver as reference by frame
- #size limit? or test that separately? for compliance,
- #allowing either seems best for now...
- data = msg.get_complete()
+ self.assertEquals(msg.message_id, "empty-msg")
+ self.assertDataEquals(channel, msg, "")
+
+
+ def assertDataEquals(self, channel, msg, expected):
+ if isinstance(msg.body, ReferenceId):
+ data = channel.references.get(msg.body.id).get_complete()
else:
data = msg.body
- self.assertEquals("abcdefghijkl", data)
-
+ self.assertEquals(expected, data)