diff options
Diffstat (limited to 'deps/rabbitmq_stomp/test/python_SUITE_data/src/destinations.py')
-rw-r--r-- | deps/rabbitmq_stomp/test/python_SUITE_data/src/destinations.py | 536 |
1 files changed, 536 insertions, 0 deletions
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/destinations.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/destinations.py new file mode 100644 index 0000000000..76e5402686 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/destinations.py @@ -0,0 +1,536 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import unittest +import stomp +import base +import time + +class TestExchange(base.BaseTest): + + + def test_amq_direct(self): + ''' Test basic send/receive for /exchange/amq.direct ''' + self.__test_exchange_send_rec("amq.direct", "route") + + def test_amq_topic(self): + ''' Test basic send/receive for /exchange/amq.topic ''' + self.__test_exchange_send_rec("amq.topic", "route") + + def test_amq_fanout(self): + ''' Test basic send/receive for /exchange/amq.fanout ''' + self.__test_exchange_send_rec("amq.fanout", "route") + + def test_amq_fanout_no_route(self): + ''' Test basic send/receive, /exchange/amq.direct, no routing key''' + self.__test_exchange_send_rec("amq.fanout") + + def test_invalid_exchange(self): + ''' Test invalid exchange error ''' + self.listener.reset(1) + self.subscribe_dest(self.conn, "/exchange/does.not.exist", None, + ack="auto") + self.assertListener("Expecting an error", numErrs=1) + err = self.listener.errors[0] + self.assertEquals("not_found", err['headers']['message']) + self.assertEquals( + "NOT_FOUND - no exchange 'does.not.exist' in vhost '/'\n", + err['message']) + time.sleep(1) + self.assertFalse(self.conn.is_connected()) + + def __test_exchange_send_rec(self, exchange, route = None): + if exchange != "amq.topic": + dest = "/exchange/" + exchange + else: + dest = "/topic" + if route != None: + dest += "/" + route + + self.simple_test_send_rec(dest) + +class TestQueue(base.BaseTest): + + def test_send_receive(self): + ''' Test basic send/receive for /queue ''' + destination = '/queue/test' + self.simple_test_send_rec(destination) + + def test_send_receive_in_other_conn(self): + ''' Test send in one connection, receive in another ''' + destination = '/queue/test2' + + # send + self.conn.send(destination, "hello") + + # now receive + conn2 = self.create_connection() + try: + listener2 = base.WaitableListener() + conn2.set_listener('', listener2) + + self.subscribe_dest(conn2, destination, None, ack="auto") + self.assertTrue(listener2.wait(10), "no receive") + finally: + conn2.disconnect() + + def test_send_receive_in_other_conn_with_disconnect(self): + ''' Test send, disconnect, receive ''' + destination = '/queue/test3' + + # send + self.conn.send(destination, "hello thar", receipt="foo") + self.listener.wait(3) + self.conn.disconnect() + + # now receive + conn2 = self.create_connection() + try: + listener2 = base.WaitableListener() + conn2.set_listener('', listener2) + + self.subscribe_dest(conn2, destination, None, ack="auto") + self.assertTrue(listener2.wait(10), "no receive") + finally: + conn2.disconnect() + + + def test_multi_subscribers(self): + ''' Test multiple subscribers against a single /queue destination ''' + destination = '/queue/test-multi' + + ## set up two subscribers + conn1, listener1 = self.create_subscriber_connection(destination) + conn2, listener2 = self.create_subscriber_connection(destination) + + try: + ## now send + self.conn.send(destination, "test1") + self.conn.send(destination, "test2") + + ## expect both consumers to get a message? + self.assertTrue(listener1.wait(2)) + self.assertEquals(1, len(listener1.messages), + "unexpected message count") + self.assertTrue(listener2.wait(2)) + self.assertEquals(1, len(listener2.messages), + "unexpected message count") + finally: + conn1.disconnect() + conn2.disconnect() + + def test_send_with_receipt(self): + destination = '/queue/test-receipt' + def noop(): pass + self.__test_send_receipt(destination, noop, noop) + + def test_send_with_receipt_tx(self): + destination = '/queue/test-receipt-tx' + tx = 'receipt.tx' + + def before(): + self.conn.begin(transaction=tx) + + def after(): + self.assertFalse(self.listener.wait(1)) + self.conn.commit(transaction=tx) + + self.__test_send_receipt(destination, before, after, {'transaction': tx}) + + def test_interleaved_receipt_no_receipt(self): + ''' Test i-leaved receipt/no receipt, no-r bracketed by rs ''' + + destination = '/queue/ir' + + self.listener.reset(5) + + self.subscribe_dest(self.conn, destination, None, ack="auto") + self.conn.send(destination, 'first', receipt='a') + self.conn.send(destination, 'second') + self.conn.send(destination, 'third', receipt='b') + + self.assertListener("Missing messages/receipts", numMsgs=3, numRcts=2, timeout=3) + + self.assertEquals(set(['a','b']), self.__gather_receipts()) + + def test_interleaved_receipt_no_receipt_tx(self): + ''' Test i-leaved receipt/no receipt, no-r bracketed by r+xactions ''' + + destination = '/queue/ir' + tx = 'tx.ir' + + # three messages and two receipts + self.listener.reset(5) + + self.subscribe_dest(self.conn, destination, None, ack="auto") + self.conn.begin(transaction=tx) + + self.conn.send(destination, 'first', receipt='a', transaction=tx) + self.conn.send(destination, 'second', transaction=tx) + self.conn.send(destination, 'third', receipt='b', transaction=tx) + self.conn.commit(transaction=tx) + + self.assertListener("Missing messages/receipts", numMsgs=3, numRcts=2, timeout=40) + + expected = set(['a', 'b']) + missing = expected.difference(self.__gather_receipts()) + + self.assertEquals(set(), missing, "Missing receipts: " + str(missing)) + + def test_interleaved_receipt_no_receipt_inverse(self): + ''' Test i-leaved receipt/no receipt, r bracketed by no-rs ''' + + destination = '/queue/ir' + + self.listener.reset(4) + + self.subscribe_dest(self.conn, destination, None, ack="auto") + self.conn.send(destination, 'first') + self.conn.send(destination, 'second', receipt='a') + self.conn.send(destination, 'third') + + self.assertListener("Missing messages/receipt", numMsgs=3, numRcts=1, timeout=3) + + self.assertEquals(set(['a']), self.__gather_receipts()) + + def __test_send_receipt(self, destination, before, after, headers = {}): + count = 50 + self.listener.reset(count) + + before() + expected_receipts = set() + + for x in range(0, count): + receipt = "test" + str(x) + expected_receipts.add(receipt) + self.conn.send(destination, "test receipt", + receipt=receipt, headers=headers) + after() + + self.assertTrue(self.listener.wait(5)) + + missing_receipts = expected_receipts.difference( + self.__gather_receipts()) + + self.assertEquals(set(), missing_receipts, + "missing receipts: " + str(missing_receipts)) + + def __gather_receipts(self): + result = set() + for r in self.listener.receipts: + result.add(r['headers']['receipt-id']) + return result + +class TestTopic(base.BaseTest): + + def test_send_receive(self): + ''' Test basic send/receive for /topic ''' + destination = '/topic/test' + self.simple_test_send_rec(destination) + + def test_send_multiple(self): + ''' Test /topic with multiple consumers ''' + destination = '/topic/multiple' + + ## set up two subscribers + conn1, listener1 = self.create_subscriber_connection(destination) + conn2, listener2 = self.create_subscriber_connection(destination) + + try: + ## listeners are expecting 2 messages + listener1.reset(2) + listener2.reset(2) + + ## now send + self.conn.send(destination, "test1") + self.conn.send(destination, "test2") + + ## expect both consumers to get both messages + self.assertTrue(listener1.wait(5)) + self.assertEquals(2, len(listener1.messages), + "unexpected message count") + self.assertTrue(listener2.wait(5)) + self.assertEquals(2, len(listener2.messages), + "unexpected message count") + finally: + conn1.disconnect() + conn2.disconnect() + + def test_send_multiple_with_a_large_message(self): + ''' Test /topic with multiple consumers ''' + destination = '/topic/16mb' + # payload size + s = 1024 * 1024 * 16 + message = 'x' * s + + conn1, listener1 = self.create_subscriber_connection(destination) + conn2, listener2 = self.create_subscriber_connection(destination) + + try: + listener1.reset(2) + listener2.reset(2) + + self.conn.send(destination, message) + self.conn.send(destination, message) + + self.assertTrue(listener1.wait(10)) + self.assertEquals(2, len(listener1.messages), + "unexpected message count") + self.assertTrue(len(listener2.messages[0]['message']) == s, + "unexpected message size") + + self.assertTrue(listener2.wait(10)) + self.assertEquals(2, len(listener2.messages), + "unexpected message count") + finally: + conn1.disconnect() + conn2.disconnect() + +class TestReplyQueue(base.BaseTest): + + def test_reply_queue(self): + ''' Test with two separate clients. Client 1 sends + message to a known destination with a defined reply + queue. Client 2 receives on known destination and replies + on the reply destination. Client 1 gets the reply message''' + + known = '/queue/known' + reply = '/temp-queue/0' + + ## Client 1 uses pre-supplied connection and listener + ## Set up client 2 + conn2, listener2 = self.create_subscriber_connection(known) + + try: + self.conn.send(known, "test", + headers = {"reply-to": reply}) + + self.assertTrue(listener2.wait(5)) + self.assertEquals(1, len(listener2.messages)) + + reply_to = listener2.messages[0]['headers']['reply-to'] + self.assertTrue(reply_to.startswith('/reply-queue/')) + + conn2.send(reply_to, "reply") + self.assertTrue(self.listener.wait(5)) + self.assertEquals("reply", self.listener.messages[0]['message']) + finally: + conn2.disconnect() + + def test_reuse_reply_queue(self): + ''' Test re-use of reply-to queue ''' + + known2 = '/queue/known2' + known3 = '/queue/known3' + reply = '/temp-queue/foo' + + def respond(cntn, listna): + self.assertTrue(listna.wait(5)) + self.assertEquals(1, len(listna.messages)) + reply_to = listna.messages[0]['headers']['reply-to'] + self.assertTrue(reply_to.startswith('/reply-queue/')) + cntn.send(reply_to, "reply") + + ## Client 1 uses pre-supplied connection and listener + ## Set up clients 2 and 3 + conn2, listener2 = self.create_subscriber_connection(known2) + conn3, listener3 = self.create_subscriber_connection(known3) + try: + self.listener.reset(2) + self.conn.send(known2, "test2", + headers = {"reply-to": reply}) + self.conn.send(known3, "test3", + headers = {"reply-to": reply}) + respond(conn2, listener2) + respond(conn3, listener3) + + self.assertTrue(self.listener.wait(5)) + self.assertEquals(2, len(self.listener.messages)) + self.assertEquals("reply", self.listener.messages[0]['message']) + self.assertEquals("reply", self.listener.messages[1]['message']) + finally: + conn2.disconnect() + conn3.disconnect() + + def test_perm_reply_queue(self): + '''As test_reply_queue, but with a non-temp reply queue''' + + known = '/queue/known' + reply = '/queue/reply' + + ## Client 1 uses pre-supplied connection and listener + ## Set up client 2 + conn1, listener1 = self.create_subscriber_connection(reply) + conn2, listener2 = self.create_subscriber_connection(known) + + try: + conn1.send(known, "test", + headers = {"reply-to": reply}) + + self.assertTrue(listener2.wait(5)) + self.assertEquals(1, len(listener2.messages)) + + reply_to = listener2.messages[0]['headers']['reply-to'] + self.assertTrue(reply_to == reply) + + conn2.send(reply_to, "reply") + self.assertTrue(listener1.wait(5)) + self.assertEquals("reply", listener1.messages[0]['message']) + finally: + conn1.disconnect() + conn2.disconnect() + +class TestDurableSubscription(base.BaseTest): + + ID = 'test.subscription' + + def __subscribe(self, dest, conn=None, id=None): + if not conn: + conn = self.conn + if not id: + id = TestDurableSubscription.ID + + self.subscribe_dest(conn, dest, id, ack="auto", + headers = {'durable': 'true', + 'receipt': 1, + 'auto-delete': False}) + + def __assert_receipt(self, listener=None, pos=None): + if not listener: + listener = self.listener + + self.assertTrue(listener.wait(5)) + self.assertEquals(1, len(self.listener.receipts)) + if pos is not None: + self.assertEquals(pos, self.listener.receipts[0]['msg_no']) + + def __assert_message(self, msg, listener=None, pos=None): + if not listener: + listener = self.listener + + self.assertTrue(listener.wait(5)) + self.assertEquals(1, len(listener.messages)) + self.assertEquals(msg, listener.messages[0]['message']) + if pos is not None: + self.assertEquals(pos, self.listener.messages[0]['msg_no']) + + def do_test_durable_subscription(self, durability_header): + destination = '/topic/durable' + + self.__subscribe(destination) + self.__assert_receipt() + + # send first message without unsubscribing + self.listener.reset(1) + self.conn.send(destination, "first") + self.__assert_message("first") + + # now unsubscribe (disconnect only) + self.unsubscribe_dest(self.conn, destination, TestDurableSubscription.ID) + + # send again + self.listener.reset(2) + self.conn.send(destination, "second") + + # resubscribe and expect receipt + self.__subscribe(destination) + self.__assert_receipt(pos=1) + # and message + self.__assert_message("second", pos=2) + + # now unsubscribe (cancel) + self.unsubscribe_dest(self.conn, destination, TestDurableSubscription.ID, + headers={durability_header: 'true'}) + + # send again + self.listener.reset(1) + self.conn.send(destination, "third") + + # resubscribe and expect no message + self.__subscribe(destination) + self.assertTrue(self.listener.wait(3)) + self.assertEquals(0, len(self.listener.messages)) + self.assertEquals(1, len(self.listener.receipts)) + + def test_durable_subscription(self): + self.do_test_durable_subscription('durable') + + def test_durable_subscription_and_legacy_header(self): + self.do_test_durable_subscription('persistent') + + def test_share_subscription(self): + destination = '/topic/durable-shared' + + conn2 = self.create_connection() + conn2.set_listener('', self.listener) + + try: + self.__subscribe(destination) + self.__assert_receipt() + self.listener.reset(1) + self.__subscribe(destination, conn2) + self.__assert_receipt() + + self.listener.reset(100) + + # send 100 messages + for x in range(0, 100): + self.conn.send(destination, "msg" + str(x)) + + self.assertTrue(self.listener.wait(5)) + self.assertEquals(100, len(self.listener.messages)) + finally: + conn2.disconnect() + + def test_separate_ids(self): + destination = '/topic/durable-separate' + + conn2 = self.create_connection() + listener2 = base.WaitableListener() + conn2.set_listener('', listener2) + + try: + # ensure durable subscription exists for each ID + self.__subscribe(destination) + self.__assert_receipt() + self.__subscribe(destination, conn2, "other.id") + self.__assert_receipt(listener2) + self.unsubscribe_dest(self.conn, destination, TestDurableSubscription.ID) + self.unsubscribe_dest(conn2, destination, "other.id") + + self.listener.reset(101) + listener2.reset(101) ## 100 messages and 1 receipt + + # send 100 messages + for x in range(0, 100): + self.conn.send(destination, "msg" + str(x)) + + self.__subscribe(destination) + self.__subscribe(destination, conn2, "other.id") + + for l in [self.listener, listener2]: + self.assertTrue(l.wait(20)) + self.assertTrue(len(l.messages) >= 90) + self.assertTrue(len(l.messages) <= 100) + + finally: + conn2.disconnect() + + def do_test_durable_subscribe_no_id_and_header(self, header): + destination = '/topic/durable-invalid' + + self.conn.send_frame('SUBSCRIBE', + {'destination': destination, 'ack': 'auto', header: 'true'}) + self.listener.wait(3) + self.assertEquals(1, len(self.listener.errors)) + self.assertEquals("Missing Header", self.listener.errors[0]['headers']['message']) + + def test_durable_subscribe_no_id(self): + self.do_test_durable_subscribe_no_id_and_header('durable') + + def test_durable_subscribe_no_id_and_legacy_header(self): + self.do_test_durable_subscribe_no_id_and_header('persistent') |