diff options
author | dcorbacho <dparracorbacho@piotal.io> | 2020-11-09 11:58:07 +0000 |
---|---|---|
committer | Luke Bakken <lbakken@pivotal.io> | 2020-11-17 08:28:30 -0800 |
commit | 83f3eb1a4a3a7375f9efdfe9588b98b31e19586b (patch) | |
tree | 63fca694f2ba5fecd5d7a1e52cce09e8a1cbb08f | |
parent | 778e8dad5ceb9fec974d7ceea04081cc2e35872b (diff) | |
download | rabbitmq-server-git-rabbitmq-stomp-gh-153-monorepo.tar.gz |
Test stream queues with STOMPrabbitmq-stomp-gh-153-monorepo
Stream queues require prefetch count and manual acknowledgment
(cherry picked from commit 45d3c3fecbf9f9782851bf580620a50bb4692a45)
-rwxr-xr-x | deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py | 3 | ||||
-rw-r--r-- | deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py | 64 |
2 files changed, 66 insertions, 1 deletions
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py index 01967465a2..a4e912272c 100755 --- a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py @@ -16,6 +16,7 @@ if __name__ == '__main__': 'destinations', 'redelivered', 'topic_permissions', - 'x_queue_type_quorum' + 'x_queue_type_quorum', + 'x_queue_type_stream' ] test_runner.run_unittests(modules) diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py new file mode 100644 index 0000000000..1848dfde91 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py @@ -0,0 +1,64 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import pika +import base +import time +import os +import re + + +class TestUserGeneratedQueueName(base.BaseTest): + + def test_stream_queue(self): + queueName = 'my-stream-queue' + + # subscribe + self.subscribe_dest( + self.conn, + '/topic/stream-queue-test', + None, + headers={ + 'x-queue-name': queueName, + 'x-queue-type': 'stream', + 'durable': True, + 'auto-delete': False, + 'id': 1234, + 'prefetch-count': 10 + }, + ack="client" + ) + + # let the stream queue some time to start + time.sleep(5) + + connection = pika.BlockingConnection( + pika.ConnectionParameters(host='127.0.0.1', port=int(os.environ["AMQP_PORT"]))) + channel = connection.channel() + + # publish a message to the named queue + channel.basic_publish( + exchange='', + routing_key=queueName, + body='Hello World!') + + # could we declare a stream queue? + stream_queue_supported = True + if len(self.listener.errors) > 0: + pattern = re.compile(r"feature flag is disabled", re.MULTILINE) + for error in self.listener.errors: + if pattern.search(error['message']) != None: + stream_queue_supported = False + break + + if stream_queue_supported: + # check if we receive the message from the STOMP subscription + self.assertTrue(self.listener.wait(5), "initial message not received") + self.assertEquals(1, len(self.listener.messages)) + self.conn.disconnect() + + connection.close() |