summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl')
-rw-r--r--deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl520
1 files changed, 520 insertions, 0 deletions
diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
new file mode 100644
index 0000000000..44fc12183c
--- /dev/null
+++ b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
@@ -0,0 +1,520 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_amqp091_shovel).
+
+-behaviour(rabbit_shovel_behaviour).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include("rabbit_shovel.hrl").
+
+-export([
+ parse/2,
+ source_uri/1,
+ dest_uri/1,
+ source_protocol/1,
+ dest_protocol/1,
+ source_endpoint/1,
+ dest_endpoint/1,
+ connect_source/1,
+ init_source/1,
+ connect_dest/1,
+ init_dest/1,
+ handle_source/2,
+ handle_dest/2,
+ close_source/1,
+ close_dest/1,
+ ack/3,
+ nack/3,
+ forward/4
+ ]).
+
+-define(MAX_CONNECTION_CLOSE_TIMEOUT, 10000).
+
+parse(_Name, {source, Source}) ->
+ Prefetch = parse_parameter(prefetch_count, fun parse_non_negative_integer/1,
+ proplists:get_value(prefetch_count, Source,
+ ?DEFAULT_PREFETCH)),
+ Queue = parse_parameter(queue, fun parse_binary/1,
+ proplists:get_value(queue, Source)),
+ #{module => ?MODULE,
+ uris => proplists:get_value(uris, Source),
+ resource_decl => decl_fun(Source),
+ queue => Queue,
+ delete_after => proplists:get_value(delete_after, Source, never),
+ prefetch_count => Prefetch};
+parse(Name, {destination, Dest}) ->
+ PubProp = proplists:get_value(publish_properties, Dest, []),
+ PropsFun = try_make_parse_publish(publish_properties, PubProp),
+ PubFields = proplists:get_value(publish_fields, Dest, []),
+ PubFieldsFun = try_make_parse_publish(publish_fields, PubFields),
+ AFH = proplists:get_value(add_forward_headers, Dest, false),
+ ATH = proplists:get_value(add_timestamp_header, Dest, false),
+ PropsFun1 = add_forward_headers_fun(Name, AFH, PropsFun),
+ PropsFun2 = add_timestamp_header_fun(ATH, PropsFun1),
+ #{module => ?MODULE,
+ uris => proplists:get_value(uris, Dest),
+ resource_decl => decl_fun(Dest),
+ props_fun => PropsFun2,
+ fields_fun => PubFieldsFun,
+ add_forward_headers => AFH,
+ add_timestamp_header => ATH}.
+
+connect_source(Conf = #{name := Name,
+ source := #{uris := Uris} = Src}) ->
+ {Conn, Chan, Uri} = make_conn_and_chan(Uris, Name),
+ Conf#{source => Src#{current => {Conn, Chan, Uri}}}.
+
+init_source(Conf = #{ack_mode := AckMode,
+ source := #{queue := Queue,
+ current := {Conn, Chan, _},
+ prefetch_count := Prefetch,
+ resource_decl := Decl} = Src}) ->
+ Decl(Conn, Chan),
+
+ NoAck = AckMode =:= no_ack,
+ case NoAck of
+ false ->
+ #'basic.qos_ok'{} =
+ amqp_channel:call(Chan, #'basic.qos'{prefetch_count = Prefetch}),
+ ok;
+ true -> ok
+ end,
+ Remaining = remaining(Chan, Conf),
+ case Remaining of
+ 0 ->
+ exit({shutdown, autodelete});
+ _ -> ok
+ end,
+ #'basic.consume_ok'{} =
+ amqp_channel:subscribe(Chan, #'basic.consume'{queue = Queue,
+ no_ack = NoAck}, self()),
+ Conf#{source => Src#{remaining => Remaining,
+ remaining_unacked => Remaining}}.
+
+connect_dest(Conf = #{name := Name, dest := #{uris := Uris} = Dst}) ->
+ {Conn, Chan, URI} = make_conn_and_chan(Uris, Name),
+ Conf#{dest => Dst#{current => {Conn, Chan, URI}}}.
+
+init_dest(Conf = #{ack_mode := AckMode,
+ dest := #{current := {Conn, Chan, _},
+ resource_decl := Decl} = Dst}) ->
+
+ Decl(Conn, Chan),
+
+ case AckMode of
+ on_confirm ->
+ #'confirm.select_ok'{} =
+ amqp_channel:call(Chan, #'confirm.select'{}),
+ ok = amqp_channel:register_confirm_handler(Chan, self());
+ _ ->
+ ok
+ end,
+ Conf#{dest => Dst#{unacked => #{}}}.
+
+ack(Tag, Multi, State = #{source := #{current := {_, Chan, _}}}) ->
+ ok = amqp_channel:cast(Chan, #'basic.ack'{delivery_tag = Tag,
+ multiple = Multi}),
+ State.
+
+nack(Tag, Multi, State = #{source := #{current := {_, Chan, _}}}) ->
+ ok = amqp_channel:cast(Chan, #'basic.nack'{delivery_tag = Tag,
+ multiple = Multi}),
+ State.
+
+source_uri(#{source := #{current := {_, _, Uri}}}) -> Uri.
+dest_uri(#{dest := #{current := {_, _, Uri}}}) -> Uri.
+
+source_protocol(_State) -> amqp091.
+dest_protocol(_State) -> amqp091.
+
+source_endpoint(#{source := #{queue := <<>>,
+ source_exchange := SrcX,
+ source_exchange_key := SrcXKey}}) ->
+ [{src_exchange, SrcX},
+ {src_exchange_key, SrcXKey}];
+source_endpoint(#{source := #{queue := Queue}}) ->
+ [{src_queue, Queue}];
+source_endpoint(_Config) ->
+ [].
+
+dest_endpoint(#{shovel_type := static}) ->
+ [];
+dest_endpoint(#{dest := Dest}) ->
+ Keys = [dest_exchange, dest_exchange_key, dest_queue],
+ maps:to_list(maps:filter(fun(K, _) -> proplists:is_defined(K, Keys) end, Dest)).
+
+forward(IncomingTag, Props, Payload,
+ State0 = #{dest := #{props_fun := PropsFun,
+ current := {_, _, DstUri},
+ fields_fun := FieldsFun}}) ->
+ SrcUri = rabbit_shovel_behaviour:source_uri(State0),
+ % do publish
+ Exchange = maps:get(exchange, Props, undefined),
+ RoutingKey = maps:get(routing_key, Props, undefined),
+ Method = #'basic.publish'{exchange = Exchange, routing_key = RoutingKey},
+ Method1 = FieldsFun(SrcUri, DstUri, Method),
+ Msg1 = #amqp_msg{props = PropsFun(SrcUri, DstUri, props_from_map(Props)),
+ payload = Payload},
+ publish(IncomingTag, Method1, Msg1, State0).
+
+props_from_map(Map) ->
+ #'P_basic'{content_type = maps:get(content_type, Map, undefined),
+ content_encoding = maps:get(content_encoding, Map, undefined),
+ headers = maps:get(headers, Map, undefined),
+ delivery_mode = maps:get(delivery_mode, Map, undefined),
+ priority = maps:get(priority, Map, undefined),
+ correlation_id = maps:get(correlation_id, Map, undefined),
+ reply_to = maps:get(reply_to, Map, undefined),
+ expiration = maps:get(expiration, Map, undefined),
+ message_id = maps:get(message_id, Map, undefined),
+ timestamp = maps:get(timestamp, Map, undefined),
+ type = maps:get(type, Map, undefined),
+ user_id = maps:get(user_id, Map, undefined),
+ app_id = maps:get(app_id, Map, undefined),
+ cluster_id = maps:get(cluster_id, Map, undefined)}.
+
+map_from_props(#'P_basic'{content_type = Content_type,
+ content_encoding = Content_encoding,
+ headers = Headers,
+ delivery_mode = Delivery_mode,
+ priority = Priority,
+ correlation_id = Correlation_id,
+ reply_to = Reply_to,
+ expiration = Expiration,
+ message_id = Message_id,
+ timestamp = Timestamp,
+ type = Type,
+ user_id = User_id,
+ app_id = App_id,
+ cluster_id = Cluster_id}) ->
+ lists:foldl(fun({_K, undefined}, Acc) -> Acc;
+ ({K, V}, Acc) -> Acc#{K => V}
+ end, #{}, [{content_type, Content_type},
+ {content_encoding, Content_encoding},
+ {headers, Headers},
+ {delivery_mode, Delivery_mode},
+ {priority, Priority},
+ {correlation_id, Correlation_id},
+ {reply_to, Reply_to},
+ {expiration, Expiration},
+ {message_id, Message_id},
+ {timestamp, Timestamp},
+ {type, Type},
+ {user_id, User_id},
+ {app_id, App_id},
+ {cluster_id, Cluster_id}
+ ]).
+
+handle_source(#'basic.consume_ok'{}, State) ->
+ State;
+handle_source({#'basic.deliver'{delivery_tag = Tag,
+ exchange = Exchange,
+ routing_key = RoutingKey},
+ #amqp_msg{props = Props0, payload = Payload}}, State) ->
+ Props = (map_from_props(Props0))#{exchange => Exchange,
+ routing_key => RoutingKey},
+ % forward to destination
+ rabbit_shovel_behaviour:forward(Tag, Props, Payload, State);
+
+handle_source({'EXIT', Conn, Reason},
+ #{source := #{current := {Conn, _, _}}}) ->
+ {stop, {inbound_conn_died, Reason}};
+
+handle_source(_Msg, _State) ->
+ not_handled.
+
+handle_dest(#'basic.ack'{delivery_tag = Seq, multiple = Multiple},
+ State = #{ack_mode := on_confirm}) ->
+ confirm_to_inbound(fun (Tag, Multi, StateX) ->
+ rabbit_shovel_behaviour:ack(Tag, Multi, StateX)
+ end, Seq, Multiple, State);
+
+handle_dest(#'basic.nack'{delivery_tag = Seq, multiple = Multiple},
+ State = #{ack_mode := on_confirm }) ->
+ confirm_to_inbound(fun (Tag, Multi, StateX) ->
+ rabbit_shovel_behaviour:nack(Tag, Multi, StateX)
+ end, Seq, Multiple, State);
+
+handle_dest(#'basic.cancel'{}, #{name := Name}) ->
+ rabbit_log:warning("Shovel ~p received a 'basic.cancel' from the server", [Name]),
+ {stop, {shutdown, restart}};
+
+handle_dest({'EXIT', Conn, Reason}, #{dest := #{current := {Conn, _, _}}}) ->
+ {stop, {outbound_conn_died, Reason}};
+
+handle_dest(_Msg, _State) ->
+ not_handled.
+
+close_source(#{source := #{current := {Conn, _, _}}}) ->
+ catch amqp_connection:close(Conn, ?MAX_CONNECTION_CLOSE_TIMEOUT),
+ ok;
+close_source(_) ->
+ %% It never connected, connection doesn't exist
+ ok.
+
+close_dest(#{dest := #{current := {Conn, _, _}}}) ->
+ catch amqp_connection:close(Conn, ?MAX_CONNECTION_CLOSE_TIMEOUT),
+ ok;
+close_dest(_) ->
+ %% It never connected, connection doesn't exist
+ ok.
+
+confirm_to_inbound(ConfirmFun, Seq, Multiple,
+ State0 = #{dest := #{unacked := Unacked} = Dst}) ->
+ #{Seq := InTag} = Unacked,
+ State = ConfirmFun(InTag, Multiple, State0),
+ {Unacked1, Removed} = remove_delivery_tags(Seq, Multiple, Unacked, 0),
+ rabbit_shovel_behaviour:decr_remaining(Removed,
+ State#{dest =>
+ Dst#{unacked => Unacked1}}).
+
+publish(_Tag, _Method, _Msg, State = #{source := #{remaining_unacked := 0}}) ->
+ %% We are in on-confirm mode, and are autodelete. We have
+ %% published all the messages we need to; we just wait for acks to
+ %% come back. So drop subsequent messages on the floor to be
+ %% requeued later.
+ State;
+
+publish(IncomingTag, Method, Msg,
+ State = #{ack_mode := AckMode,
+ dest := Dst}) ->
+ #{unacked := Unacked,
+ current := {_, OutboundChan, _}} = Dst,
+ Seq = case AckMode of
+ on_confirm ->
+ amqp_channel:next_publish_seqno(OutboundChan);
+ _ -> undefined
+ end,
+ ok = amqp_channel:call(OutboundChan, Method, Msg),
+ rabbit_shovel_behaviour:decr_remaining_unacked(
+ case AckMode of
+ no_ack ->
+ rabbit_shovel_behaviour:decr_remaining(1, State);
+ on_confirm ->
+ State#{dest => Dst#{unacked => Unacked#{Seq => IncomingTag}}};
+ on_publish ->
+ State1 = rabbit_shovel_behaviour:ack(IncomingTag, false, State),
+ rabbit_shovel_behaviour:decr_remaining(1, State1)
+ end).
+
+make_conn_and_chan([], {VHost, Name} = _ShovelName) ->
+ rabbit_log:error(
+ "Shovel '~s' in vhost '~s' has no more URIs to try for connection",
+ [Name, VHost]),
+ erlang:error(failed_to_connect_using_provided_uris);
+make_conn_and_chan([], ShovelName) ->
+ rabbit_log:error(
+ "Shovel '~s' has no more URIs to try for connection",
+ [ShovelName]),
+ erlang:error(failed_to_connect_using_provided_uris);
+make_conn_and_chan(URIs, ShovelName) ->
+ try do_make_conn_and_chan(URIs, ShovelName) of
+ Val -> Val
+ catch throw:{error, Reason, URI} ->
+ log_connection_failure(Reason, URI, ShovelName),
+ make_conn_and_chan(lists:usort(URIs -- [URI]), ShovelName)
+ end.
+
+do_make_conn_and_chan(URIs, ShovelName) ->
+ URI = lists:nth(rand:uniform(length(URIs)), URIs),
+ {ok, AmqpParam} = amqp_uri:parse(URI),
+ ConnName = get_connection_name(ShovelName),
+ case amqp_connection:start(AmqpParam, ConnName) of
+ {ok, Conn} ->
+ link(Conn),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ link(Ch),
+ {Conn, Ch, list_to_binary(amqp_uri:remove_credentials(URI))};
+ {error, not_allowed} ->
+ throw({error, not_allowed, URI});
+ {error, Reason} ->
+ throw({error, Reason, URI})
+ end.
+
+log_connection_failure(Reason, URI, {VHost, Name} = _ShovelName) ->
+ rabbit_log:error(
+ "Shovel '~s' in vhost '~s' failed to connect (URI: ~s): ~s~n",
+ [Name, VHost, amqp_uri:remove_credentials(URI), human_readable_connection_error(Reason)]);
+log_connection_failure(Reason, URI, ShovelName) ->
+ rabbit_log:error(
+ "Shovel '~s' failed to connect (URI: ~s): ~s~n",
+ [ShovelName, amqp_uri:remove_credentials(URI), human_readable_connection_error(Reason)]).
+
+human_readable_connection_error({auth_failure, Msg}) ->
+ Msg;
+human_readable_connection_error(not_allowed) ->
+ "access to target virtual host was refused";
+human_readable_connection_error(unknown_host) ->
+ "unknown host (failed to resolve hostname)";
+human_readable_connection_error(econnrefused) ->
+ "connection to target host was refused (ECONNREFUSED)";
+human_readable_connection_error(econnreset) ->
+ "connection to target host was reset by peer (ECONNRESET)";
+human_readable_connection_error(etimedout) ->
+ "connection to target host timed out (ETIMEDOUT)";
+human_readable_connection_error(ehostunreach) ->
+ "target host is unreachable (EHOSTUNREACH)";
+human_readable_connection_error(nxdomain) ->
+ "target hostname cannot be resolved (NXDOMAIN)";
+human_readable_connection_error(eacces) ->
+ "connection to target host failed with EACCES. "
+ "This may be due to insufficient RabbitMQ process permissions or "
+ "a reserved IP address used as destination";
+human_readable_connection_error(Other) ->
+ rabbit_misc:format("~p", [Other]).
+
+get_connection_name(ShovelName) when is_atom(ShovelName) ->
+ Prefix = <<"Shovel ">>,
+ ShovelNameAsBinary = atom_to_binary(ShovelName, utf8),
+ <<Prefix/binary, ShovelNameAsBinary/binary>>;
+%% for dynamic shovels, name is a binary
+get_connection_name(ShovelName) when is_binary(ShovelName) ->
+ Prefix = <<"Shovel ">>,
+ <<Prefix/binary, ShovelName/binary>>;
+%% fallback
+get_connection_name(_) ->
+ <<"Shovel">>.
+
+remove_delivery_tags(Seq, false, Unacked, 0) ->
+ {maps:remove(Seq, Unacked), 1};
+remove_delivery_tags(Seq, true, Unacked, Count) ->
+ case maps:size(Unacked) of
+ 0 -> {Unacked, Count};
+ _ ->
+ maps:fold(fun(K, _V, {Acc, Cnt}) when K =< Seq ->
+ {maps:remove(K, Acc), Cnt + 1};
+ (_K, _V, Acc) -> Acc
+ end, {Unacked, 0}, Unacked)
+ end.
+
+remaining(_Ch, #{source := #{delete_after := never}}) ->
+ unlimited;
+remaining(Ch, #{source := #{delete_after := 'queue-length',
+ queue := Queue}}) ->
+ #'queue.declare_ok'{message_count = N} =
+ amqp_channel:call(Ch, #'queue.declare'{queue = Queue,
+ passive = true}),
+ N;
+remaining(_Ch, #{source := #{delete_after := Count}}) ->
+ Count.
+
+%%% PARSING
+
+try_make_parse_publish(Key, Fields) ->
+ make_parse_publish(Key, Fields).
+
+make_parse_publish(publish_fields, Fields) ->
+ make_publish_fun(Fields, record_info(fields, 'basic.publish'));
+make_parse_publish(publish_properties, Fields) ->
+ make_publish_fun(Fields, record_info(fields, 'P_basic')).
+
+make_publish_fun(Fields, ValidFields) when is_list(Fields) ->
+ SuppliedFields = proplists:get_keys(Fields),
+ case SuppliedFields -- ValidFields of
+ [] ->
+ FieldIndices = make_field_indices(ValidFields, Fields),
+ fun (_SrcUri, _DestUri, Publish) ->
+ lists:foldl(fun ({Pos1, Value}, Pub) ->
+ setelement(Pos1, Pub, Value)
+ end, Publish, FieldIndices)
+ end;
+ Unexpected ->
+ fail({invalid_parameter_value, publish_properties,
+ {unexpected_fields, Unexpected, ValidFields}})
+ end;
+make_publish_fun(Fields, _) ->
+ fail({invalid_parameter_value, publish_properties,
+ {require_list, Fields}}).
+
+make_field_indices(Valid, Fields) ->
+ make_field_indices(Fields, field_map(Valid, 2), []).
+
+make_field_indices([], _Idxs , Acc) ->
+ lists:reverse(Acc);
+make_field_indices([{Key, Value} | Rest], Idxs, Acc) ->
+ make_field_indices(Rest, Idxs, [{dict:fetch(Key, Idxs), Value} | Acc]).
+
+field_map(Fields, Idx0) ->
+ {Dict, _IdxMax} =
+ lists:foldl(fun (Field, {Dict1, Idx1}) ->
+ {dict:store(Field, Idx1, Dict1), Idx1 + 1}
+ end, {dict:new(), Idx0}, Fields),
+ Dict.
+
+-spec fail(term()) -> no_return().
+fail(Reason) -> throw({error, Reason}).
+
+add_forward_headers_fun(Name, true, PubProps) ->
+ fun(SrcUri, DestUri, Props) ->
+ rabbit_shovel_util:update_headers(
+ [{<<"shovelled-by">>, rabbit_nodes:cluster_name()},
+ {<<"shovel-type">>, <<"static">>},
+ {<<"shovel-name">>, list_to_binary(atom_to_list(Name))}],
+ [], SrcUri, DestUri, PubProps(SrcUri, DestUri, Props))
+ end;
+add_forward_headers_fun(_Name, false, PubProps) ->
+ PubProps.
+
+add_timestamp_header_fun(true, PubProps) ->
+ fun(SrcUri, DestUri, Props) ->
+ rabbit_shovel_util:add_timestamp_header(
+ PubProps(SrcUri, DestUri, Props))
+ end;
+add_timestamp_header_fun(false, PubProps) -> PubProps.
+
+parse_declaration({[], Acc}) ->
+ Acc;
+parse_declaration({[{Method, Props} | Rest], Acc}) when is_list(Props) ->
+ FieldNames = try rabbit_framing_amqp_0_9_1:method_fieldnames(Method)
+ catch exit:Reason -> fail(Reason)
+ end,
+ case proplists:get_keys(Props) -- FieldNames of
+ [] -> ok;
+ UnknownFields -> fail({unknown_fields, Method, UnknownFields})
+ end,
+ {Res, _Idx} = lists:foldl(
+ fun (K, {R, Idx}) ->
+ NewR = case proplists:get_value(K, Props) of
+ undefined -> R;
+ V -> setelement(Idx, R, V)
+ end,
+ {NewR, Idx + 1}
+ end, {rabbit_framing_amqp_0_9_1:method_record(Method), 2},
+ FieldNames),
+ parse_declaration({Rest, [Res | Acc]});
+parse_declaration({[{Method, Props} | _Rest], _Acc}) ->
+ fail({expected_method_field_list, Method, Props});
+parse_declaration({[Method | Rest], Acc}) ->
+ parse_declaration({[{Method, []} | Rest], Acc}).
+
+decl_fun(Endpoint) ->
+ Decl = parse_declaration({proplists:get_value(declarations, Endpoint, []),
+ []}),
+ fun (_Conn, Ch) ->
+ [begin
+ amqp_channel:call(Ch, M)
+ end || M <- lists:reverse(Decl)]
+ end.
+
+parse_parameter(Param, Fun, Value) ->
+ try
+ Fun(Value)
+ catch
+ _:{error, Err} ->
+ fail({invalid_parameter_value, Param, Err})
+ end.
+
+parse_non_negative_integer(N) when is_integer(N) andalso N >= 0 ->
+ N;
+parse_non_negative_integer(N) ->
+ fail({require_non_negative_integer, N}).
+
+parse_binary(Binary) when is_binary(Binary) ->
+ Binary;
+parse_binary(NotABinary) ->
+ fail({require_binary, NotABinary}).