summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_shovel/src/rabbit_shovel_config.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbitmq_shovel/src/rabbit_shovel_config.erl')
-rw-r--r--deps/rabbitmq_shovel/src/rabbit_shovel_config.erl176
1 files changed, 176 insertions, 0 deletions
diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_config.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_config.erl
new file mode 100644
index 0000000000..06678177d7
--- /dev/null
+++ b/deps/rabbitmq_shovel/src/rabbit_shovel_config.erl
@@ -0,0 +1,176 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_shovel_config).
+
+-export([parse/2,
+ ensure_defaults/2]).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include("rabbit_shovel.hrl").
+
+resolve_module(amqp091) -> rabbit_amqp091_shovel;
+resolve_module(amqp10) -> rabbit_amqp10_shovel.
+
+is_legacy(Config) ->
+ not proplists:is_defined(source, Config).
+
+get_brokers(Props) ->
+ case proplists:get_value(brokers, Props) of
+ undefined ->
+ [get_value(broker, Props)];
+ Brokers ->
+ Brokers
+ end.
+
+convert_from_legacy(Config) ->
+ S = get_value(sources, Config),
+ validate(S),
+ SUris = get_brokers(S),
+ validate_uris(brokers, SUris),
+ D = get_value(destinations, Config),
+ validate(D),
+ DUris = get_brokers(D),
+ validate_uris(brokers, DUris),
+ Q = get_value(queue, Config),
+ DA = proplists:get_value(delete_after, Config, never),
+ Pref = proplists:get_value(prefetch_count, Config, ?DEFAULT_PREFETCH),
+ RD = proplists:get_value(reconnect_delay, Config, ?DEFAULT_RECONNECT_DELAY),
+ AckMode = proplists:get_value(ack_mode, Config, ?DEFAULT_ACK_MODE),
+ validate_ack_mode(AckMode),
+ PubFields = proplists:get_value(publish_fields, Config, []),
+ PubProps = proplists:get_value(publish_properties, Config, []),
+ AFH = proplists:get_value(add_forward_headers, Config, false),
+ ATH = proplists:get_value(add_timestamp_header, Config, false),
+ SourceDecls = proplists:get_value(declarations, S, []),
+ validate_list(SourceDecls),
+ DestDecls = proplists:get_value(declarations, D, []),
+ validate_list(DestDecls),
+ [{source, [{protocol, amqp091},
+ {uris, SUris},
+ {declarations, SourceDecls},
+ {queue, Q},
+ {delete_after, DA},
+ {prefetch_count, Pref}]},
+ {destination, [{protocol, amqp091},
+ {uris, DUris},
+ {declarations, DestDecls},
+ {publish_properties, PubProps},
+ {publish_fields, PubFields},
+ {add_forward_headers, AFH},
+ {add_timestamp_header, ATH}]},
+ {ack_mode, AckMode},
+ {reconnect_delay, RD}].
+
+parse(ShovelName, Config0) ->
+ try
+ validate(Config0),
+ case is_legacy(Config0) of
+ true ->
+ Config = convert_from_legacy(Config0),
+ parse_current(ShovelName, Config);
+ false ->
+ parse_current(ShovelName, Config0)
+ end
+ catch throw:{error, Reason} ->
+ {error, {invalid_shovel_configuration, ShovelName, Reason}};
+ throw:Reason ->
+ {error, {invalid_shovel_configuration, ShovelName, Reason}}
+ end.
+
+validate(Props) ->
+ validate_proplist(Props),
+ validate_duplicates(Props).
+
+validate_proplist(Props) when is_list (Props) ->
+ case lists:filter(fun ({_, _}) -> false;
+ (_) -> true
+ end, Props) of
+ [] -> ok;
+ Invalid ->
+ throw({invalid_parameters, Invalid})
+ end;
+validate_proplist(X) ->
+ throw({require_list, X}).
+
+validate_duplicates(Props) ->
+ case duplicate_keys(Props) of
+ [] -> ok;
+ Invalid ->
+ throw({duplicate_parameters, Invalid})
+ end.
+
+validate_list(L) when is_list(L) -> ok;
+validate_list(L) ->
+ throw({require_list, L}).
+
+validate_uris(Key, L) when not is_list(L) ->
+ throw({require_list, Key, L});
+validate_uris(Key, []) ->
+ throw({expected_non_empty_list, Key});
+validate_uris(_Key, L) ->
+ validate_uris0(L).
+
+validate_uris0([Uri | Uris]) ->
+ case amqp_uri:parse(Uri) of
+ {ok, _Params} ->
+ validate_uris0(Uris);
+ {error, _} = Err ->
+ throw(Err)
+ end;
+validate_uris0([]) -> ok.
+
+parse_current(ShovelName, Config) ->
+ {source, Source} = proplists:lookup(source, Config),
+ validate(Source),
+ SrcMod = resolve_module(proplists:get_value(protocol, Source, amqp091)),
+ {destination, Destination} = proplists:lookup(destination, Config),
+ validate(Destination),
+ DstMod = resolve_module(proplists:get_value(protocol, Destination, amqp091)),
+ AckMode = proplists:get_value(ack_mode, Config, no_ack),
+ validate_ack_mode(AckMode),
+ {ok, #{name => ShovelName,
+ shovel_type => static,
+ ack_mode => AckMode,
+ reconnect_delay => proplists:get_value(reconnect_delay, Config,
+ ?DEFAULT_RECONNECT_DELAY),
+ source => rabbit_shovel_behaviour:parse(SrcMod, ShovelName,
+ {source, Source}),
+ dest => rabbit_shovel_behaviour:parse(DstMod, ShovelName,
+ {destination, Destination})}}.
+
+%% ensures that any defaults that have been applied to a parsed
+%% shovel, are written back to the original proplist
+ensure_defaults(ShovelConfig, ParsedShovel) ->
+ lists:keystore(reconnect_delay, 1,
+ ShovelConfig,
+ {reconnect_delay,
+ ParsedShovel#shovel.reconnect_delay}).
+
+-spec fail(term()) -> no_return().
+fail(Reason) -> throw({error, Reason}).
+
+validate_ack_mode(Val) when Val =:= no_ack orelse
+ Val =:= on_publish orelse
+ Val =:= on_confirm ->
+ ok;
+validate_ack_mode(WrongVal) ->
+ fail({invalid_parameter_value, ack_mode,
+ {ack_mode_value_requires_one_of, {no_ack, on_publish, on_confirm},
+ WrongVal}}).
+
+duplicate_keys(PropList) when is_list(PropList) ->
+ proplists:get_keys(
+ lists:foldl(fun (K, L) -> lists:keydelete(K, 1, L) end, PropList,
+ proplists:get_keys(PropList))).
+
+get_value(Key, Props) ->
+ case proplists:get_value(Key, Props) of
+ undefined ->
+ throw({missing_parameter, Key});
+ V -> V
+ end.