summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Buchwald <bbuchwald@boxfort.com>2020-02-26 10:14:17 -0500
committerLuke Bakken <lbakken@pivotal.io>2020-11-17 08:34:43 -0800
commitdf2757473a1e0f5cf2e5f69a112174304e833a71 (patch)
tree2bc9d17e73ea9128e9d715edfb07cd00abbc5fc2
parent778e8dad5ceb9fec974d7ceea04081cc2e35872b (diff)
downloadrabbitmq-server-git-rabbitmq-stomp-gh-144-monorepo.tar.gz
Allow STOMP to declare an exchange before publishing/subscribingrabbitmq-stomp-gh-144-monorepo
Currently publishing or subscribing to a /queue/ destination will declare the queue, but publishing/subscribing to a /exchange/ destination will not declare the exchange meaning all exchanges must already exist before being used by the STOMP plugin. This change allows the STOMP plugin to optionally declare the exchange before publishing/subscribing to it. The default is still the same. If you want to declare the exchange, add a declare-exchange:true header to the SEND or SUBSCRIBE frame. Additional headers can be used to specify properties of the exchange (exchange-type, exchange-durable, and exchange-auto-delete). These are prefixed with exchange- to distinguish them from the durable and auto-delete properties of the receive queue created when subscribing. This change requires a related changed to rabbitmq-erlang-client. (cherry picked from commit a5b5ae2756a7efca7c8cfd8575395eb8fd01e629)
-rw-r--r--deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl14
-rw-r--r--deps/rabbitmq_stomp/src/rabbit_stomp_util.erl14
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/src/declare_exchange.py60
-rwxr-xr-xdeps/rabbitmq_stomp/test/python_SUITE_data/src/test.py3
4 files changed, 87 insertions, 4 deletions
diff --git a/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl b/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl
index 974b5825c8..1dd39c05f3 100644
--- a/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl
+++ b/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl
@@ -14,10 +14,14 @@
-define(HEADER_CONTENT_LENGTH, "content-length").
-define(HEADER_CONTENT_TYPE, "content-type").
-define(HEADER_CORRELATION_ID, "correlation-id").
+-define(HEADER_DECLARE_EXCHANGE, "declare-exchange").
-define(HEADER_DESTINATION, "destination").
-define(HEADER_DURABLE, "durable").
--define(HEADER_EXPIRATION, "expiration").
+-define(HEADER_EXCHANGE_AUTO_DELETE, "exchange-auto-delete").
+-define(HEADER_EXCHANGE_DURABLE, "exchange-durable").
+-define(HEADER_EXCHANGE_TYPE, "exchange-type").
-define(HEADER_EXCLUSIVE, "exclusive").
+-define(HEADER_EXPIRATION, "expiration").
-define(HEADER_HEART_BEAT, "heart-beat").
-define(HEADER_HOST, "host").
-define(HEADER_ID, "id").
@@ -66,8 +70,14 @@
]).
-define(HEADER_PARAMS, [
+ %% Queue Params
?HEADER_AUTO_DELETE,
?HEADER_DURABLE,
?HEADER_EXCLUSIVE,
- ?HEADER_PERSISTENT
+ ?HEADER_PERSISTENT,
+ %% Exchange Params
+ ?HEADER_DECLARE_EXCHANGE,
+ ?HEADER_EXCHANGE_AUTO_DELETE,
+ ?HEADER_EXCHANGE_DURABLE,
+ ?HEADER_EXCHANGE_TYPE
]).
diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl
index 6df1affbb7..25666dd224 100644
--- a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl
+++ b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl
@@ -321,7 +321,19 @@ build_param(?HEADER_AUTO_DELETE, Val) ->
{auto_delete, string_to_boolean(Val)};
build_param(?HEADER_EXCLUSIVE, Val) ->
- {exclusive, string_to_boolean(Val)}.
+ {exclusive, string_to_boolean(Val)};
+
+build_param(?HEADER_DECLARE_EXCHANGE, Val) ->
+ {declare_exchange, string_to_boolean(Val)};
+
+build_param(?HEADER_EXCHANGE_TYPE, Val) ->
+ {exchange_type, list_to_binary(string:strip(Val))};
+
+build_param(?HEADER_EXCHANGE_DURABLE, Val) ->
+ {exchange_durable, string_to_boolean(Val)};
+
+build_param(?HEADER_EXCHANGE_AUTO_DELETE, Val) ->
+ {exchange_auto_delete, string_to_boolean(Val)}.
default_params({queue, _}) ->
[{durable, true}];
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/declare_exchange.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/declare_exchange.py
new file mode 100644
index 0000000000..0d71fead34
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/declare_exchange.py
@@ -0,0 +1,60 @@
+import unittest
+import stomp
+import pika
+import base
+import time
+import os
+
+class TestDeclareExchange(base.BaseTest):
+
+ def test_subscribe(self):
+ destination = "/exchange/declare-exchange-subscribe-test"
+
+ # subscribe
+ self.subscribe_dest(self.conn, destination, None,
+ receipt='subscribed',
+ headers={
+ 'declare-exchange': True,
+ })
+
+ self.assertListener("Couldn't declare exchange", numRcts=1)
+
+ def test_send(self):
+ destination = "/exchange/declare-exchange-send-test"
+
+ # send
+ self.conn.send(destination, "test1",
+ receipt='sent',
+ headers={
+ 'declare-exchange': True,
+ })
+
+ self.assertListener("Couldn't declare exchange", numRcts=1)
+
+ def test_properties(self):
+ destination = "/exchange/declare-exchange-properties-test"
+
+ # subscribe
+ self.subscribe_dest(self.conn, destination, None,
+ receipt='subscribed',
+ headers={
+ 'declare-exchange': True,
+ 'exchange-type': 'topic',
+ 'exchange-durable': False,
+ 'exchange-auto-delete': True,
+ })
+
+ self.assertListener("Couldn't declare exchange", numRcts=1)
+
+ # now try to declare the queue using pika
+ # if the properties are the same we should
+ # not get any error
+ connection = pika.BlockingConnection(pika.ConnectionParameters(
+ host='127.0.0.1', port=int(os.environ["AMQP_PORT"])))
+ channel = connection.channel()
+ channel.exchange_declare(exchange='declare-exchange-properties-test',
+ exchange_type='topic',
+ durable=False,
+ auto_delete=True)
+
+ connection.close()
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py
index 01967465a2..64088815af 100755
--- a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py
@@ -16,6 +16,7 @@ if __name__ == '__main__':
'destinations',
'redelivered',
'topic_permissions',
- 'x_queue_type_quorum'
+ 'x_queue_type_quorum',
+ 'declare_exchange',
]
test_runner.run_unittests(modules)