summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-16 11:22:16 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-16 11:22:16 +0200
commitb704e2f8effd34164061c712e7238b24b0e4bcec (patch)
tree3c1c1a0d4ecdef105ba6c3d42e0e608f9a2bdc69
parent27d06e8021f8f7d4fbfba2f4f10173df995f3e4c (diff)
downloadrabbitmq-server-git-b704e2f8effd34164061c712e7238b24b0e4bcec.tar.gz
Check stream name before creation
Should not start with "amq.", be empty. Strips also newline and carriage return characters.
-rw-r--r--deps/rabbitmq_stream/include/rabbit_stream.hrl1
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl36
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_utils.erl37
3 files changed, 59 insertions, 15 deletions
diff --git a/deps/rabbitmq_stream/include/rabbit_stream.hrl b/deps/rabbitmq_stream/include/rabbit_stream.hrl
index 684dd9a066..0593893d93 100644
--- a/deps/rabbitmq_stream/include/rabbit_stream.hrl
+++ b/deps/rabbitmq_stream/include/rabbit_stream.hrl
@@ -37,6 +37,7 @@
-define(RESPONSE_CODE_FRAME_TOO_LARGE, 13).
-define(RESPONSE_CODE_INTERNAL_ERROR, 14).
-define(RESPONSE_CODE_ACCESS_REFUSED, 15).
+-define(RESPONSE_CODE_PRECONDITION_FAILED, 16).
-define(OFFSET_TYPE_FIRST, 0).
-define(OFFSET_TYPE_LAST, 1).
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index b43c7c837a..6ffad09c5e 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -799,23 +799,29 @@ handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost,
State,
<<?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, CorrelationId:32, StreamSize:16, Stream:StreamSize/binary,
ArgumentsCount:32, ArgumentsBinary/binary>>, Rest) ->
- {Arguments, _Rest} = parse_map(ArgumentsBinary, ArgumentsCount),
- case check_configure_permitted(#resource{name = Stream, kind = queue, virtual_host = VirtualHost}, User, #{}) of
- ok ->
- case rabbit_stream_manager:create(VirtualHost, Stream, Arguments, Username) of
- {ok, #{leader_pid := LeaderPid, replica_pids := ReturnedReplicas}} ->
- rabbit_log:info("Created cluster with leader ~p and replicas ~p~n", [LeaderPid, ReturnedReplicas]),
- response_ok(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId),
- {Connection, State, Rest};
- {error, reference_already_exists} ->
- response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_STREAM_ALREADY_EXISTS),
- {Connection, State, Rest};
- {error, _} ->
- response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_INTERNAL_ERROR),
+ case rabbit_stream_utils:enforce_correct_stream_name(Stream) of
+ {ok, StreamName} ->
+ {Arguments, _Rest} = parse_map(ArgumentsBinary, ArgumentsCount),
+ case check_configure_permitted(#resource{name = StreamName, kind = queue, virtual_host = VirtualHost}, User, #{}) of
+ ok ->
+ case rabbit_stream_manager:create(VirtualHost, StreamName, Arguments, Username) of
+ {ok, #{leader_pid := LeaderPid, replica_pids := ReturnedReplicas}} ->
+ rabbit_log:info("Created cluster with leader ~p and replicas ~p~n", [LeaderPid, ReturnedReplicas]),
+ response_ok(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId),
+ {Connection, State, Rest};
+ {error, reference_already_exists} ->
+ response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_STREAM_ALREADY_EXISTS),
+ {Connection, State, Rest};
+ {error, _} ->
+ response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_INTERNAL_ERROR),
+ {Connection, State, Rest}
+ end;
+ error ->
+ response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED),
{Connection, State, Rest}
end;
- error ->
- response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED),
+ _ ->
+ response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_PRECONDITION_FAILED),
{Connection, State, Rest}
end;
handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host = VirtualHost,
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
new file mode 100644
index 0000000000..7c3e829be3
--- /dev/null
+++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
@@ -0,0 +1,37 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 2.0 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/en-US/MPL/2.0/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is Pivotal Software, Inc.
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_stream_utils).
+
+%% API
+-export([enforce_correct_stream_name/1]).
+
+enforce_correct_stream_name(Name) ->
+ % from rabbit_channel
+ StrippedName = binary:replace(Name, [<<"\n">>, <<"\r">>], <<"">>, [global]),
+ case check_name(StrippedName) of
+ ok ->
+ {ok, StrippedName};
+ error ->
+ error
+ end.
+
+check_name(<<"amq.", _/binary>>) ->
+ error;
+check_name(<<"">>) ->
+ error;
+check_name(_Name) ->
+ ok.