From 83f3eb1a4a3a7375f9efdfe9588b98b31e19586b Mon Sep 17 00:00:00 2001 From: dcorbacho Date: Mon, 9 Nov 2020 11:58:07 +0000 Subject: Test stream queues with STOMP Stream queues require prefetch count and manual acknowledgment (cherry picked from commit 45d3c3fecbf9f9782851bf580620a50bb4692a45) --- .../test/python_SUITE_data/src/test.py | 3 +- .../python_SUITE_data/src/x_queue_type_stream.py | 64 ++++++++++++++++++++++ 2 files changed, 66 insertions(+), 1 deletion(-) create mode 100644 deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py index 01967465a2..a4e912272c 100755 --- a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py @@ -16,6 +16,7 @@ if __name__ == '__main__': 'destinations', 'redelivered', 'topic_permissions', - 'x_queue_type_quorum' + 'x_queue_type_quorum', + 'x_queue_type_stream' ] test_runner.run_unittests(modules) diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py new file mode 100644 index 0000000000..1848dfde91 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py @@ -0,0 +1,64 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import pika +import base +import time +import os +import re + + +class TestUserGeneratedQueueName(base.BaseTest): + + def test_stream_queue(self): + queueName = 'my-stream-queue' + + # subscribe + self.subscribe_dest( + self.conn, + '/topic/stream-queue-test', + None, + headers={ + 'x-queue-name': queueName, + 'x-queue-type': 'stream', + 'durable': True, + 'auto-delete': False, + 'id': 1234, + 'prefetch-count': 10 + }, + ack="client" + ) + + # let the stream queue some time to start + time.sleep(5) + + connection = pika.BlockingConnection( + pika.ConnectionParameters(host='127.0.0.1', port=int(os.environ["AMQP_PORT"]))) + channel = connection.channel() + + # publish a message to the named queue + channel.basic_publish( + exchange='', + routing_key=queueName, + body='Hello World!') + + # could we declare a stream queue? + stream_queue_supported = True + if len(self.listener.errors) > 0: + pattern = re.compile(r"feature flag is disabled", re.MULTILINE) + for error in self.listener.errors: + if pattern.search(error['message']) != None: + stream_queue_supported = False + break + + if stream_queue_supported: + # check if we receive the message from the STOMP subscription + self.assertTrue(self.listener.wait(5), "initial message not received") + self.assertEquals(1, len(self.listener.messages)) + self.conn.disconnect() + + connection.close() -- cgit v1.2.1