diff options
Diffstat (limited to 'deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl')
-rw-r--r-- | deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl | 224 |
1 files changed, 224 insertions, 0 deletions
diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl new file mode 100644 index 0000000000..919db25910 --- /dev/null +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl @@ -0,0 +1,224 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_shovel_worker). +-behaviour(gen_server2). + +-export([start_link/3]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +%% for testing purposes +-export([get_connection_name/1]). + +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_shovel.hrl"). + +-record(state, {inbound_conn, inbound_ch, outbound_conn, outbound_ch, + name, type, config, inbound_uri, outbound_uri, unacked, + remaining, %% [1] + remaining_unacked}). %% [2] + +%% [1] Counts down until we shut down in all modes +%% [2] Counts down until we stop publishing in on-confirm mode + +start_link(Type, Name, Config) -> + ok = rabbit_shovel_status:report(Name, Type, starting), + gen_server2:start_link(?MODULE, [Type, Name, Config], []). + +%%--------------------------- +%% Gen Server Implementation +%%--------------------------- + +init([Type, Name, Config0]) -> + Config = case Type of + static -> + Config0; + dynamic -> + ClusterName = rabbit_nodes:cluster_name(), + {ok, Conf} = rabbit_shovel_parameters:parse(Name, + ClusterName, + Config0), + Conf + end, + rabbit_log_shovel:debug("Initialising a Shovel ~s of type '~s'", [human_readable_name(Name), Type]), + gen_server2:cast(self(), init), + {ok, #state{name = Name, type = Type, config = Config}}. + +handle_call(_Msg, _From, State) -> + {noreply, State}. + +handle_cast(init, State = #state{config = Config0}) -> + try rabbit_shovel_behaviour:connect_source(Config0) of + Config -> + rabbit_log_shovel:debug("Shovel ~s connected to source", [human_readable_name(maps:get(name, Config))]), + %% this makes sure that connection pid is updated in case + %% any of the subsequent connection/init steps fail. See + %% rabbitmq/rabbitmq-shovel#54 for context. + gen_server2:cast(self(), connect_dest), + {noreply, State#state{config = Config}} + catch _:_ -> + rabbit_log_shovel:error("Shovel ~s could not connect to source", [human_readable_name(maps:get(name, Config0))]), + {stop, shutdown, State} + end; +handle_cast(connect_dest, State = #state{config = Config0}) -> + try rabbit_shovel_behaviour:connect_dest(Config0) of + Config -> + rabbit_log_shovel:debug("Shovel ~s connected to destination", [human_readable_name(maps:get(name, Config))]), + gen_server2:cast(self(), init_shovel), + {noreply, State#state{config = Config}} + catch _:_ -> + rabbit_log_shovel:error("Shovel ~s could not connect to destination", [human_readable_name(maps:get(name, Config0))]), + {stop, shutdown, State} + end; +handle_cast(init_shovel, State = #state{config = Config}) -> + %% Don't trap exits until we have established connections so that + %% if we try to shut down while waiting for a connection to be + %% established then we don't block + process_flag(trap_exit, true), + Config1 = rabbit_shovel_behaviour:init_dest(Config), + Config2 = rabbit_shovel_behaviour:init_source(Config1), + rabbit_log_shovel:debug("Shovel ~s has finished setting up its topology", [human_readable_name(maps:get(name, Config2))]), + State1 = State#state{config = Config2}, + ok = report_running(State1), + {noreply, State1}. + + +handle_info(Msg, State = #state{config = Config, name = Name}) -> + case rabbit_shovel_behaviour:handle_source(Msg, Config) of + not_handled -> + case rabbit_shovel_behaviour:handle_dest(Msg, Config) of + not_handled -> + rabbit_log_shovel:warning("Shovel ~s could not handle a destination message ~p", [human_readable_name(Name), Msg]), + {noreply, State}; + {stop, {outbound_conn_died, heartbeat_timeout}} -> + rabbit_log_shovel:error("Shovel ~s detected missed heartbeats on destination connection", [human_readable_name(Name)]), + {stop, {shutdown, heartbeat_timeout}, State}; + {stop, {outbound_conn_died, Reason}} -> + rabbit_log_shovel:error("Shovel ~s detected destination connection failure: ~p", [human_readable_name(Name), Reason]), + {stop, Reason, State}; + {stop, Reason} -> + rabbit_log_shovel:debug("Shovel ~s decided to stop due a message from destination: ~p", [human_readable_name(Name), Reason]), + {stop, Reason, State}; + Config1 -> + {noreply, State#state{config = Config1}} + end; + {stop, {inbound_conn_died, heartbeat_timeout}} -> + rabbit_log_shovel:error("Shovel ~s detected missed heartbeats on source connection", [human_readable_name(Name)]), + {stop, {shutdown, heartbeat_timeout}, State}; + {stop, {inbound_conn_died, Reason}} -> + rabbit_log_shovel:error("Shovel ~s detected source connection failure: ~p", [human_readable_name(Name), Reason]), + {stop, Reason, State}; + {stop, Reason} -> + rabbit_log_shovel:error("Shovel ~s decided to stop due a message from source: ~p", [human_readable_name(Name), Reason]), + {stop, Reason, State}; + Config1 -> + {noreply, State#state{config = Config1}} + end. + +terminate({shutdown, autodelete}, State = #state{name = {VHost, Name}, + type = dynamic}) -> + rabbit_log_shovel:info("Shovel '~s' is stopping (it was configured to autodelete and transfer is completed)", + [human_readable_name({VHost, Name})]), + close_connections(State), + %% See rabbit_shovel_dyn_worker_sup_sup:stop_child/1 + put(shovel_worker_autodelete, true), + _ = rabbit_runtime_parameters:clear(VHost, <<"shovel">>, Name, ?SHOVEL_USER), + rabbit_shovel_status:remove({VHost, Name}), + ok; +terminate(shutdown, State) -> + close_connections(State), + ok; +terminate(socket_closed_unexpectedly, State) -> + close_connections(State), + ok; +terminate({'EXIT', heartbeat_timeout}, State = #state{name = Name}) -> + rabbit_log_shovel:error("Shovel ~s is stopping because of a heartbeat timeout", [human_readable_name(Name)]), + rabbit_shovel_status:report(State#state.name, State#state.type, + {terminated, "heartbeat timeout"}), + close_connections(State), + ok; +terminate({'EXIT', outbound_conn_died}, State = #state{name = Name}) -> + rabbit_log_shovel:error("Shovel ~s is stopping because destination connection failed", [human_readable_name(Name)]), + rabbit_shovel_status:report(State#state.name, State#state.type, + {terminated, "destination connection failed"}), + close_connections(State), + ok; +terminate({'EXIT', inbound_conn_died}, State = #state{name = Name}) -> + rabbit_log_shovel:error("Shovel ~s is stopping because destination connection failed", [human_readable_name(Name)]), + rabbit_shovel_status:report(State#state.name, State#state.type, + {terminated, "source connection failed"}), + close_connections(State), + ok; +terminate({shutdown, heartbeat_timeout}, State = #state{name = Name}) -> + rabbit_log_shovel:error("Shovel ~s is stopping because of a heartbeat timeout", [human_readable_name(Name)]), + rabbit_shovel_status:report(State#state.name, State#state.type, + {terminated, "heartbeat timeout"}), + close_connections(State), + ok; +terminate({shutdown, restart}, State = #state{name = Name}) -> + rabbit_log_shovel:error("Shovel ~s is stopping to restart", [human_readable_name(Name)]), + rabbit_shovel_status:report(State#state.name, State#state.type, + {terminated, "needed a restart"}), + close_connections(State), + ok; +terminate(Reason, State = #state{name = Name}) -> + rabbit_log_shovel:error("Shovel ~s is stopping, reason: ~p", [human_readable_name(Name), Reason]), + rabbit_shovel_status:report(State#state.name, State#state.type, + {terminated, Reason}), + close_connections(State), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%--------------------------- +%% Helpers +%%--------------------------- + +human_readable_name(Name) -> + case Name of + {VHost, ShovelName} -> rabbit_misc:format("'~s' in virtual host '~s'", [ShovelName, VHost]); + ShovelName -> rabbit_misc:format("'~s'", [ShovelName]) + end. + +report_running(#state{config = Config} = State) -> + InUri = rabbit_shovel_behaviour:source_uri(Config), + OutUri = rabbit_shovel_behaviour:dest_uri(Config), + InProto = rabbit_shovel_behaviour:source_protocol(Config), + OutProto = rabbit_shovel_behaviour:dest_protocol(Config), + InEndpoint = rabbit_shovel_behaviour:source_endpoint(Config), + OutEndpoint = rabbit_shovel_behaviour:dest_endpoint(Config), + rabbit_shovel_status:report(State#state.name, State#state.type, + {running, [{src_uri, rabbit_data_coercion:to_binary(InUri)}, + {src_protocol, rabbit_data_coercion:to_binary(InProto)}, + {dest_protocol, rabbit_data_coercion:to_binary(OutProto)}, + {dest_uri, rabbit_data_coercion:to_binary(OutUri)}] + ++ props_to_binary(InEndpoint) ++ props_to_binary(OutEndpoint) + }). + +props_to_binary(Props) -> + [{K, rabbit_data_coercion:to_binary(V)} || {K, V} <- Props]. + +%% for static shovels, name is an atom from the configuration file +get_connection_name(ShovelName) when is_atom(ShovelName) -> + Prefix = <<"Shovel ">>, + ShovelNameAsBinary = atom_to_binary(ShovelName, utf8), + <<Prefix/binary, ShovelNameAsBinary/binary>>; + +%% for dynamic shovels, name is a tuple with a binary +get_connection_name({_, Name}) when is_binary(Name) -> + Prefix = <<"Shovel ">>, + <<Prefix/binary, Name/binary>>; + +%% fallback +get_connection_name(_) -> + <<"Shovel">>. + +close_connections(#state{config = Conf}) -> + ok = rabbit_shovel_behaviour:close_source(Conf), + ok = rabbit_shovel_behaviour:close_dest(Conf). |