summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2020-11-09 11:58:07 +0000
committerLuke Bakken <lbakken@pivotal.io>2020-11-17 08:28:30 -0800
commit83f3eb1a4a3a7375f9efdfe9588b98b31e19586b (patch)
tree63fca694f2ba5fecd5d7a1e52cce09e8a1cbb08f
parent778e8dad5ceb9fec974d7ceea04081cc2e35872b (diff)
downloadrabbitmq-server-git-rabbitmq-stomp-gh-153-monorepo.tar.gz
Test stream queues with STOMPrabbitmq-stomp-gh-153-monorepo
Stream queues require prefetch count and manual acknowledgment (cherry picked from commit 45d3c3fecbf9f9782851bf580620a50bb4692a45)
-rwxr-xr-xdeps/rabbitmq_stomp/test/python_SUITE_data/src/test.py3
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py64
2 files changed, 66 insertions, 1 deletions
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py
index 01967465a2..a4e912272c 100755
--- a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py
@@ -16,6 +16,7 @@ if __name__ == '__main__':
'destinations',
'redelivered',
'topic_permissions',
- 'x_queue_type_quorum'
+ 'x_queue_type_quorum',
+ 'x_queue_type_stream'
]
test_runner.run_unittests(modules)
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py
new file mode 100644
index 0000000000..1848dfde91
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py
@@ -0,0 +1,64 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+##
+
+import pika
+import base
+import time
+import os
+import re
+
+
+class TestUserGeneratedQueueName(base.BaseTest):
+
+ def test_stream_queue(self):
+ queueName = 'my-stream-queue'
+
+ # subscribe
+ self.subscribe_dest(
+ self.conn,
+ '/topic/stream-queue-test',
+ None,
+ headers={
+ 'x-queue-name': queueName,
+ 'x-queue-type': 'stream',
+ 'durable': True,
+ 'auto-delete': False,
+ 'id': 1234,
+ 'prefetch-count': 10
+ },
+ ack="client"
+ )
+
+ # let the stream queue some time to start
+ time.sleep(5)
+
+ connection = pika.BlockingConnection(
+ pika.ConnectionParameters(host='127.0.0.1', port=int(os.environ["AMQP_PORT"])))
+ channel = connection.channel()
+
+ # publish a message to the named queue
+ channel.basic_publish(
+ exchange='',
+ routing_key=queueName,
+ body='Hello World!')
+
+ # could we declare a stream queue?
+ stream_queue_supported = True
+ if len(self.listener.errors) > 0:
+ pattern = re.compile(r"feature flag is disabled", re.MULTILINE)
+ for error in self.listener.errors:
+ if pattern.search(error['message']) != None:
+ stream_queue_supported = False
+ break
+
+ if stream_queue_supported:
+ # check if we receive the message from the STOMP subscription
+ self.assertTrue(self.listener.wait(5), "initial message not received")
+ self.assertEquals(1, len(self.listener.messages))
+ self.conn.disconnect()
+
+ connection.close()