diff options
author | Marcial Rosales <mrosales@pivotal.io> | 2022-11-17 13:01:07 +0100 |
---|---|---|
committer | Mergify <37929162+mergify[bot]@users.noreply.github.com> | 2022-11-17 12:35:55 +0000 |
commit | 293dc1c50d3f8375ec668962843e75ac3a2ccedd (patch) | |
tree | 01afeab1f3788de68053c72a6940f44716b95c9c | |
parent | 934253a4c12d6c8d3689a755b48162aea8ce0b1f (diff) | |
download | rabbitmq-server-git-293dc1c50d3f8375ec668962843e75ac3a2ccedd.tar.gz |
Support x-max-age argument in stomp
for stream declarations
(cherry picked from commit 5747461f0972d348ead3b137abb13f35a737a5be)
4 files changed, 48 insertions, 2 deletions
diff --git a/deps/rabbitmq_stomp/examples/python/stream-receiver.py b/deps/rabbitmq_stomp/examples/python/stream-receiver.py new file mode 100755 index 0000000000..87856044e0 --- /dev/null +++ b/deps/rabbitmq_stomp/examples/python/stream-receiver.py @@ -0,0 +1,40 @@ +import time +import sys + +import stomp +import random +import requests + +class MyListener(stomp.ConnectionListener): + def on_error(self, frame): + print('received an error "%s"' % frame.body) + def on_message(self, frame): + print('received a message "%s"' % frame.body) + +# Define a STOMP connection and port +conn = stomp.Connection([("localhost", 61613)]) +conn.set_listener('', MyListener()) +conn.connect('guest', 'guest', wait=True) # define the username/password + +# Setup a subscription +conn.subscribe(destination='/exchange/stomp1', id=1234, ack='client', headers={ + 'x-queue-name': 'my-stomp-stream', + 'x-queue-type': 'stream', + 'x-max-age' : '10h', + 'durable': True, + 'auto-delete': False, + 'id': 1234, + 'prefetch-count': 10 +}) + +response = requests.get("http://localhost:15672/api/queues/%2F/my-stomp-stream", auth=("guest", "guest")) +stream = response.json() +print("Stream arguments:") +print(" x-queue-type:" + stream["arguments"]["x-queue-type"]) +print(" x-max-age:" + stream["arguments"]["x-max-age"]) + +while True: + time.sleep(15) # send a random message every 15 seconds + conn.send( destination='/exchange/stomp1', body=str(random.randint(1,11))) + +conn.disconnect() diff --git a/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl b/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl index 4b41b71def..0528642525 100644 --- a/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl +++ b/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl @@ -43,6 +43,7 @@ -define(HEADER_X_DEAD_LETTER_ROUTING_KEY, "x-dead-letter-routing-key"). -define(HEADER_X_EXPIRES, "x-expires"). -define(HEADER_X_MAX_LENGTH, "x-max-length"). +-define(HEADER_X_MAX_AGE, "x-max-age"). -define(HEADER_X_MAX_LENGTH_BYTES, "x-max-length-bytes"). -define(HEADER_X_MAX_PRIORITY, "x-max-priority"). -define(HEADER_X_MESSAGE_TTL, "x-message-ttl"). @@ -60,6 +61,7 @@ ?HEADER_X_DEAD_LETTER_ROUTING_KEY, ?HEADER_X_EXPIRES, ?HEADER_X_MAX_LENGTH, + ?HEADER_X_MAX_AGE, ?HEADER_X_MAX_LENGTH_BYTES, ?HEADER_X_MAX_PRIORITY, ?HEADER_X_MESSAGE_TTL, diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl index 4f3b4f612c..5800b7d85a 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl @@ -289,6 +289,9 @@ build_argument(?HEADER_X_MAX_PRIORITY, Val) -> build_argument(?HEADER_X_MESSAGE_TTL, Val) -> {list_to_binary(?HEADER_X_MESSAGE_TTL), long, list_to_integer(string:strip(Val))}; +build_argument(?HEADER_X_MAX_AGE, Val) -> + {list_to_binary(?HEADER_X_MAX_AGE), longstr, + list_to_binary(string:strip(Val))}; build_argument(?HEADER_X_QUEUE_TYPE, Val) -> {list_to_binary(?HEADER_X_QUEUE_TYPE), longstr, list_to_binary(string:strip(Val))}. diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py index d4a8a6291f..193aa3e89f 100644 --- a/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py @@ -10,7 +10,7 @@ import base import time import os import re - +import urllib.request, json class TestUserGeneratedQueueName(base.BaseTest): @@ -25,6 +25,7 @@ class TestUserGeneratedQueueName(base.BaseTest): headers={ 'x-queue-name': queueName, 'x-queue-type': 'stream', + 'x-max-age' : '10h', 'durable': True, 'auto-delete': False, 'id': 1234, @@ -69,4 +70,4 @@ if __name__ == '__main__': modules = [ __name__ ] - test_runner.run_unittests(modules)
\ No newline at end of file + test_runner.run_unittests(modules) |