summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarcial Rosales <mrosales@pivotal.io>2022-11-17 13:01:07 +0100
committerMergify <37929162+mergify[bot]@users.noreply.github.com>2022-11-17 12:35:55 +0000
commit293dc1c50d3f8375ec668962843e75ac3a2ccedd (patch)
tree01afeab1f3788de68053c72a6940f44716b95c9c
parent934253a4c12d6c8d3689a755b48162aea8ce0b1f (diff)
downloadrabbitmq-server-git-293dc1c50d3f8375ec668962843e75ac3a2ccedd.tar.gz
Support x-max-age argument in stomp
for stream declarations (cherry picked from commit 5747461f0972d348ead3b137abb13f35a737a5be)
-rwxr-xr-xdeps/rabbitmq_stomp/examples/python/stream-receiver.py40
-rw-r--r--deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl2
-rw-r--r--deps/rabbitmq_stomp/src/rabbit_stomp_util.erl3
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py5
4 files changed, 48 insertions, 2 deletions
diff --git a/deps/rabbitmq_stomp/examples/python/stream-receiver.py b/deps/rabbitmq_stomp/examples/python/stream-receiver.py
new file mode 100755
index 0000000000..87856044e0
--- /dev/null
+++ b/deps/rabbitmq_stomp/examples/python/stream-receiver.py
@@ -0,0 +1,40 @@
+import time
+import sys
+
+import stomp
+import random
+import requests
+
+class MyListener(stomp.ConnectionListener):
+ def on_error(self, frame):
+ print('received an error "%s"' % frame.body)
+ def on_message(self, frame):
+ print('received a message "%s"' % frame.body)
+
+# Define a STOMP connection and port
+conn = stomp.Connection([("localhost", 61613)])
+conn.set_listener('', MyListener())
+conn.connect('guest', 'guest', wait=True) # define the username/password
+
+# Setup a subscription
+conn.subscribe(destination='/exchange/stomp1', id=1234, ack='client', headers={
+ 'x-queue-name': 'my-stomp-stream',
+ 'x-queue-type': 'stream',
+ 'x-max-age' : '10h',
+ 'durable': True,
+ 'auto-delete': False,
+ 'id': 1234,
+ 'prefetch-count': 10
+})
+
+response = requests.get("http://localhost:15672/api/queues/%2F/my-stomp-stream", auth=("guest", "guest"))
+stream = response.json()
+print("Stream arguments:")
+print(" x-queue-type:" + stream["arguments"]["x-queue-type"])
+print(" x-max-age:" + stream["arguments"]["x-max-age"])
+
+while True:
+ time.sleep(15) # send a random message every 15 seconds
+ conn.send( destination='/exchange/stomp1', body=str(random.randint(1,11)))
+
+conn.disconnect()
diff --git a/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl b/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl
index 4b41b71def..0528642525 100644
--- a/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl
+++ b/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl
@@ -43,6 +43,7 @@
-define(HEADER_X_DEAD_LETTER_ROUTING_KEY, "x-dead-letter-routing-key").
-define(HEADER_X_EXPIRES, "x-expires").
-define(HEADER_X_MAX_LENGTH, "x-max-length").
+-define(HEADER_X_MAX_AGE, "x-max-age").
-define(HEADER_X_MAX_LENGTH_BYTES, "x-max-length-bytes").
-define(HEADER_X_MAX_PRIORITY, "x-max-priority").
-define(HEADER_X_MESSAGE_TTL, "x-message-ttl").
@@ -60,6 +61,7 @@
?HEADER_X_DEAD_LETTER_ROUTING_KEY,
?HEADER_X_EXPIRES,
?HEADER_X_MAX_LENGTH,
+ ?HEADER_X_MAX_AGE,
?HEADER_X_MAX_LENGTH_BYTES,
?HEADER_X_MAX_PRIORITY,
?HEADER_X_MESSAGE_TTL,
diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl
index 4f3b4f612c..5800b7d85a 100644
--- a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl
+++ b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl
@@ -289,6 +289,9 @@ build_argument(?HEADER_X_MAX_PRIORITY, Val) ->
build_argument(?HEADER_X_MESSAGE_TTL, Val) ->
{list_to_binary(?HEADER_X_MESSAGE_TTL), long,
list_to_integer(string:strip(Val))};
+build_argument(?HEADER_X_MAX_AGE, Val) ->
+ {list_to_binary(?HEADER_X_MAX_AGE), longstr,
+ list_to_binary(string:strip(Val))};
build_argument(?HEADER_X_QUEUE_TYPE, Val) ->
{list_to_binary(?HEADER_X_QUEUE_TYPE), longstr,
list_to_binary(string:strip(Val))}.
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py
index d4a8a6291f..193aa3e89f 100644
--- a/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py
@@ -10,7 +10,7 @@ import base
import time
import os
import re
-
+import urllib.request, json
class TestUserGeneratedQueueName(base.BaseTest):
@@ -25,6 +25,7 @@ class TestUserGeneratedQueueName(base.BaseTest):
headers={
'x-queue-name': queueName,
'x-queue-type': 'stream',
+ 'x-max-age' : '10h',
'durable': True,
'auto-delete': False,
'id': 1234,
@@ -69,4 +70,4 @@ if __name__ == '__main__':
modules = [
__name__
]
- test_runner.run_unittests(modules) \ No newline at end of file
+ test_runner.run_unittests(modules)