diff options
-rw-r--r-- | python/qpid/client.py | 5 | ||||
-rw-r--r-- | python/tests/message.py | 141 |
2 files changed, 124 insertions, 22 deletions
diff --git a/python/qpid/client.py b/python/qpid/client.py index ed7dbca5d2..e14166c885 100644 --- a/python/qpid/client.py +++ b/python/qpid/client.py @@ -111,10 +111,7 @@ class ClientDelegate(Delegate): self.client.started.set() def message_transfer(self, ch, msg): - if isinstance(msg.body, ReferenceId): - self.client.queue(msg.destination).put(ch.references.get(msg.body.id)) - else: - self.client.queue(msg.destination).put(msg) + self.client.queue(msg.destination).put(msg) def message_open(self, ch, msg): ch.references.open(msg.reference) diff --git a/python/tests/message.py b/python/tests/message.py index df621ff295..c22a0396ef 100644 --- a/python/tests/message.py +++ b/python/tests/message.py @@ -445,15 +445,7 @@ class MessageTests(TestBase): #first, wait for the ok for the transfer ack.get_response(timeout=1) - msg = queue.get(timeout=1) - if isinstance(msg, Reference): - #should we force broker to deliver as reference by frame - #size limit? or test that separately? for compliance, - #allowing either seems best for now... - data = msg.get_complete() - else: - data = msg.body - self.assertEquals("abcdefghijkl", data) + self.assertDataEquals(channel, queue.get(timeout=1), "abcdefghijkl") def test_reference_large(self): @@ -482,8 +474,8 @@ class MessageTests(TestBase): queue = other.queue("c1") msg = queue.get(timeout=1) - self.assertTrue(isinstance(msg, Reference)) - self.assertEquals(data, msg.get_complete()) + self.assertTrue(isinstance(msg.body, ReferenceId)) + self.assertEquals(data, ch2.references.get(msg.body.id).get_complete()) def test_reference_completion(self): """ @@ -514,13 +506,126 @@ class MessageTests(TestBase): #first, wait for the ok for the transfer ack.get_response(timeout=1) + self.assertDataEquals(channel, queue.get(timeout=1), "abcd") + + def test_reference_multi_transfer(self): + """ + Test that multiple transfer requests for the same reference are + correctly handled. + """ + channel = self.channel + #declare and consume from two queues + channel.queue_declare(queue="q-one", exclusive=True) + channel.queue_declare(queue="q-two", exclusive=True) + channel.message_consume(queue="q-one", destination="q-one") + channel.message_consume(queue="q-two", destination="q-two") + queue1 = self.client.queue("q-one") + queue2 = self.client.queue("q-two") + + #transfer a single ref to both queues (in separate commands) + channel.message_open(reference="my-ref") + channel.synchronous = False + ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref")) + channel.message_append(reference="my-ref", bytes="my data") + ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref")) + channel.synchronous = True + channel.message_close(reference="my-ref") + + #check that both queues have the message + self.assertDataEquals(channel, queue1.get(timeout=1), "my data") + self.assertDataEquals(channel, queue2.get(timeout=1), "my data") + self.assertEmpty(queue1) + self.assertEmpty(queue2) + + #transfer a single ref to the same queue twice (in separate commands) + channel.message_open(reference="my-ref") + channel.synchronous = False + ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref")) + channel.message_append(reference="my-ref", bytes="second message") + ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref")) + channel.synchronous = True + channel.message_close(reference="my-ref") + + msg1 = queue1.get(timeout=1) + msg2 = queue1.get(timeout=1) + #order is undefined + if msg1.message_id == "abc": + self.assertEquals(msg2.message_id, "xyz") + else: + self.assertEquals(msg1.message_id, "xyz") + self.assertEquals(msg2.message_id, "abc") + + #would be legal for the incoming messages to be transfered + #inline or by reference in any combination + + if isinstance(msg1.body, ReferenceId): + self.assertEquals("second message", channel.references.get(msg1.body.id).get_complete()) + if isinstance(msg2.body, ReferenceId): + if msg1.body != msg2.body: + self.assertEquals("second message", channel.references.get(msg2.body.id).get_complete()) + #else ok, as same ref as msg1 + else: + self.assertEquals("second message", msg1.body) + if isinstance(msg2.body, ReferenceId): + self.assertEquals("second message", channel.references.get(msg2.body.id).get_complete()) + else: + self.assertEquals("second message", msg2.body) + + self.assertEmpty(queue1) + + def test_reference_unopened_on_append_error(self): + channel = self.channel + try: + channel.message_append(reference="unopened") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_reference_unopened_on_close_error(self): + channel = self.channel + try: + channel.message_close(reference="unopened") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_reference_unopened_on_transfer_error(self): + channel = self.channel + try: + channel.message_transfer(body=ReferenceId("unopened")) + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_reference_already_opened_error(self): + channel = self.channel + channel.message_open(reference="a") + try: + channel.message_open(reference="a") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_empty_reference(self): + channel = self.channel + channel.queue_declare(queue="ref_queue", exclusive=True) + channel.message_consume(queue="ref_queue", destination="c1") + queue = self.client.queue("c1") + + refId = "myref" + channel.message_open(reference=refId) + channel.synchronous = False + ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId)) + channel.synchronous = True + channel.message_close(reference=refId) + + #first, wait for the ok for the transfer + ack.get_response(timeout=1) + msg = queue.get(timeout=1) - if isinstance(msg, Reference): - #should we force broker to deliver as reference by frame - #size limit? or test that separately? for compliance, - #allowing either seems best for now... - data = msg.get_complete() + self.assertEquals(msg.message_id, "empty-msg") + self.assertDataEquals(channel, msg, "") + + + def assertDataEquals(self, channel, msg, expected): + if isinstance(msg.body, ReferenceId): + data = channel.references.get(msg.body.id).get_complete() else: data = msg.body - self.assertEquals("abcdefghijkl", data) - + self.assertEquals(expected, data) |