diff options
| author | Ben Buchwald <bbuchwald@boxfort.com> | 2020-02-26 10:14:17 -0500 |
|---|---|---|
| committer | Luke Bakken <lbakken@pivotal.io> | 2020-11-17 08:34:43 -0800 |
| commit | df2757473a1e0f5cf2e5f69a112174304e833a71 (patch) | |
| tree | 2bc9d17e73ea9128e9d715edfb07cd00abbc5fc2 | |
| parent | 778e8dad5ceb9fec974d7ceea04081cc2e35872b (diff) | |
| download | rabbitmq-server-git-rabbitmq-stomp-gh-144-monorepo.tar.gz | |
Allow STOMP to declare an exchange before publishing/subscribingrabbitmq-stomp-gh-144-monorepo
Currently publishing or subscribing to a /queue/ destination will declare the
queue, but publishing/subscribing to a /exchange/ destination will not declare
the exchange meaning all exchanges must already exist before being used by the
STOMP plugin. This change allows the STOMP plugin to optionally declare the
exchange before publishing/subscribing to it. The default is still the same.
If you want to declare the exchange, add a declare-exchange:true header to the
SEND or SUBSCRIBE frame. Additional headers can be used to specify properties
of the exchange (exchange-type, exchange-durable, and exchange-auto-delete).
These are prefixed with exchange- to distinguish them from the durable and
auto-delete properties of the receive queue created when subscribing. This
change requires a related changed to rabbitmq-erlang-client.
(cherry picked from commit a5b5ae2756a7efca7c8cfd8575395eb8fd01e629)
4 files changed, 87 insertions, 4 deletions
diff --git a/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl b/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl index 974b5825c8..1dd39c05f3 100644 --- a/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl +++ b/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl @@ -14,10 +14,14 @@ -define(HEADER_CONTENT_LENGTH, "content-length"). -define(HEADER_CONTENT_TYPE, "content-type"). -define(HEADER_CORRELATION_ID, "correlation-id"). +-define(HEADER_DECLARE_EXCHANGE, "declare-exchange"). -define(HEADER_DESTINATION, "destination"). -define(HEADER_DURABLE, "durable"). --define(HEADER_EXPIRATION, "expiration"). +-define(HEADER_EXCHANGE_AUTO_DELETE, "exchange-auto-delete"). +-define(HEADER_EXCHANGE_DURABLE, "exchange-durable"). +-define(HEADER_EXCHANGE_TYPE, "exchange-type"). -define(HEADER_EXCLUSIVE, "exclusive"). +-define(HEADER_EXPIRATION, "expiration"). -define(HEADER_HEART_BEAT, "heart-beat"). -define(HEADER_HOST, "host"). -define(HEADER_ID, "id"). @@ -66,8 +70,14 @@ ]). -define(HEADER_PARAMS, [ + %% Queue Params ?HEADER_AUTO_DELETE, ?HEADER_DURABLE, ?HEADER_EXCLUSIVE, - ?HEADER_PERSISTENT + ?HEADER_PERSISTENT, + %% Exchange Params + ?HEADER_DECLARE_EXCHANGE, + ?HEADER_EXCHANGE_AUTO_DELETE, + ?HEADER_EXCHANGE_DURABLE, + ?HEADER_EXCHANGE_TYPE ]). diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl index 6df1affbb7..25666dd224 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl @@ -321,7 +321,19 @@ build_param(?HEADER_AUTO_DELETE, Val) -> {auto_delete, string_to_boolean(Val)}; build_param(?HEADER_EXCLUSIVE, Val) -> - {exclusive, string_to_boolean(Val)}. + {exclusive, string_to_boolean(Val)}; + +build_param(?HEADER_DECLARE_EXCHANGE, Val) -> + {declare_exchange, string_to_boolean(Val)}; + +build_param(?HEADER_EXCHANGE_TYPE, Val) -> + {exchange_type, list_to_binary(string:strip(Val))}; + +build_param(?HEADER_EXCHANGE_DURABLE, Val) -> + {exchange_durable, string_to_boolean(Val)}; + +build_param(?HEADER_EXCHANGE_AUTO_DELETE, Val) -> + {exchange_auto_delete, string_to_boolean(Val)}. default_params({queue, _}) -> [{durable, true}]; diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/declare_exchange.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/declare_exchange.py new file mode 100644 index 0000000000..0d71fead34 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/declare_exchange.py @@ -0,0 +1,60 @@ +import unittest +import stomp +import pika +import base +import time +import os + +class TestDeclareExchange(base.BaseTest): + + def test_subscribe(self): + destination = "/exchange/declare-exchange-subscribe-test" + + # subscribe + self.subscribe_dest(self.conn, destination, None, + receipt='subscribed', + headers={ + 'declare-exchange': True, + }) + + self.assertListener("Couldn't declare exchange", numRcts=1) + + def test_send(self): + destination = "/exchange/declare-exchange-send-test" + + # send + self.conn.send(destination, "test1", + receipt='sent', + headers={ + 'declare-exchange': True, + }) + + self.assertListener("Couldn't declare exchange", numRcts=1) + + def test_properties(self): + destination = "/exchange/declare-exchange-properties-test" + + # subscribe + self.subscribe_dest(self.conn, destination, None, + receipt='subscribed', + headers={ + 'declare-exchange': True, + 'exchange-type': 'topic', + 'exchange-durable': False, + 'exchange-auto-delete': True, + }) + + self.assertListener("Couldn't declare exchange", numRcts=1) + + # now try to declare the queue using pika + # if the properties are the same we should + # not get any error + connection = pika.BlockingConnection(pika.ConnectionParameters( + host='127.0.0.1', port=int(os.environ["AMQP_PORT"]))) + channel = connection.channel() + channel.exchange_declare(exchange='declare-exchange-properties-test', + exchange_type='topic', + durable=False, + auto_delete=True) + + connection.close() diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py index 01967465a2..64088815af 100755 --- a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py @@ -16,6 +16,7 @@ if __name__ == '__main__': 'destinations', 'redelivered', 'topic_permissions', - 'x_queue_type_quorum' + 'x_queue_type_quorum', + 'declare_exchange', ] test_runner.run_unittests(modules) |
