diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/cpp_failing_0-10.txt | 13 | ||||
-rw-r--r-- | python/qpid/codec.py | 25 | ||||
-rw-r--r-- | python/qpid/message.py | 8 | ||||
-rw-r--r-- | python/qpid/peer.py | 82 | ||||
-rw-r--r-- | python/qpid/spec.py | 2 | ||||
-rw-r--r-- | python/tests_0-10/broker.py | 2 | ||||
-rw-r--r-- | python/tests_0-10/dtx.py | 26 | ||||
-rw-r--r-- | python/tests_0-10/example.py | 2 | ||||
-rw-r--r-- | python/tests_0-10/message.py | 24 | ||||
-rw-r--r-- | python/tests_0-10/tx.py | 56 |
10 files changed, 178 insertions, 62 deletions
diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt index 798d1769eb..e68f942d67 100644 --- a/python/cpp_failing_0-10.txt +++ b/python/cpp_failing_0-10.txt @@ -1,4 +1,15 @@ -tests_0-10.message.MessageTests.test_checkpoint tests_0-10.message.MessageTests.test_reject tests_0-10.basic.BasicTests.test_get +tests_0-10.message.MessageTests.test_get +tests_0-10.message.MessageTests.test_checkpoint +tests_0-10.message.MessageTests.test_empty_reference +tests_0-10.message.MessageTests.test_reference_already_opened_error +tests_0-10.message.MessageTests.test_reference_completion +tests_0-10.message.MessageTests.test_reference_large +tests_0-10.message.MessageTests.test_reference_multi_transfer +tests_0-10.message.MessageTests.test_reference_simple +tests_0-10.message.MessageTests.test_reference_unopened_on_append_error +tests_0-10.message.MessageTests.test_reference_unopened_on_close_error +tests_0-10.message.MessageTests.test_reference_unopened_on_transfer_error + diff --git a/python/qpid/codec.py b/python/qpid/codec.py index a5228e8003..a0d9696c8b 100644 --- a/python/qpid/codec.py +++ b/python/qpid/codec.py @@ -329,12 +329,6 @@ class Codec: return ReferenceId(self.decode_longstr()) # new domains for 0-10: - - def encode_uuid(self, s): - self.encode_longstr(s) - - def decode_uuid(self): - return self.decode_longstr() def encode_rfc1982_long(self, s): self.encode_long(s) @@ -342,10 +336,21 @@ class Codec: def decode_rfc1982_long(self): return self.decode_long() - #Not done yet def encode_rfc1982_long_set(self, s): - self.encode_short(0) + self.encode_short(len(s)) + for i in s: + self.encode_long(i) def decode_rfc1982_long_set(self): - self.decode_short() - return 0; + count = self.decode_short() + set = [] + for i in range(0, count): + set.append(self.decode_long()) + return set; + + #not correct for 0-10 yet + def encode_uuid(self, s): + self.encode_longstr(s) + + def decode_uuid(self): + return self.decode_longstr() diff --git a/python/qpid/message.py b/python/qpid/message.py index f80293180e..970ab9d974 100644 --- a/python/qpid/message.py +++ b/python/qpid/message.py @@ -26,7 +26,10 @@ class Message: self.frame = frame self.method = frame.method_type self.content = content - + if self.method.klass.name != "execution": + self.command_id = self.channel.incoming_completion.sequence.next() + #print "allocated: ", self.command_id, "to ", self.method.klass.name, "_", self.method.name + def __len__(self): return len(self.frame.args) @@ -66,3 +69,6 @@ class Message: def __repr__(self): return Message.REPR % (self.method, self.frame.args, self.content) + + def complete(self, cumulative=True): + self.channel.incoming_completion.complete(mark=self.command_id, cumulative=cumulative) diff --git a/python/qpid/peer.py b/python/qpid/peer.py index 3927f20667..bedc96895b 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -30,6 +30,7 @@ from message import Message from queue import Queue, Closed as QueueClosed from content import Content from cStringIO import StringIO +from time import time class Sequence: @@ -186,11 +187,11 @@ class Channel: self.requester = Requester(self.write) self.responder = Responder(self.write) - self.completion = ExecutionCompletion() + self.completion = OutgoingCompletion() + self.incoming_completion = IncomingCompletion(self) # Use reliable framing if version == 0-9. - # (also for 0-10 while transitioning...) - self.reliable = (spec.major == 0 and (spec.minor == 9 or spec.minor == 10)) + self.reliable = (spec.major == 0 and spec.minor == 9) self.use_execution_layer = (spec.major == 0 and spec.minor == 10) self.synchronous = True @@ -202,6 +203,7 @@ class Channel: self.incoming.close() self.responses.close() self.completion.close() + self.incoming_completion.reset() def write(self, frame, content = None): if self.closed: @@ -252,6 +254,9 @@ class Channel: self.responder.respond(method, batch, request) def invoke(self, type, args, kwargs): + if type.klass.name == "channel" and (type.name == "close" or type.name == "open"): + self.completion.reset() + self.incoming_completion.reset() self.completion.next_command(type) content = kwargs.pop("content", None) frame = Method(type, type.arguments(*args, **kwargs)) @@ -306,6 +311,13 @@ class Channel: return Message(self, resp, content) else: raise ValueError(resp) + elif self.synchronous and not frame.method.response \ + and self.use_execution_layer and frame.method.klass.name != "execution": + self.execution_flush() + self.completion.wait() + if self.closed: + raise Closed(self.reason) + except QueueClosed, e: if self.closed: raise Closed(self.reason) @@ -349,21 +361,32 @@ class Future: def is_complete(self): return self.completed.isSet() -class ExecutionCompletion: +class OutgoingCompletion: + """ + Manages completion of outgoing commands i.e. command sent by this peer + """ + def __init__(self): self.condition = threading.Condition() - self.sequence = Sequence(1) - self.command_id = 0 - self.mark = 0 + + self.sequence = Sequence(1) #issues ids for outgoing commands + self.command_id = 0 #last issued id + self.mark = 0 #commands up to this mark are known to be complete + self.closed = False def next_command(self, method): #the following test is a hack until the track/sub-channel is available if method.klass.name != "execution": self.command_id = self.sequence.next() + def reset(self): + self.sequence = Sequence(1) #reset counter + def close(self): + self.reset() self.condition.acquire() try: + self.closed = True self.condition.notifyAll() finally: self.condition.release() @@ -378,11 +401,50 @@ class ExecutionCompletion: def wait(self, point_of_interest=-1, timeout=None): if point_of_interest == -1: point_of_interest = self.command_id + start_time = time() + remaining = timeout self.condition.acquire() try: - if point_of_interest > self.mark: - self.condition.wait(timeout) + while not self.closed and point_of_interest > self.mark: + #print "waiting for ", point_of_interest, " mark is currently at ", self.mark + self.condition.wait(remaining) + if timeout: + if start_time + timeout > time(): break + else: remaining = timeout - (time() - start_time) finally: self.condition.release() - #todo: retry until timed out or closed return point_of_interest <= self.mark + +class IncomingCompletion: + """ + Manages completion of incoming commands i.e. command received by this peer + """ + + def __init__(self, channel): + self.sequence = Sequence(1) #issues ids for incoming commands + self.mark = 0 #id of last command of whose completion notification was sent to the other peer + self.channel = channel + + def next_id(self, method): + #the following test is a hack until the track/sub-channel is available + if method.klass.name != "execution": + return self.sequence.next() + else: + return 0 + + def reset(self): + self.sequence = Sequence(1) #reset counter + + def complete(self, mark, cumulative=True): + if cumulative: + if mark > self.mark: + self.mark = mark + self.channel.execution_complete(cumulative_execution_mark=self.mark) + else: + #TODO: record and manage the ranges properly + range = [mark, mark] + self.channel.execution_complete(cumulative_execution_mark=self.mark, ranged_execution_set=range) + + + + diff --git a/python/qpid/spec.py b/python/qpid/spec.py index c537401164..09e7dc9d0b 100644 --- a/python/qpid/spec.py +++ b/python/qpid/spec.py @@ -240,7 +240,7 @@ class Method(Metadata): "content": None, "uuid": "", "rfc1982_long": 0, - "rfc1982_long_set": 0 + "rfc1982_long_set": [] } def define_method(self, name): diff --git a/python/tests_0-10/broker.py b/python/tests_0-10/broker.py index 684b36597e..6bc2f7ceb8 100644 --- a/python/tests_0-10/broker.py +++ b/python/tests_0-10/broker.py @@ -48,7 +48,7 @@ class BrokerTests(TestBase): body = "test ack" ch.message_transfer(routing_key = "otherqueue", body = body) msg = self.client.queue(ctag).get(timeout = 5) - msg.ok() + msg.complete() self.assert_(msg.body == body) def test_simple_delivery_immediate(self): diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py index c0d1bd2b74..2835d703ae 100644 --- a/python/tests_0-10/dtx.py +++ b/python/tests_0-10/dtx.py @@ -40,6 +40,11 @@ class DtxTests(TestBase): XA_RBROLLBACK = 1 XA_RBTIMEOUT = 2 XA_OK = 8 + tx_counter = 0 + + def reset_channel(self): + self.channel.channel_close() + self.channel.channel_open() def test_simple_commit(self): """ @@ -56,6 +61,9 @@ class DtxTests(TestBase): #commit self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).status) + #should close and reopen channel to ensure no unacked messages are held + self.reset_channel() + #check result self.assertMessageCount(0, "queue-a") self.assertMessageCount(1, "queue-b") @@ -79,6 +87,8 @@ class DtxTests(TestBase): #commit self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).status) + self.reset_channel() + #check result self.assertMessageCount(0, "queue-a") self.assertMessageCount(1, "queue-b") @@ -100,6 +110,8 @@ class DtxTests(TestBase): #rollback self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status) + self.reset_channel() + #check result self.assertMessageCount(1, "queue-a") self.assertMessageCount(0, "queue-b") @@ -123,6 +135,8 @@ class DtxTests(TestBase): #rollback self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status) + self.reset_channel() + #check result self.assertMessageCount(1, "queue-a") self.assertMessageCount(0, "queue-b") @@ -191,6 +205,8 @@ class DtxTests(TestBase): channel = self.channel #do some transactional work & complete the transaction self.test_simple_commit() + # channel has been reset, so reselect for use with dtx + channel.dtx_demarcation_select() #start association for the same xid as the previously completed txn tx = self.xid("my-xid") @@ -355,7 +371,7 @@ class DtxTests(TestBase): self.assertEqual("two", msg.message_id) channel.message_cancel(destination="results") #ack the message then close the channel - msg.ok() + msg.complete() channel.channel_close() channel = self.channel @@ -446,7 +462,7 @@ class DtxTests(TestBase): channel2.dtx_demarcation_select() channel2.dtx_demarcation_start(xid=tx) channel2.message_get(queue="dummy", destination="dummy") - self.client.queue("dummy").get(timeout=1).ok() + self.client.queue("dummy").get(timeout=1).complete() channel2.message_transfer(routing_key="dummy", body="whatever") channel2.channel_close() @@ -548,7 +564,9 @@ class DtxTests(TestBase): channel.dtx_coordination_rollback(xid=x) self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra)) - def xid(self, txid, branchqual = ''): + def xid(self, txid): + DtxTests.tx_counter += 1 + branchqual = "v%s" % DtxTests.tx_counter return pack('LBB', 0, len(txid), len(branchqual)) + txid + branchqual def txswap(self, tx, id): @@ -573,7 +591,7 @@ class DtxTests(TestBase): #consume from src: channel.message_get(destination="temp-swap", queue=src) msg = self.client.queue("temp-swap").get(timeout=1) - msg.ok(); + msg.complete(); #re-publish to dest channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body) diff --git a/python/tests_0-10/example.py b/python/tests_0-10/example.py index 7ab4cc7d0a..dc71b0590b 100644 --- a/python/tests_0-10/example.py +++ b/python/tests_0-10/example.py @@ -90,5 +90,5 @@ class ExampleTest (TestBase): self.assertEqual(body, msg.body) # Now acknowledge the message. - msg.ok() + msg.complete() diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py index b25016e680..74e2b6416f 100644 --- a/python/tests_0-10/message.py +++ b/python/tests_0-10/message.py @@ -171,8 +171,8 @@ class MessageTests(TestBase): self.assertEqual("Four", msg4.body) self.assertEqual("Five", msg5.body) - msg1.ok(batchoffset=1)#One and Two - msg4.ok() + msg2.complete(cumulative=True)#One and Two + msg4.complete(cumulative=False) channel.message_recover(requeue=False) @@ -215,8 +215,8 @@ class MessageTests(TestBase): self.assertEqual("Four", msg4.body) self.assertEqual("Five", msg5.body) - msg1.ok(batchoffset=1) #One and Two - msg4.ok() #Four + msg2.complete(cumulative=True) #One and Two + msg4.complete(cumulative=False) #Four channel.message_cancel(destination="consumer_tag") @@ -276,14 +276,13 @@ class MessageTests(TestBase): except Empty: None #ack messages and check that the next set arrive ok: - #todo: once batching is implmented, send a single response for all messages - msg.ok(batchoffset=-4)#1-5 + msg.complete() for i in range(6, 11): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msg.ok(batchoffset=-4)#6-10 + msg.complete() try: extra = queue.get(timeout=1) @@ -320,13 +319,13 @@ class MessageTests(TestBase): except Empty: None #ack messages and check that the next set arrive ok: - msg.ok(batchoffset=-4)#1-5 + msg.complete() for i in range(6, 11): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msg.ok(batchoffset=-4)#6-10 + msg.complete() try: extra = queue.get(timeout=1) @@ -376,9 +375,9 @@ class MessageTests(TestBase): self.assertEqual("Message %d" % i, msg.body) if (i==13): - msg.ok(batchoffset=-2)#11, 12 & 13 + msg.complete()#11, 12 & 13 if(i in [15, 17, 19]): - msg.ok() + msg.complete(cumulative=False) reply = channel.message_get(no_ack=True, queue="test-get") self.assertEqual(reply.method.klass.name, "message") @@ -395,8 +394,7 @@ class MessageTests(TestBase): self.assertEqual(reply.method.name, "ok") msg = self.client.queue(tag).get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msg.ok() - #channel.message_ack(delivery_tag=reply.delivery_tag) + msg.complete() reply = channel.message_get(no_ack=True, queue="test-get") self.assertEqual(reply.method.klass.name, "message") diff --git a/python/tests_0-10/tx.py b/python/tests_0-10/tx.py index 0f6b4f5cd1..b499c2d1f9 100644 --- a/python/tests_0-10/tx.py +++ b/python/tests_0-10/tx.py @@ -30,23 +30,39 @@ class TxTests(TestBase): """ Test that commited publishes are delivered and commited acks are not re-delivered """ + channel2 = self.client.channel(2) + channel2.channel_open() + self.perform_txn_work(channel2, "tx-commit-a", "tx-commit-b", "tx-commit-c") + channel2.tx_commit() + channel2.channel_close() + + #use a different channel with new subscriptions to ensure + #there is no redelivery of acked messages: channel = self.channel - queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-commit-a", "tx-commit-b", "tx-commit-c") - channel.tx_commit() + channel.tx_select() + + channel.message_consume(queue="tx-commit-a", destination="qa", no_ack=False) + queue_a = self.client.queue("qa") + + channel.message_consume(queue="tx-commit-b", destination="qb", no_ack=False) + queue_b = self.client.queue("qb") + + channel.message_consume(queue="tx-commit-c", destination="qc", no_ack=False) + queue_c = self.client.queue("qc") #check results for i in range(1, 5): msg = queue_c.get(timeout=1) self.assertEqual("TxMessage %d" % i, msg.body) - msg.ok() + msg.complete() msg = queue_b.get(timeout=1) self.assertEqual("TxMessage 6", msg.body) - msg.ok() + msg.complete() msg = queue_a.get(timeout=1) self.assertEqual("TxMessage 7", msg.body) - msg.ok() + msg.complete() for q in [queue_a, queue_b, queue_c]: try: @@ -76,15 +92,15 @@ class TxTests(TestBase): for i in range(1, 5): msg = queue_a.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msg.ok() + msg.complete() msg = queue_b.get(timeout=1) self.assertEqual("Message 6", msg.body) - msg.ok() + msg.complete() msg = queue_c.get(timeout=1) self.assertEqual("Message 7", msg.body) - msg.ok() + msg.complete() for q in [queue_a, queue_b, queue_c]: try: @@ -114,15 +130,15 @@ class TxTests(TestBase): for i in range(1, 5): msg = queue_a.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msg.ok() + msg.complete() msg = queue_b.get(timeout=1) self.assertEqual("Message 6", msg.body) - msg.ok() + msg.complete() msg = queue_c.get(timeout=1) self.assertEqual("Message 7", msg.body) - msg.ok() + msg.complete() for q in [queue_a, queue_b, queue_c]: try: @@ -150,10 +166,10 @@ class TxTests(TestBase): channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic) for i in range(1, 5): - channel.message_transfer(routing_key=name_a, body="Message %d" % i) + channel.message_transfer(routing_key=name_a, message_id="msg%d" % i, body="Message %d" % i) - channel.message_transfer(routing_key=key, destination="amq.direct", body="Message 6") - channel.message_transfer(routing_key=topic, destination="amq.topic", body="Message 7") + channel.message_transfer(routing_key=key, destination="amq.direct", message_id="msg6", body="Message 6") + channel.message_transfer(routing_key=topic, destination="amq.topic", message_id="msg7", body="Message 7") channel.tx_select() @@ -164,25 +180,25 @@ class TxTests(TestBase): msg = queue_a.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msg.ok(batchoffset=-3) + msg.complete() channel.message_consume(queue=name_b, destination="sub_b", no_ack=False) queue_b = self.client.queue("sub_b") msg = queue_b.get(timeout=1) self.assertEqual("Message 6", msg.body) - msg.ok() + msg.complete() sub_c = channel.message_consume(queue=name_c, destination="sub_c", no_ack=False) queue_c = self.client.queue("sub_c") msg = queue_c.get(timeout=1) self.assertEqual("Message 7", msg.body) - msg.ok() + msg.complete() #publish messages for i in range(1, 5): - channel.message_transfer(routing_key=topic, destination="amq.topic", body="TxMessage %d" % i) + channel.message_transfer(routing_key=topic, destination="amq.topic", message_id="tx-msg%d" % i, body="TxMessage %d" % i) - channel.message_transfer(routing_key=key, destination="amq.direct", body="TxMessage 6") - channel.message_transfer(routing_key=name_a, body="TxMessage 7") + channel.message_transfer(routing_key=key, destination="amq.direct", message_id="tx-msg6", body="TxMessage 6") + channel.message_transfer(routing_key=name_a, message_id="tx-msg7", body="TxMessage 7") return queue_a, queue_b, queue_c |