diff options
author | kjnilsson <knilsson@pivotal.io> | 2017-05-22 11:24:17 +0100 |
---|---|---|
committer | kjnilsson <knilsson@pivotal.io> | 2017-05-22 11:24:17 +0100 |
commit | bbce6610137befeb573bb9588086e55fc4432d07 (patch) | |
tree | ec8b183fb2c102749aab02a7ad5b9fb676994aa9 | |
parent | d1eb01a68d5844139194227b7c82b403c48856f1 (diff) | |
download | rabbitmq-server-git-bbce6610137befeb573bb9588086e55fc4432d07.tar.gz |
Refactor configuration validation
To get better separation between validation
and parsing and also move some of the protocol
specific parsing into the relevant shovel
implementation.
[#138600475]
-rw-r--r-- | deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl | 61 | ||||
-rw-r--r-- | deps/rabbitmq_shovel/src/rabbit_shovel_config.erl | 245 | ||||
-rw-r--r-- | deps/rabbitmq_shovel/test/configuration_SUITE.erl | 20 | ||||
-rw-r--r-- | deps/rabbitmq_shovel/test/dynamic_SUITE.erl | 2 | ||||
-rw-r--r-- | deps/rabbitmq_shovel/test/parameters_SUITE.erl | 6 |
5 files changed, 145 insertions, 189 deletions
diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl index 2ab6e197c9..4b3fb9eeec 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl @@ -41,13 +41,16 @@ -define(MAX_CONNECTION_CLOSE_TIMEOUT, 10000). parse(_Name, {source, Source}) -> + Prefetch = parse_parameter(prefetch_count, fun parse_non_negative_integer/1, + proplists:get_value(prefetch_count, Source, ?DEFAULT_PREFETCH)), + Queue = parse_parameter(queue, fun parse_binary/1, + proplists:get_value(queue, Source)), #{module => ?MODULE, uris => proplists:get_value(uris, Source), - resource_decl => decl_fun(sources, Source), - queue => proplists:get_value(queue, Source), + resource_decl => decl_fun(Source), + queue => Queue, delete_after => proplists:get_value(delete_after, Source, never), - prefetch_count => proplists:get_value(prefetch_count, Source, - ?DEFAULT_PREFETCH)}; + prefetch_count => Prefetch}; parse(Name, {destination, Dest}) -> PubProp = proplists:get_value(publish_properties, Dest, []), PropsFun = try_make_parse_publish(publish_properties, PubProp), @@ -59,7 +62,7 @@ parse(Name, {destination, Dest}) -> PropsFun2 = add_timestamp_header_fun(ATH, PropsFun1), #{module => ?MODULE, uris => proplists:get_value(uris, Dest), - resource_decl => decl_fun(destinations, Dest), + resource_decl => decl_fun(Dest), props_fun => PropsFun2, fields_fun => PubFieldsFun, add_forward_headers => AFH, @@ -326,11 +329,7 @@ remaining(_Ch, #{source := #{delete_after := Count}}) -> %%% PARSING try_make_parse_publish(Key, Fields) -> - try make_parse_publish(Key, Fields) - catch - throw:{error, Reason} -> - fail({invalid_parameter_value, Key, Reason}) - end. + make_parse_publish(Key, Fields). make_parse_publish(publish_fields, Fields) -> make_publish_fun(Fields, record_info(fields, 'basic.publish')); @@ -348,10 +347,12 @@ make_publish_fun(Fields, ValidFields) when is_list(Fields) -> end, Publish, FieldIndices) end; Unexpected -> - fail({unexpected_fields, Unexpected, ValidFields}) + fail({invalid_parameter_value, publish_properties, + {unexpected_fields, Unexpected, ValidFields}}) end; make_publish_fun(Fields, _) -> - fail({require_list, Fields}). + fail({invalid_parameter_value, publish_properties, + {require_list, Fields}}). make_field_indices(Valid, Fields) -> make_field_indices(Fields, field_map(Valid, 2), []). @@ -414,15 +415,29 @@ parse_declaration({[{Method, Props} | _Rest], _Acc}) -> parse_declaration({[Method | Rest], Acc}) -> parse_declaration({[{Method, []} | Rest], Acc}). -decl_fun(Key, Endpoint) -> - try Decl = parse_declaration( - {proplists:get_value(declarations, Endpoint, []), []}), - fun (_Conn, Ch) -> - [begin - amqp_channel:call(Ch, M) - end || M <- lists:reverse(Decl)] - end - catch throw:{error, Reason} -> - fail({invalid_parameter_value, Key, Reason}) - end. +decl_fun(Endpoint) -> + Decl = parse_declaration({proplists:get_value(declarations, Endpoint, []), + []}), + fun (_Conn, Ch) -> + [begin + amqp_channel:call(Ch, M) + end || M <- lists:reverse(Decl)] + end. + +parse_parameter(Param, Fun, Value) -> + try + Fun(Value) + catch + _:{error, Err} -> + fail({invalid_parameter_value, Param, Err}) + end. + +parse_non_negative_integer(N) when is_integer(N) andalso N >= 0 -> + N; +parse_non_negative_integer(N) -> + fail({require_non_negative_integer, N}). +parse_binary(Binary) when is_binary(Binary) -> + Binary; +parse_binary(NotABinary) -> + fail({require_binary, NotABinary}). diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_config.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_config.erl index 121dc87aa3..8dbeea6f57 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_config.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_config.erl @@ -31,29 +31,46 @@ resolve_module(amqp10) -> rabbit_amqp10_shovel. is_legacy(Config) -> not proplists:is_defined(source, Config). +get_brokers(Props) -> + case proplists:get_value(brokers, Props) of + undefined -> + [get_value(broker, Props)]; + Brokers -> + Brokers + end. + convert_from_legacy(Config) -> - S = proplists:get_value(sources, Config), - SUris = proplists:get_value(brokers, S, [proplists:get_value(broker, S)]), - D = proplists:get_value(destinations, Config), - DUris = proplists:get_value(brokers, D, [proplists:get_value(broker, D)]), - Q = proplists:get_value(queue, Config), + S = get_value(sources, Config), + validate(S), + SUris = get_brokers(S), + validate_uris(brokers, SUris), + D = get_value(destinations, Config), + validate(D), + DUris = get_brokers(D), + validate_uris(brokers, DUris), + Q = get_value(queue, Config), DA = proplists:get_value(delete_after, Config, never), Pref = proplists:get_value(prefetch_count, Config, ?DEFAULT_PREFETCH), RD = proplists:get_value(reconnect_delay, Config, ?DEFAULT_RECONNECT_DELAY), AckMode = proplists:get_value(ack_mode, Config, ?DEFAULT_ACK_MODE), + validate_ack_mode(AckMode), PubFields = proplists:get_value(publish_fields, Config, []), PubProps = proplists:get_value(publish_properties, Config, []), AFH = proplists:get_value(add_forward_headers, Config, false), ATH = proplists:get_value(add_timestamp_header, Config, false), + SourceDecls = proplists:get_value(declarations, S, []), + validate_list(SourceDecls), + DestDecls = proplists:get_value(declarations, D, []), + validate_list(DestDecls), [{source, [{protocol, amqp091}, {uris, SUris}, - {declarations, proplists:get_value(declarations, S, [])}, + {declarations, SourceDecls}, {queue, Q}, {delete_after, DA}, {prefetch_count, Pref}]}, {destination, [{protocol, amqp091}, {uris, DUris}, - {declarations, proplists:get_value(declarations, D, [])}, + {declarations, DestDecls}, {publish_properties, PubProps}, {publish_fields, PubFields}, {add_forward_headers, AFH}, @@ -62,50 +79,82 @@ convert_from_legacy(Config) -> {reconnect_delay, RD}]. parse(ShovelName, Config0) -> - case is_legacy(Config0) of - true -> - case parse_legacy(ShovelName, Config0) of - {error, _} = Err -> Err; - _ -> - Config = convert_from_legacy(Config0), - try_parse_current(ShovelName, Config) - end; - false -> - try_parse_current(ShovelName, Config0) - end. - -try_parse_current(ShovelName, Config) -> try - parse_current(ShovelName, Config) + validate(Config0), + case is_legacy(Config0) of + true -> + Config = convert_from_legacy(Config0), + parse_current(ShovelName, Config); + false -> + parse_current(ShovelName, Config0) + end catch throw:{error, Reason} -> - {error, {invalid_shovel_configuration, ShovelName, Reason}} + {error, {invalid_shovel_configuration, ShovelName, Reason}}; + throw:Reason -> + {error, {invalid_shovel_configuration, ShovelName, Reason}} end. +validate(Props) -> + validate_proplist(Props), + validate_duplicates(Props). + +validate_proplist(Props) when is_list (Props) -> + case lists:filter(fun ({_, _}) -> false; + (_) -> true + end, Props) of + [] -> ok; + Invalid -> + throw({invalid_parameters, Invalid}) + end; +validate_proplist(X) -> + throw({require_list, X}). + +validate_duplicates(Props) -> + case duplicate_keys(Props) of + [] -> ok; + Invalid -> + throw({duplicate_parameters, Invalid}) + end. + +validate_list(L) when is_list(L) -> ok; +validate_list(L) -> + throw({require_list, L}). + +validate_uris(Key, L) when not is_list(L) -> + throw({require_list, Key, L}); +validate_uris(Key, []) -> + throw({expected_non_empty_list, Key}); +validate_uris(_Key, L) -> + validate_uris0(L). + +validate_uris0([Uri | Uris]) -> + case amqp_uri:parse(Uri) of + {ok, _Params} -> + validate_uris0(Uris); + {error, _} = Err -> + throw(Err) + end; +validate_uris0([]) -> ok. + parse_current(ShovelName, Config) -> {source, Source} = proplists:lookup(source, Config), + validate(Source), SrcMod = resolve_module(proplists:get_value(protocol, Source, amqp091)), {destination, Destination} = proplists:lookup(destination, Config), + validate(Destination), DstMod = resolve_module(proplists:get_value(protocol, Destination, amqp091)), + AckMode = proplists:get_value(ack_mode, Config, no_ack), + validate_ack_mode(AckMode), {ok, #{name => ShovelName, shovel_type => static, - ack_mode => proplists:get_value(ack_mode, Config, no_ack), + ack_mode => AckMode, reconnect_delay => proplists:get_value(reconnect_delay, Config, ?DEFAULT_RECONNECT_DELAY), source => rabbit_shovel_behaviour:parse(SrcMod, ShovelName, - {source, Source}), + {source, Source}), dest => rabbit_shovel_behaviour:parse(DstMod, ShovelName, {destination, Destination})}}. -parse_legacy(ShovelName, Config) -> - {ok, Defaults} = application:get_env(defaults), - try - {ok, parse_shovel_config_dict( - ShovelName, parse_shovel_config_proplist( - enrich_shovel_config(Config, Defaults)))} - catch throw:{error, Reason} -> - {error, {invalid_shovel_configuration, ShovelName, Reason}} - end. - %% ensures that any defaults that have been applied to a parsed %% shovel, are written back to the original proplist ensure_defaults(ShovelConfig, ParsedShovel) -> @@ -114,128 +163,26 @@ ensure_defaults(ShovelConfig, ParsedShovel) -> {reconnect_delay, ParsedShovel#shovel.reconnect_delay}). -enrich_shovel_config(Config, Defaults) -> - Config1 = proplists:unfold(Config), - case [E || E <- Config1, not (is_tuple(E) andalso tuple_size(E) == 2)] of - [] -> case duplicate_keys(Config1) of - [] -> - return(lists:ukeysort(1, Config1 ++ Defaults)); - Dups -> - fail({duplicate_parameters, Dups}) - end; - Invalid -> fail({invalid_parameters, Invalid}) - end. - -parse_shovel_config_proplist(Config) -> - Dict = dict:from_list(Config), - Fields = record_info(fields, shovel) -- ?IGNORE_FIELDS, - Keys = dict:fetch_keys(Dict) -- ?EXTRA_KEYS, - case {Keys -- Fields, Fields -- Keys} of - {[], []} -> {_Pos, Dict1} = - lists:foldl( - fun (FieldName, {Pos, Acc}) -> - {Pos + 1, - dict:update(FieldName, - fun (V) -> {V, Pos} end, - Acc)} - end, {2, Dict}, Fields), - return(Dict1); - {[], Missing} -> fail({missing_parameters, Missing}); - {Unknown, _} -> fail({unrecognised_parameters, Unknown}) - end. - -parse_shovel_config_dict(_Name, Dict) -> - Cfg = run_state_monad( - [fun (Shovel) -> - {ok, Value} = dict:find(Key, Dict), - try {ParsedValue, Pos} = Fun(Value), - return(setelement(Pos, Shovel, ParsedValue)) - catch throw:{error, Reason} -> - fail({invalid_parameter_value, Key, Reason}) - end - end || {Fun, Key} <- - [{fun parse_endpoint/1, sources}, - {fun parse_endpoint/1, destinations}, - {fun parse_non_negative_integer/1, prefetch_count}, - {fun parse_ack_mode/1, ack_mode}, - {fun parse_binary/1, queue}, - {fun parse_non_negative_number/1, reconnect_delay}]], - #shovel{}), - Cfg. - -%% --=: Plain state monad implementation start :=-- -run_state_monad(FunList, State) -> - lists:foldl(fun (Fun, StateN) -> Fun(StateN) end, State, FunList). - -return(V) -> V. - -spec fail(term()) -> no_return(). fail(Reason) -> throw({error, Reason}). -%% --=: end :=-- - -parse_endpoint({Endpoint, Pos}) when is_list(Endpoint) -> - Brokers = case proplists:get_value(brokers, Endpoint) of - undefined -> - case proplists:get_value(broker, Endpoint) of - undefined -> fail({missing_endpoint_parameter, - broker_or_brokers}); - B -> [B] - end; - Bs when is_list(Bs) -> - Bs; - B -> - fail({expected_list, brokers, B}) - end, - {[], Brokers1} = run_state_monad( - lists:duplicate(length(Brokers), - fun check_uri/1), - {Brokers, []}), - - case proplists:get_value(declarations, Endpoint, []) of - Decls when is_list(Decls) -> - ok; - Decls -> - fail({expected_list, declarations, Decls}) - end, - return({#endpoint{uris = Brokers1}, - Pos}); -parse_endpoint({Endpoint, _Pos}) -> - fail({require_list, Endpoint}). - -check_uri({[Uri | Uris], Acc}) -> - case amqp_uri:parse(Uri) of - {ok, _Params} -> - return({Uris, [Uri | Acc]}); - {error, _} = Err -> - throw(Err) - end. - -parse_non_negative_integer({N, Pos}) when is_integer(N) andalso N >= 0 -> - return({N, Pos}); -parse_non_negative_integer({N, _Pos}) -> - fail({require_non_negative_integer, N}). -parse_non_negative_number({N, Pos}) when is_number(N) andalso N >= 0 -> - return({N, Pos}); -parse_non_negative_number({N, _Pos}) -> - fail({require_non_negative_number, N}). - -parse_binary({Binary, Pos}) when is_binary(Binary) -> - return({Binary, Pos}); -parse_binary({NotABinary, _Pos}) -> - fail({require_binary, NotABinary}). - -parse_ack_mode({Val, Pos}) when Val =:= no_ack orelse +validate_ack_mode(Val) when Val =:= no_ack orelse Val =:= on_publish orelse Val =:= on_confirm -> - return({Val, Pos}); -parse_ack_mode({WrongVal, _Pos}) -> - fail({ack_mode_value_requires_one_of, {no_ack, on_publish, on_confirm}, - WrongVal}). + ok; +validate_ack_mode(WrongVal) -> + fail({invalid_parameter_value, ack_mode, + {ack_mode_value_requires_one_of, {no_ack, on_publish, on_confirm}, + WrongVal}}). duplicate_keys(PropList) when is_list(PropList) -> proplists:get_keys( lists:foldl(fun (K, L) -> lists:keydelete(K, 1, L) end, PropList, proplists:get_keys(PropList))). -% duplicate_keys(Fields) -> -% fail({require_list, Fields}). + +get_value(Key, Props) -> + case proplists:get_value(Key, Props) of + undefined -> + throw({missing_parameter, Key}); + V -> V + end. diff --git a/deps/rabbitmq_shovel/test/configuration_SUITE.erl b/deps/rabbitmq_shovel/test/configuration_SUITE.erl index bf161cb806..67b25ba292 100644 --- a/deps/rabbitmq_shovel/test/configuration_SUITE.erl +++ b/deps/rabbitmq_shovel/test/configuration_SUITE.erl @@ -119,20 +119,16 @@ invalid_legacy_configuration1(_Config) -> {duplicate_parameters, [queue]} = test_broken_shovel_config([{queue, <<"">>} | Config]), - {missing_parameters, Missing} = + {missing_parameter, _} = test_broken_shovel_config([]), - [destinations, queue, sources] = lists:sort(Missing), - - {unrecognised_parameters, [invalid]} = - test_broken_shovel_config([{invalid, invalid} | Config]), {require_list, invalid} = test_broken_shovel_sources(invalid), - {missing_endpoint_parameter, broker_or_brokers} = + {missing_parameter, broker} = test_broken_shovel_sources([]), - {expected_list, brokers, invalid} = + {require_list, brokers, invalid} = test_broken_shovel_sources([{brokers, invalid}]), {expected_string_uri, 42} = @@ -144,7 +140,7 @@ invalid_legacy_configuration1(_Config) -> {{unable_to_parse_uri, no_scheme}, "invalid"} = test_broken_shovel_sources([{broker, "invalid"}]), - {expected_list,declarations, invalid} = + {require_list, invalid} = test_broken_shovel_sources([{broker, "amqp://"}, {declarations, invalid}]), {unknown_method_name, 42} = @@ -210,11 +206,9 @@ test_broken_shovel_config(Config) -> Error. test_broken_shovel_sources(Sources) -> - {invalid_parameter_value, sources, Error} = - test_broken_shovel_config([{sources, Sources}, - {destinations, [{broker, "amqp://"}]}, - {queue, <<"">>}]), - Error. + test_broken_shovel_config([{sources, Sources}, + {destinations, [{broker, "amqp://"}]}, + {queue, <<"">>}]). valid_legacy_configuration(Config) -> ok = setup_legacy_shovels(Config), diff --git a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl index 84ef64b55f..52e9536ace 100644 --- a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl @@ -232,7 +232,7 @@ autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) -> amqp_channel:call(Ch, #'confirm.select'{}), amqp_channel:call(Ch, #'queue.declare'{queue = <<"src">>}), publish_count(Ch, <<>>, <<"src">>, <<"hello">>, 100), - Ret = amqp_channel:wait_for_confirms(Ch), + amqp_channel:wait_for_confirms(Ch), shovel_test_utils:set_param_nowait( Config, <<"test">>, [{<<"src-queue">>, <<"src">>}, diff --git a/deps/rabbitmq_shovel/test/parameters_SUITE.erl b/deps/rabbitmq_shovel/test/parameters_SUITE.erl index 335cecf1f6..a4c786d2b1 100644 --- a/deps/rabbitmq_shovel/test/parameters_SUITE.erl +++ b/deps/rabbitmq_shovel/test/parameters_SUITE.erl @@ -128,9 +128,9 @@ test_parse_amqp091(Params) -> #'P_basic'{headers = HeadersResult, delivery_mode = 2, - cluster_id = <<"x">>} = PBasic = PropsFun("amqp://localhost:5672", - "amqp://remotehost:5672", - #'P_basic'{headers = undefined}), + cluster_id = <<"x">>} = PropsFun("amqp://localhost:5672", + "amqp://remotehost:5672", + #'P_basic'{headers = undefined}), {_, array, [{table, Shovelled}]} = lists:keyfind(<<"x-shovelled">>, 1, HeadersResult), {_, long, _} = lists:keyfind(<<"x-shovelled-timestamp">>, 1, HeadersResult), |