diff options
Diffstat (limited to 'deps/rabbitmq_stomp/test/python_SUITE_data/src/ack.py')
-rw-r--r-- | deps/rabbitmq_stomp/test/python_SUITE_data/src/ack.py | 252 |
1 files changed, 252 insertions, 0 deletions
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/ack.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/ack.py new file mode 100644 index 0000000000..9103bc76ea --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/ack.py @@ -0,0 +1,252 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import unittest +import stomp +import base +import time +import os + +class TestAck(base.BaseTest): + + def test_ack_client(self): + destination = "/queue/ack-test" + + # subscribe and send message + self.listener.reset(2) ## expecting 2 messages + self.subscribe_dest(self.conn, destination, None, + ack='client', + headers={'prefetch-count': '10'}) + self.conn.send(destination, "test1") + self.conn.send(destination, "test2") + self.assertTrue(self.listener.wait(4), "initial message not received") + self.assertEquals(2, len(self.listener.messages)) + + # disconnect with no ack + self.conn.disconnect() + + # now reconnect + conn2 = self.create_connection() + try: + listener2 = base.WaitableListener() + listener2.reset(2) + conn2.set_listener('', listener2) + self.subscribe_dest(conn2, destination, None, + ack='client', + headers={'prefetch-count': '10'}) + self.assertTrue(listener2.wait(), "message not received again") + self.assertEquals(2, len(listener2.messages)) + + # now ack only the last message - expecting cumulative behaviour + mid = listener2.messages[1]['headers'][self.ack_id_source_header] + self.ack_message(conn2, mid, None) + finally: + conn2.disconnect() + + # now reconnect again, shouldn't see the message + conn3 = self.create_connection() + try: + listener3 = base.WaitableListener() + conn3.set_listener('', listener3) + self.subscribe_dest(conn3, destination, None) + self.assertFalse(listener3.wait(3), + "unexpected message. ACK not working?") + finally: + conn3.disconnect() + + def test_ack_client_individual(self): + destination = "/queue/ack-test-individual" + + # subscribe and send message + self.listener.reset(2) ## expecting 2 messages + self.subscribe_dest(self.conn, destination, None, + ack='client-individual', + headers={'prefetch-count': '10'}) + self.conn.send(destination, "test1") + self.conn.send(destination, "test2") + self.assertTrue(self.listener.wait(4), "Both initial messages not received") + self.assertEquals(2, len(self.listener.messages)) + + # disconnect without acks + self.conn.disconnect() + + # now reconnect + conn2 = self.create_connection() + try: + listener2 = base.WaitableListener() + listener2.reset(2) ## expect 2 messages + conn2.set_listener('', listener2) + self.subscribe_dest(conn2, destination, None, + ack='client-individual', + headers={'prefetch-count': '10'}) + self.assertTrue(listener2.wait(2.5), "Did not receive 2 messages") + self.assertEquals(2, len(listener2.messages), "Not exactly 2 messages received") + + # now ack only the 'test2' message - expecting individual behaviour + nummsgs = len(listener2.messages) + mid = None + for ind in range(nummsgs): + if listener2.messages[ind]['message']=="test2": + mid = listener2.messages[ind]['headers'][self.ack_id_source_header] + self.assertEquals(1, ind, 'Expecting test2 to be second message') + break + self.assertTrue(mid, "Did not find test2 message id.") + self.ack_message(conn2, mid, None) + finally: + conn2.disconnect() + + # now reconnect again, shouldn't see the message + conn3 = self.create_connection() + try: + listener3 = base.WaitableListener() + listener3.reset(2) ## expecting a single message, but wait for two + conn3.set_listener('', listener3) + self.subscribe_dest(conn3, destination, None) + self.assertFalse(listener3.wait(2.5), + "Expected to see only one message. ACK not working?") + self.assertEquals(1, len(listener3.messages), "Expecting exactly one message") + self.assertEquals("test1", listener3.messages[0]['message'], "Unexpected message remains") + finally: + conn3.disconnect() + + def test_ack_client_tx(self): + destination = "/queue/ack-test-tx" + + # subscribe and send message + self.listener.reset() + self.subscribe_dest(self.conn, destination, None, ack='client') + self.conn.send(destination, "test") + self.assertTrue(self.listener.wait(3), "initial message not received") + self.assertEquals(1, len(self.listener.messages)) + + # disconnect with no ack + self.conn.disconnect() + + # now reconnect + conn2 = self.create_connection() + try: + tx = "abc" + listener2 = base.WaitableListener() + conn2.set_listener('', listener2) + conn2.begin(transaction=tx) + self.subscribe_dest(conn2, destination, None, ack='client') + self.assertTrue(listener2.wait(), "message not received again") + self.assertEquals(1, len(listener2.messages)) + + # now ack + mid = listener2.messages[0]['headers'][self.ack_id_source_header] + self.ack_message(conn2, mid, None, transaction=tx) + + #now commit + conn2.commit(transaction=tx) + finally: + conn2.disconnect() + + # now reconnect again, shouldn't see the message + conn3 = self.create_connection() + try: + listener3 = base.WaitableListener() + conn3.set_listener('', listener3) + self.subscribe_dest(conn3, destination, None) + self.assertFalse(listener3.wait(3), + "unexpected message. TX ACK not working?") + finally: + conn3.disconnect() + + def test_topic_prefetch(self): + destination = "/topic/prefetch-test" + + # subscribe and send message + self.listener.reset(6) ## expect 6 messages + self.subscribe_dest(self.conn, destination, None, + ack='client', + headers={'prefetch-count': '5'}) + + for x in range(10): + self.conn.send(destination, "test" + str(x)) + + self.assertFalse(self.listener.wait(3), + "Should not have been able to see 6 messages") + self.assertEquals(5, len(self.listener.messages)) + + def test_nack(self): + destination = "/queue/nack-test" + + #subscribe and send + self.subscribe_dest(self.conn, destination, None, + ack='client-individual') + self.conn.send(destination, "nack-test") + + self.assertTrue(self.listener.wait(), "Not received message") + message_id = self.listener.messages[0]['headers'][self.ack_id_source_header] + self.listener.reset() + + self.nack_message(self.conn, message_id, None) + self.assertTrue(self.listener.wait(), "Not received message after NACK") + message_id = self.listener.messages[0]['headers'][self.ack_id_source_header] + self.ack_message(self.conn, message_id, None) + + def test_nack_multi(self): + destination = "/queue/nack-multi" + + self.listener.reset(2) + + #subscribe and send + self.subscribe_dest(self.conn, destination, None, + ack='client', + headers = {'prefetch-count' : '10'}) + self.conn.send(destination, "nack-test1") + self.conn.send(destination, "nack-test2") + + self.assertTrue(self.listener.wait(), "Not received messages") + message_id = self.listener.messages[1]['headers'][self.ack_id_source_header] + self.listener.reset(2) + + self.nack_message(self.conn, message_id, None) + self.assertTrue(self.listener.wait(), "Not received message again") + message_id = self.listener.messages[1]['headers'][self.ack_id_source_header] + self.ack_message(self.conn, message_id, None) + + def test_nack_without_requeueing(self): + destination = "/queue/nack-test-no-requeue" + + self.subscribe_dest(self.conn, destination, None, + ack='client-individual') + self.conn.send(destination, "nack-test") + + self.assertTrue(self.listener.wait(), "Not received message") + message_id = self.listener.messages[0]['headers'][self.ack_id_source_header] + self.listener.reset() + + self.conn.send_frame("NACK", {self.ack_id_header: message_id, "requeue": False}) + self.assertFalse(self.listener.wait(4), "Received message after NACK with requeue = False") + +class TestAck11(TestAck): + + def create_connection_obj(self, version='1.1', vhost='/', heartbeats=(0, 0)): + conn = stomp.StompConnection11(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))], + vhost=vhost, + heartbeats=heartbeats) + self.ack_id_source_header = 'message-id' + self.ack_id_header = 'message-id' + return conn + + def test_version(self): + self.assertEquals('1.1', self.conn.version) + +class TestAck12(TestAck): + + def create_connection_obj(self, version='1.2', vhost='/', heartbeats=(0, 0)): + conn = stomp.StompConnection12(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))], + vhost=vhost, + heartbeats=heartbeats) + self.ack_id_source_header = 'ack' + self.ack_id_header = 'id' + return conn + + def test_version(self): + self.assertEquals('1.2', self.conn.version) |