author     Arnaud Cogoluègnes <acogoluegnes@gmail.com>  2020-10-16 14:07:19 +0200
committer  Arnaud Cogoluègnes <acogoluegnes@gmail.com>  2020-10-16 14:07:19 +0200
commit     cc030ac195c00dc7f34616f0d2303deee6864371 (patch)
tree       ea2cf9d907542b0c48811a27da44080345d32ce7
parent     b704e2f8effd34164061c712e7238b24b0e4bcec (diff)
download   rabbitmq-server-git-cc030ac195c00dc7f34616f0d2303deee6864371.tar.gz
Support initial-cluster-size argument on creation
See rabbitmq/rabbitmq-server#2467
-rw-r--r--  deps/rabbitmq_stream/src/rabbit_stream_manager.erl | 57
-rw-r--r--  deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 3
-rw-r--r--  deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java | 65
3 files changed, 106 insertions(+), 19 deletions(-)
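
The commit adds a client-facing initial-cluster-size argument to stream creation; the new ClusterSizeTest.java further down shows it being passed as a string in the arguments map of the create request. Below is a minimal stand-alone sketch of the same usage. It is an illustration only, not part of this commit: it assumes a broker with the stream plugin reachable on localhost:5552, and that the test client (com.rabbitmq.stream.impl.Client) can be constructed directly from its ClientParameters, whereas the test suite obtains clients through TestUtils.ClientFactory.

// Hypothetical example; class and stream names are made up for illustration.
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.impl.Client.Response;
import com.rabbitmq.stream.impl.Client.StreamMetadata;
import java.util.Collections;

public class InitialClusterSizeExample {

    public static void main(String[] args) {
        // Assumes the stream plugin is listening on localhost:5552.
        Client client = new Client(new Client.ClientParameters().host("localhost").port(5552));
        String stream = "cluster-size-example";
        // Request a 3-member cluster (leader + 2 replicas) for the new stream.
        Response response =
            client.create(stream, Collections.singletonMap("initial-cluster-size", "3"));
        if (response.isOk()) {
            // Metadata reports the members actually allocated: one leader plus replicas.
            StreamMetadata metadata = client.metadata(stream).get(stream);
            int members = metadata.getLeader() == null ? 0 : 1 + metadata.getReplicas().size();
            System.out.println("stream members: " + members);
            client.delete(stream);
        } else {
            // Values <= 0 are rejected with Constants.RESPONSE_CODE_PRECONDITION_FAILED.
            System.out.println("create failed: " + response.getResponseCode());
        }
        client.close();
    }
}

The "5,3" row in the test's CsvSource suggests that a requested size larger than the number of available nodes is simply capped at the actual cluster size (3 nodes in the test setup).
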
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index 3b8c766552..9fa4e3521d 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -71,30 +71,49 @@ stream_queue_arguments(ArgumentsAcc, #{<<"max-segment-size">> := Value} = Argume
[{<<"x-max-segment-size">>, long, binary_to_integer(Value)}] ++ ArgumentsAcc,
maps:remove(<<"max-segment-size">>, Arguments)
);
+stream_queue_arguments(ArgumentsAcc, #{<<"initial-cluster-size">> := Value} = Arguments) ->
+ stream_queue_arguments(
+ [{<<"x-initial-cluster-size">>, long, binary_to_integer(Value)}] ++ ArgumentsAcc,
+ maps:remove(<<"initial-cluster-size">>, Arguments)
+ );
stream_queue_arguments(ArgumentsAcc, _Arguments) ->
ArgumentsAcc.
+validate_stream_queue_arguments([]) ->
+ ok;
+validate_stream_queue_arguments([{<<"x-initial-cluster-size">>, long, ClusterSize} | _]) when ClusterSize =< 0 ->
+ error;
+validate_stream_queue_arguments([_ | T]) ->
+ validate_stream_queue_arguments(T).
+
+
handle_call({create, VirtualHost, Reference, Arguments, Username}, _From, State) ->
Name = #resource{virtual_host = VirtualHost, kind = queue, name = Reference},
- Q0 = amqqueue:new(
- Name,
- none, true, false, none, stream_queue_arguments(Arguments),
- VirtualHost, #{user => Username}, rabbit_stream_queue
- ),
- try
- case rabbit_stream_queue:declare(Q0, node()) of
- {new, Q} ->
- {reply, {ok, amqqueue:get_type_state(Q)}, State};
- {existing, _} ->
- {reply, {error, reference_already_exists}, State};
- {error, Err} ->
- rabbit_log:warn("Error while creating ~p stream, ~p~n", [Reference, Err]),
- {reply, {error, internal_error}, State}
- end
- catch
- exit:Error ->
- rabbit_log:info("Error while creating ~p stream, ~p~n", [Reference, Error]),
- {reply, {error, internal_error}, State}
+ StreamQueueArguments = stream_queue_arguments(Arguments),
+ case validate_stream_queue_arguments(StreamQueueArguments) of
+ ok ->
+ Q0 = amqqueue:new(
+ Name,
+ none, true, false, none, StreamQueueArguments,
+ VirtualHost, #{user => Username}, rabbit_stream_queue
+ ),
+ try
+ case rabbit_stream_queue:declare(Q0, node()) of
+ {new, Q} ->
+ {reply, {ok, amqqueue:get_type_state(Q)}, State};
+ {existing, _} ->
+ {reply, {error, reference_already_exists}, State};
+ {error, Err} ->
+ rabbit_log:warn("Error while creating ~p stream, ~p~n", [Reference, Err]),
+ {reply, {error, internal_error}, State}
+ end
+ catch
+ exit:Error ->
+ rabbit_log:info("Error while creating ~p stream, ~p~n", [Reference, Error]),
+ {reply, {error, internal_error}, State}
+ end;
+ error ->
+ {reply, {error, validation_failed}, State}
end;
handle_call({delete, VirtualHost, Reference, Username}, _From, State) ->
Name = #resource{virtual_host = VirtualHost, kind = queue, name = Reference},
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 6ffad09c5e..5dc5d5fa38 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -809,6 +809,9 @@ handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost,
rabbit_log:info("Created cluster with leader ~p and replicas ~p~n", [LeaderPid, ReturnedReplicas]),
response_ok(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId),
{Connection, State, Rest};
+ {error, validation_failed} ->
+ response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_PRECONDITION_FAILED),
+ {Connection, State, Rest};
{error, reference_already_exists} ->
response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_STREAM_ALREADY_EXISTS),
{Connection, State, Rest};
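
On the wire, the new validation_failed error is surfaced through the existing precondition-failed response code, so a client that requests a non-positive cluster size receives a normal error response to the create command rather than a dropped connection. A short sketch of that path, under the same assumptions as the example above (the connected client variable is hypothetical):

// "client" is a connected com.rabbitmq.stream.impl.Client, as in the earlier sketch.
Client.Response response =
    client.create("some-stream", java.util.Collections.singletonMap("initial-cluster-size", "0"));
// The manager rejects values <= 0 before declaring the queue, and the reader
// translates that into RESPONSE_CODE_PRECONDITION_FAILED.
boolean rejected = !response.isOk()
    && response.getResponseCode() == com.rabbitmq.stream.Constants.RESPONSE_CODE_PRECONDITION_FAILED;
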
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java
new file mode 100644
index 0000000000..993c19b852
--- /dev/null
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java
@@ -0,0 +1,65 @@
+// The contents of this file are subject to the Mozilla Public License
+// Version 2.0 (the "License"); you may not use this file except in
+// compliance with the License. You may obtain a copy of the License
+// at https://www.mozilla.org/en-US/MPL/2.0/
+//
+// Software distributed under the License is distributed on an "AS IS"
+// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+// the License for the specific language governing rights and
+// limitations under the License.
+//
+// The Original Code is RabbitMQ.
+//
+// The Initial Developer of the Original Code is Pivotal Software, Inc.
+// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+//
+
+package com.rabbitmq.stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.rabbitmq.stream.impl.Client;
+import com.rabbitmq.stream.impl.Client.Response;
+import com.rabbitmq.stream.impl.Client.StreamMetadata;
+import java.util.Collections;
+import java.util.UUID;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
+public class ClusterSizeTest {
+
+ TestUtils.ClientFactory cf;
+
+ @ParameterizedTest
+ @ValueSource(strings = {"-1", "0"})
+ void clusterSizeZeroShouldReturnError(String clusterSize) {
+ Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
+ String s = UUID.randomUUID().toString();
+ Response response =
+ client.create(s, Collections.singletonMap("initial-cluster-size", clusterSize));
+ assertThat(response.isOk()).isFalse();
+ assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_PRECONDITION_FAILED);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"1,1", "2,2", "3,3", "5,3"})
+ void clusterSizeShouldReflectOnMetadata(String requestedClusterSize, int expectedClusterSize) {
+ Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
+ String s = UUID.randomUUID().toString();
+ try {
+ Response response =
+ client.create(s, Collections.singletonMap("initial-cluster-size", requestedClusterSize));
+ assertThat(response.isOk()).isTrue();
+ StreamMetadata metadata = client.metadata(s).get(s);
+ assertThat(metadata).isNotNull();
+ assertThat(metadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
+ int actualClusterSize = metadata.getLeader() == null ? 0 : 1 + metadata.getReplicas().size();
+ assertThat(actualClusterSize).isEqualTo(expectedClusterSize);
+ } finally {
+ client.delete(s);
+ }
+ }
+}