diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-16 11:22:16 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-16 11:22:16 +0200 |
commit | b704e2f8effd34164061c712e7238b24b0e4bcec (patch) | |
tree | 3c1c1a0d4ecdef105ba6c3d42e0e608f9a2bdc69 | |
parent | 27d06e8021f8f7d4fbfba2f4f10173df995f3e4c (diff) | |
download | rabbitmq-server-git-b704e2f8effd34164061c712e7238b24b0e4bcec.tar.gz |
Check stream name before creation
Should not start with "amq.", be empty. Strips also newline
and carriage return characters.
-rw-r--r-- | deps/rabbitmq_stream/include/rabbit_stream.hrl | 1 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 36 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_utils.erl | 37 |
3 files changed, 59 insertions, 15 deletions
diff --git a/deps/rabbitmq_stream/include/rabbit_stream.hrl b/deps/rabbitmq_stream/include/rabbit_stream.hrl index 684dd9a066..0593893d93 100644 --- a/deps/rabbitmq_stream/include/rabbit_stream.hrl +++ b/deps/rabbitmq_stream/include/rabbit_stream.hrl @@ -37,6 +37,7 @@ -define(RESPONSE_CODE_FRAME_TOO_LARGE, 13). -define(RESPONSE_CODE_INTERNAL_ERROR, 14). -define(RESPONSE_CODE_ACCESS_REFUSED, 15). +-define(RESPONSE_CODE_PRECONDITION_FAILED, 16). -define(OFFSET_TYPE_FIRST, 0). -define(OFFSET_TYPE_LAST, 1). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index b43c7c837a..6ffad09c5e 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -799,23 +799,29 @@ handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost, State, <<?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, CorrelationId:32, StreamSize:16, Stream:StreamSize/binary, ArgumentsCount:32, ArgumentsBinary/binary>>, Rest) -> - {Arguments, _Rest} = parse_map(ArgumentsBinary, ArgumentsCount), - case check_configure_permitted(#resource{name = Stream, kind = queue, virtual_host = VirtualHost}, User, #{}) of - ok -> - case rabbit_stream_manager:create(VirtualHost, Stream, Arguments, Username) of - {ok, #{leader_pid := LeaderPid, replica_pids := ReturnedReplicas}} -> - rabbit_log:info("Created cluster with leader ~p and replicas ~p~n", [LeaderPid, ReturnedReplicas]), - response_ok(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId), - {Connection, State, Rest}; - {error, reference_already_exists} -> - response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_STREAM_ALREADY_EXISTS), - {Connection, State, Rest}; - {error, _} -> - response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_INTERNAL_ERROR), + case rabbit_stream_utils:enforce_correct_stream_name(Stream) of + {ok, StreamName} -> + {Arguments, _Rest} = parse_map(ArgumentsBinary, ArgumentsCount), + case check_configure_permitted(#resource{name = StreamName, kind = queue, virtual_host = VirtualHost}, User, #{}) of + ok -> + case rabbit_stream_manager:create(VirtualHost, StreamName, Arguments, Username) of + {ok, #{leader_pid := LeaderPid, replica_pids := ReturnedReplicas}} -> + rabbit_log:info("Created cluster with leader ~p and replicas ~p~n", [LeaderPid, ReturnedReplicas]), + response_ok(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId), + {Connection, State, Rest}; + {error, reference_already_exists} -> + response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_STREAM_ALREADY_EXISTS), + {Connection, State, Rest}; + {error, _} -> + response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_INTERNAL_ERROR), + {Connection, State, Rest} + end; + error -> + response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED), {Connection, State, Rest} end; - error -> - response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED), + _ -> + response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_PRECONDITION_FAILED), {Connection, State, Rest} end; handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host = VirtualHost, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl new file mode 100644 index 0000000000..7c3e829be3 --- /dev/null +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -0,0 +1,37 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 2.0 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/en-US/MPL/2.0/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is Pivotal Software, Inc. +%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_stream_utils). + +%% API +-export([enforce_correct_stream_name/1]). + +enforce_correct_stream_name(Name) -> + % from rabbit_channel + StrippedName = binary:replace(Name, [<<"\n">>, <<"\r">>], <<"">>, [global]), + case check_name(StrippedName) of + ok -> + {ok, StrippedName}; + error -> + error + end. + +check_name(<<"amq.", _/binary>>) -> + error; +check_name(<<"">>) -> + error; +check_name(_Name) -> + ok. |