summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2017-05-22 11:24:17 +0100
committerkjnilsson <knilsson@pivotal.io>2017-05-22 11:24:17 +0100
commitbbce6610137befeb573bb9588086e55fc4432d07 (patch)
treeec8b183fb2c102749aab02a7ad5b9fb676994aa9
parentd1eb01a68d5844139194227b7c82b403c48856f1 (diff)
downloadrabbitmq-server-git-bbce6610137befeb573bb9588086e55fc4432d07.tar.gz
Refactor configuration validation
To get better separation between validation and parsing and also move some of the protocol specific parsing into the relevant shovel implementation. [#138600475]
-rw-r--r--deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl61
-rw-r--r--deps/rabbitmq_shovel/src/rabbit_shovel_config.erl245
-rw-r--r--deps/rabbitmq_shovel/test/configuration_SUITE.erl20
-rw-r--r--deps/rabbitmq_shovel/test/dynamic_SUITE.erl2
-rw-r--r--deps/rabbitmq_shovel/test/parameters_SUITE.erl6
5 files changed, 145 insertions, 189 deletions
diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
index 2ab6e197c9..4b3fb9eeec 100644
--- a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
+++ b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
@@ -41,13 +41,16 @@
-define(MAX_CONNECTION_CLOSE_TIMEOUT, 10000).
parse(_Name, {source, Source}) ->
+ Prefetch = parse_parameter(prefetch_count, fun parse_non_negative_integer/1,
+ proplists:get_value(prefetch_count, Source, ?DEFAULT_PREFETCH)),
+ Queue = parse_parameter(queue, fun parse_binary/1,
+ proplists:get_value(queue, Source)),
#{module => ?MODULE,
uris => proplists:get_value(uris, Source),
- resource_decl => decl_fun(sources, Source),
- queue => proplists:get_value(queue, Source),
+ resource_decl => decl_fun(Source),
+ queue => Queue,
delete_after => proplists:get_value(delete_after, Source, never),
- prefetch_count => proplists:get_value(prefetch_count, Source,
- ?DEFAULT_PREFETCH)};
+ prefetch_count => Prefetch};
parse(Name, {destination, Dest}) ->
PubProp = proplists:get_value(publish_properties, Dest, []),
PropsFun = try_make_parse_publish(publish_properties, PubProp),
@@ -59,7 +62,7 @@ parse(Name, {destination, Dest}) ->
PropsFun2 = add_timestamp_header_fun(ATH, PropsFun1),
#{module => ?MODULE,
uris => proplists:get_value(uris, Dest),
- resource_decl => decl_fun(destinations, Dest),
+ resource_decl => decl_fun(Dest),
props_fun => PropsFun2,
fields_fun => PubFieldsFun,
add_forward_headers => AFH,
@@ -326,11 +329,7 @@ remaining(_Ch, #{source := #{delete_after := Count}}) ->
%%% PARSING
try_make_parse_publish(Key, Fields) ->
- try make_parse_publish(Key, Fields)
- catch
- throw:{error, Reason} ->
- fail({invalid_parameter_value, Key, Reason})
- end.
+ make_parse_publish(Key, Fields).
make_parse_publish(publish_fields, Fields) ->
make_publish_fun(Fields, record_info(fields, 'basic.publish'));
@@ -348,10 +347,12 @@ make_publish_fun(Fields, ValidFields) when is_list(Fields) ->
end, Publish, FieldIndices)
end;
Unexpected ->
- fail({unexpected_fields, Unexpected, ValidFields})
+ fail({invalid_parameter_value, publish_properties,
+ {unexpected_fields, Unexpected, ValidFields}})
end;
make_publish_fun(Fields, _) ->
- fail({require_list, Fields}).
+ fail({invalid_parameter_value, publish_properties,
+ {require_list, Fields}}).
make_field_indices(Valid, Fields) ->
make_field_indices(Fields, field_map(Valid, 2), []).
@@ -414,15 +415,29 @@ parse_declaration({[{Method, Props} | _Rest], _Acc}) ->
parse_declaration({[Method | Rest], Acc}) ->
parse_declaration({[{Method, []} | Rest], Acc}).
-decl_fun(Key, Endpoint) ->
- try Decl = parse_declaration(
- {proplists:get_value(declarations, Endpoint, []), []}),
- fun (_Conn, Ch) ->
- [begin
- amqp_channel:call(Ch, M)
- end || M <- lists:reverse(Decl)]
- end
- catch throw:{error, Reason} ->
- fail({invalid_parameter_value, Key, Reason})
- end.
+decl_fun(Endpoint) ->
+ Decl = parse_declaration({proplists:get_value(declarations, Endpoint, []),
+ []}),
+ fun (_Conn, Ch) ->
+ [begin
+ amqp_channel:call(Ch, M)
+ end || M <- lists:reverse(Decl)]
+ end.
+
+parse_parameter(Param, Fun, Value) ->
+ try
+ Fun(Value)
+ catch
+ _:{error, Err} ->
+ fail({invalid_parameter_value, Param, Err})
+ end.
+
+parse_non_negative_integer(N) when is_integer(N) andalso N >= 0 ->
+ N;
+parse_non_negative_integer(N) ->
+ fail({require_non_negative_integer, N}).
+parse_binary(Binary) when is_binary(Binary) ->
+ Binary;
+parse_binary(NotABinary) ->
+ fail({require_binary, NotABinary}).
diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_config.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_config.erl
index 121dc87aa3..8dbeea6f57 100644
--- a/deps/rabbitmq_shovel/src/rabbit_shovel_config.erl
+++ b/deps/rabbitmq_shovel/src/rabbit_shovel_config.erl
@@ -31,29 +31,46 @@ resolve_module(amqp10) -> rabbit_amqp10_shovel.
is_legacy(Config) ->
not proplists:is_defined(source, Config).
+get_brokers(Props) ->
+ case proplists:get_value(brokers, Props) of
+ undefined ->
+ [get_value(broker, Props)];
+ Brokers ->
+ Brokers
+ end.
+
convert_from_legacy(Config) ->
- S = proplists:get_value(sources, Config),
- SUris = proplists:get_value(brokers, S, [proplists:get_value(broker, S)]),
- D = proplists:get_value(destinations, Config),
- DUris = proplists:get_value(brokers, D, [proplists:get_value(broker, D)]),
- Q = proplists:get_value(queue, Config),
+ S = get_value(sources, Config),
+ validate(S),
+ SUris = get_brokers(S),
+ validate_uris(brokers, SUris),
+ D = get_value(destinations, Config),
+ validate(D),
+ DUris = get_brokers(D),
+ validate_uris(brokers, DUris),
+ Q = get_value(queue, Config),
DA = proplists:get_value(delete_after, Config, never),
Pref = proplists:get_value(prefetch_count, Config, ?DEFAULT_PREFETCH),
RD = proplists:get_value(reconnect_delay, Config, ?DEFAULT_RECONNECT_DELAY),
AckMode = proplists:get_value(ack_mode, Config, ?DEFAULT_ACK_MODE),
+ validate_ack_mode(AckMode),
PubFields = proplists:get_value(publish_fields, Config, []),
PubProps = proplists:get_value(publish_properties, Config, []),
AFH = proplists:get_value(add_forward_headers, Config, false),
ATH = proplists:get_value(add_timestamp_header, Config, false),
+ SourceDecls = proplists:get_value(declarations, S, []),
+ validate_list(SourceDecls),
+ DestDecls = proplists:get_value(declarations, D, []),
+ validate_list(DestDecls),
[{source, [{protocol, amqp091},
{uris, SUris},
- {declarations, proplists:get_value(declarations, S, [])},
+ {declarations, SourceDecls},
{queue, Q},
{delete_after, DA},
{prefetch_count, Pref}]},
{destination, [{protocol, amqp091},
{uris, DUris},
- {declarations, proplists:get_value(declarations, D, [])},
+ {declarations, DestDecls},
{publish_properties, PubProps},
{publish_fields, PubFields},
{add_forward_headers, AFH},
@@ -62,50 +79,82 @@ convert_from_legacy(Config) ->
{reconnect_delay, RD}].
parse(ShovelName, Config0) ->
- case is_legacy(Config0) of
- true ->
- case parse_legacy(ShovelName, Config0) of
- {error, _} = Err -> Err;
- _ ->
- Config = convert_from_legacy(Config0),
- try_parse_current(ShovelName, Config)
- end;
- false ->
- try_parse_current(ShovelName, Config0)
- end.
-
-try_parse_current(ShovelName, Config) ->
try
- parse_current(ShovelName, Config)
+ validate(Config0),
+ case is_legacy(Config0) of
+ true ->
+ Config = convert_from_legacy(Config0),
+ parse_current(ShovelName, Config);
+ false ->
+ parse_current(ShovelName, Config0)
+ end
catch throw:{error, Reason} ->
- {error, {invalid_shovel_configuration, ShovelName, Reason}}
+ {error, {invalid_shovel_configuration, ShovelName, Reason}};
+ throw:Reason ->
+ {error, {invalid_shovel_configuration, ShovelName, Reason}}
end.
+validate(Props) ->
+ validate_proplist(Props),
+ validate_duplicates(Props).
+
+validate_proplist(Props) when is_list (Props) ->
+ case lists:filter(fun ({_, _}) -> false;
+ (_) -> true
+ end, Props) of
+ [] -> ok;
+ Invalid ->
+ throw({invalid_parameters, Invalid})
+ end;
+validate_proplist(X) ->
+ throw({require_list, X}).
+
+validate_duplicates(Props) ->
+ case duplicate_keys(Props) of
+ [] -> ok;
+ Invalid ->
+ throw({duplicate_parameters, Invalid})
+ end.
+
+validate_list(L) when is_list(L) -> ok;
+validate_list(L) ->
+ throw({require_list, L}).
+
+validate_uris(Key, L) when not is_list(L) ->
+ throw({require_list, Key, L});
+validate_uris(Key, []) ->
+ throw({expected_non_empty_list, Key});
+validate_uris(_Key, L) ->
+ validate_uris0(L).
+
+validate_uris0([Uri | Uris]) ->
+ case amqp_uri:parse(Uri) of
+ {ok, _Params} ->
+ validate_uris0(Uris);
+ {error, _} = Err ->
+ throw(Err)
+ end;
+validate_uris0([]) -> ok.
+
parse_current(ShovelName, Config) ->
{source, Source} = proplists:lookup(source, Config),
+ validate(Source),
SrcMod = resolve_module(proplists:get_value(protocol, Source, amqp091)),
{destination, Destination} = proplists:lookup(destination, Config),
+ validate(Destination),
DstMod = resolve_module(proplists:get_value(protocol, Destination, amqp091)),
+ AckMode = proplists:get_value(ack_mode, Config, no_ack),
+ validate_ack_mode(AckMode),
{ok, #{name => ShovelName,
shovel_type => static,
- ack_mode => proplists:get_value(ack_mode, Config, no_ack),
+ ack_mode => AckMode,
reconnect_delay => proplists:get_value(reconnect_delay, Config,
?DEFAULT_RECONNECT_DELAY),
source => rabbit_shovel_behaviour:parse(SrcMod, ShovelName,
- {source, Source}),
+ {source, Source}),
dest => rabbit_shovel_behaviour:parse(DstMod, ShovelName,
{destination, Destination})}}.
-parse_legacy(ShovelName, Config) ->
- {ok, Defaults} = application:get_env(defaults),
- try
- {ok, parse_shovel_config_dict(
- ShovelName, parse_shovel_config_proplist(
- enrich_shovel_config(Config, Defaults)))}
- catch throw:{error, Reason} ->
- {error, {invalid_shovel_configuration, ShovelName, Reason}}
- end.
-
%% ensures that any defaults that have been applied to a parsed
%% shovel, are written back to the original proplist
ensure_defaults(ShovelConfig, ParsedShovel) ->
@@ -114,128 +163,26 @@ ensure_defaults(ShovelConfig, ParsedShovel) ->
{reconnect_delay,
ParsedShovel#shovel.reconnect_delay}).
-enrich_shovel_config(Config, Defaults) ->
- Config1 = proplists:unfold(Config),
- case [E || E <- Config1, not (is_tuple(E) andalso tuple_size(E) == 2)] of
- [] -> case duplicate_keys(Config1) of
- [] ->
- return(lists:ukeysort(1, Config1 ++ Defaults));
- Dups ->
- fail({duplicate_parameters, Dups})
- end;
- Invalid -> fail({invalid_parameters, Invalid})
- end.
-
-parse_shovel_config_proplist(Config) ->
- Dict = dict:from_list(Config),
- Fields = record_info(fields, shovel) -- ?IGNORE_FIELDS,
- Keys = dict:fetch_keys(Dict) -- ?EXTRA_KEYS,
- case {Keys -- Fields, Fields -- Keys} of
- {[], []} -> {_Pos, Dict1} =
- lists:foldl(
- fun (FieldName, {Pos, Acc}) ->
- {Pos + 1,
- dict:update(FieldName,
- fun (V) -> {V, Pos} end,
- Acc)}
- end, {2, Dict}, Fields),
- return(Dict1);
- {[], Missing} -> fail({missing_parameters, Missing});
- {Unknown, _} -> fail({unrecognised_parameters, Unknown})
- end.
-
-parse_shovel_config_dict(_Name, Dict) ->
- Cfg = run_state_monad(
- [fun (Shovel) ->
- {ok, Value} = dict:find(Key, Dict),
- try {ParsedValue, Pos} = Fun(Value),
- return(setelement(Pos, Shovel, ParsedValue))
- catch throw:{error, Reason} ->
- fail({invalid_parameter_value, Key, Reason})
- end
- end || {Fun, Key} <-
- [{fun parse_endpoint/1, sources},
- {fun parse_endpoint/1, destinations},
- {fun parse_non_negative_integer/1, prefetch_count},
- {fun parse_ack_mode/1, ack_mode},
- {fun parse_binary/1, queue},
- {fun parse_non_negative_number/1, reconnect_delay}]],
- #shovel{}),
- Cfg.
-
-%% --=: Plain state monad implementation start :=--
-run_state_monad(FunList, State) ->
- lists:foldl(fun (Fun, StateN) -> Fun(StateN) end, State, FunList).
-
-return(V) -> V.
-
-spec fail(term()) -> no_return().
fail(Reason) -> throw({error, Reason}).
-%% --=: end :=--
-
-parse_endpoint({Endpoint, Pos}) when is_list(Endpoint) ->
- Brokers = case proplists:get_value(brokers, Endpoint) of
- undefined ->
- case proplists:get_value(broker, Endpoint) of
- undefined -> fail({missing_endpoint_parameter,
- broker_or_brokers});
- B -> [B]
- end;
- Bs when is_list(Bs) ->
- Bs;
- B ->
- fail({expected_list, brokers, B})
- end,
- {[], Brokers1} = run_state_monad(
- lists:duplicate(length(Brokers),
- fun check_uri/1),
- {Brokers, []}),
-
- case proplists:get_value(declarations, Endpoint, []) of
- Decls when is_list(Decls) ->
- ok;
- Decls ->
- fail({expected_list, declarations, Decls})
- end,
- return({#endpoint{uris = Brokers1},
- Pos});
-parse_endpoint({Endpoint, _Pos}) ->
- fail({require_list, Endpoint}).
-
-check_uri({[Uri | Uris], Acc}) ->
- case amqp_uri:parse(Uri) of
- {ok, _Params} ->
- return({Uris, [Uri | Acc]});
- {error, _} = Err ->
- throw(Err)
- end.
-
-parse_non_negative_integer({N, Pos}) when is_integer(N) andalso N >= 0 ->
- return({N, Pos});
-parse_non_negative_integer({N, _Pos}) ->
- fail({require_non_negative_integer, N}).
-parse_non_negative_number({N, Pos}) when is_number(N) andalso N >= 0 ->
- return({N, Pos});
-parse_non_negative_number({N, _Pos}) ->
- fail({require_non_negative_number, N}).
-
-parse_binary({Binary, Pos}) when is_binary(Binary) ->
- return({Binary, Pos});
-parse_binary({NotABinary, _Pos}) ->
- fail({require_binary, NotABinary}).
-
-parse_ack_mode({Val, Pos}) when Val =:= no_ack orelse
+validate_ack_mode(Val) when Val =:= no_ack orelse
Val =:= on_publish orelse
Val =:= on_confirm ->
- return({Val, Pos});
-parse_ack_mode({WrongVal, _Pos}) ->
- fail({ack_mode_value_requires_one_of, {no_ack, on_publish, on_confirm},
- WrongVal}).
+ ok;
+validate_ack_mode(WrongVal) ->
+ fail({invalid_parameter_value, ack_mode,
+ {ack_mode_value_requires_one_of, {no_ack, on_publish, on_confirm},
+ WrongVal}}).
duplicate_keys(PropList) when is_list(PropList) ->
proplists:get_keys(
lists:foldl(fun (K, L) -> lists:keydelete(K, 1, L) end, PropList,
proplists:get_keys(PropList))).
-% duplicate_keys(Fields) ->
-% fail({require_list, Fields}).
+
+get_value(Key, Props) ->
+ case proplists:get_value(Key, Props) of
+ undefined ->
+ throw({missing_parameter, Key});
+ V -> V
+ end.
diff --git a/deps/rabbitmq_shovel/test/configuration_SUITE.erl b/deps/rabbitmq_shovel/test/configuration_SUITE.erl
index bf161cb806..67b25ba292 100644
--- a/deps/rabbitmq_shovel/test/configuration_SUITE.erl
+++ b/deps/rabbitmq_shovel/test/configuration_SUITE.erl
@@ -119,20 +119,16 @@ invalid_legacy_configuration1(_Config) ->
{duplicate_parameters, [queue]} =
test_broken_shovel_config([{queue, <<"">>} | Config]),
- {missing_parameters, Missing} =
+ {missing_parameter, _} =
test_broken_shovel_config([]),
- [destinations, queue, sources] = lists:sort(Missing),
-
- {unrecognised_parameters, [invalid]} =
- test_broken_shovel_config([{invalid, invalid} | Config]),
{require_list, invalid} =
test_broken_shovel_sources(invalid),
- {missing_endpoint_parameter, broker_or_brokers} =
+ {missing_parameter, broker} =
test_broken_shovel_sources([]),
- {expected_list, brokers, invalid} =
+ {require_list, brokers, invalid} =
test_broken_shovel_sources([{brokers, invalid}]),
{expected_string_uri, 42} =
@@ -144,7 +140,7 @@ invalid_legacy_configuration1(_Config) ->
{{unable_to_parse_uri, no_scheme}, "invalid"} =
test_broken_shovel_sources([{broker, "invalid"}]),
- {expected_list,declarations, invalid} =
+ {require_list, invalid} =
test_broken_shovel_sources([{broker, "amqp://"},
{declarations, invalid}]),
{unknown_method_name, 42} =
@@ -210,11 +206,9 @@ test_broken_shovel_config(Config) ->
Error.
test_broken_shovel_sources(Sources) ->
- {invalid_parameter_value, sources, Error} =
- test_broken_shovel_config([{sources, Sources},
- {destinations, [{broker, "amqp://"}]},
- {queue, <<"">>}]),
- Error.
+ test_broken_shovel_config([{sources, Sources},
+ {destinations, [{broker, "amqp://"}]},
+ {queue, <<"">>}]).
valid_legacy_configuration(Config) ->
ok = setup_legacy_shovels(Config),
diff --git a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl
index 84ef64b55f..52e9536ace 100644
--- a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl
+++ b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl
@@ -232,7 +232,7 @@ autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) ->
amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:call(Ch, #'queue.declare'{queue = <<"src">>}),
publish_count(Ch, <<>>, <<"src">>, <<"hello">>, 100),
- Ret = amqp_channel:wait_for_confirms(Ch),
+ amqp_channel:wait_for_confirms(Ch),
shovel_test_utils:set_param_nowait(
Config,
<<"test">>, [{<<"src-queue">>, <<"src">>},
diff --git a/deps/rabbitmq_shovel/test/parameters_SUITE.erl b/deps/rabbitmq_shovel/test/parameters_SUITE.erl
index 335cecf1f6..a4c786d2b1 100644
--- a/deps/rabbitmq_shovel/test/parameters_SUITE.erl
+++ b/deps/rabbitmq_shovel/test/parameters_SUITE.erl
@@ -128,9 +128,9 @@ test_parse_amqp091(Params) ->
#'P_basic'{headers = HeadersResult,
delivery_mode = 2,
- cluster_id = <<"x">>} = PBasic = PropsFun("amqp://localhost:5672",
- "amqp://remotehost:5672",
- #'P_basic'{headers = undefined}),
+ cluster_id = <<"x">>} = PropsFun("amqp://localhost:5672",
+ "amqp://remotehost:5672",
+ #'P_basic'{headers = undefined}),
{_, array, [{table, Shovelled}]} = lists:keyfind(<<"x-shovelled">>, 1, HeadersResult),
{_, long, _} = lists:keyfind(<<"x-shovelled-timestamp">>, 1, HeadersResult),