summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl')
-rw-r--r--deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl777
1 files changed, 777 insertions, 0 deletions
diff --git a/deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl b/deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl
new file mode 100644
index 0000000000..81125988cc
--- /dev/null
+++ b/deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl
@@ -0,0 +1,777 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% The Initial Developer of the Original Code is AWeber Communications.
+%% Copyright (c) 2015-2016 AWeber Communications
+%% Copyright (c) 2016-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_peer_discovery_consul).
+-behaviour(rabbit_peer_discovery_backend).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbitmq_peer_discovery_common/include/rabbit_peer_discovery.hrl").
+-include("rabbit_peer_discovery_consul.hrl").
+
+-export([init/0, list_nodes/0, supports_registration/0, register/0, unregister/0,
+ post_registration/0, lock/1, unlock/1]).
+-export([send_health_check_pass/0]).
+-export([session_ttl_update_callback/1]).
+%% useful for debugging from the REPL with RABBITMQ_ALLOW_INPUT
+-export([service_id/0, service_address/0]).
+%% for tests
+-ifdef(TEST).
+-compile(export_all).
+-endif.
+
+-define(CONFIG_MODULE, rabbit_peer_discovery_config).
+-define(UTIL_MODULE, rabbit_peer_discovery_util).
+
+-define(CONSUL_CHECK_NOTES, "RabbitMQ Consul-based peer discovery plugin TTL check").
+
+%%
+%% API
+%%
+
+init() ->
+ rabbit_log:debug("Peer discovery Consul: initialising..."),
+ ok = application:ensure_started(inets),
+ %% we cannot start this plugin yet since it depends on the rabbit app,
+ %% which is in the process of being started by the time this function is called
+ application:load(rabbitmq_peer_discovery_common),
+ rabbit_peer_discovery_httpc:maybe_configure_proxy(),
+ rabbit_peer_discovery_httpc:maybe_configure_inet6().
+
+-spec list_nodes() -> {ok, {Nodes :: list(), NodeType :: rabbit_types:node_type()}} | {error, Reason :: string()}.
+
+list_nodes() ->
+ Fun0 = fun() -> {ok, {[], disc}} end,
+ Fun1 = fun() ->
+ rabbit_log:warning("Peer discovery backend is set to ~s "
+ "but final config does not contain rabbit.cluster_formation.peer_discovery_consul. "
+ "Cannot discover any nodes because Consul cluster details are not configured!",
+ [?MODULE]),
+ {ok, {[], disc}}
+ end,
+ Fun2 = fun(Proplist) ->
+ M = maps:from_list(Proplist),
+ case rabbit_peer_discovery_httpc:get(get_config_key(consul_scheme, M),
+ get_config_key(consul_host, M),
+ get_integer_config_key(consul_port, M),
+ rabbit_peer_discovery_httpc:build_path([v1, health, service, get_config_key(consul_svc, M)]),
+ list_nodes_query_args(),
+ maybe_add_acl([]),
+ []) of
+ {ok, Nodes} ->
+ IncludeWithWarnings = get_config_key(consul_include_nodes_with_warnings, M),
+ Result = extract_nodes(
+ filter_nodes(Nodes, IncludeWithWarnings)),
+ {ok, {Result, disc}};
+ {error, _} = Error ->
+ Error
+ end
+ end,
+ rabbit_peer_discovery_util:maybe_backend_configured(?BACKEND_CONFIG_KEY, Fun0, Fun1, Fun2).
+
+
+-spec supports_registration() -> boolean().
+
+supports_registration() ->
+ true.
+
+
+-spec register() -> ok | {error, Reason :: string()}.
+register() ->
+ M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+ case registration_body() of
+ {ok, Body} ->
+ rabbit_log:debug("Consul registration body: ~s", [Body]),
+ case rabbit_peer_discovery_httpc:put(get_config_key(consul_scheme, M),
+ get_config_key(consul_host, M),
+ get_integer_config_key(consul_port, M),
+ rabbit_peer_discovery_httpc:build_path([v1, agent, service, register]),
+ [],
+ maybe_add_acl([]),
+ Body) of
+ {ok, _} -> ok;
+ Error -> Error
+ end;
+ Error -> Error
+ end.
+
+
+-spec unregister() -> ok | {error, Reason :: string()}.
+unregister() ->
+ M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+ ID = service_id(),
+ rabbit_log:debug("Unregistering with Consul using service ID '~s'", [ID]),
+ case rabbit_peer_discovery_httpc:put(get_config_key(consul_scheme, M),
+ get_config_key(consul_host, M),
+ get_integer_config_key(consul_port, M),
+ rabbit_peer_discovery_httpc:build_path([v1, agent, service, deregister, ID]),
+ [],
+ maybe_add_acl([]),
+ []) of
+ {ok, Response} ->
+ rabbit_log:info("Consul's response to the unregistration attempt: ~p", [Response]),
+ ok;
+ Error ->
+ rabbit_log:info("Failed to unregister service with ID '~s` with Consul: ~p",
+ [ID, Error]),
+ Error
+ end.
+
+-spec post_registration() -> ok.
+
+post_registration() ->
+ %% don't wait for one full interval, make
+ %% sure we let Consul know the service is healthy
+ %% right after registration. See rabbitmq/rabbitmq_peer_discovery_consul#8.
+ send_health_check_pass(),
+ ok.
+
+-spec lock(Node :: atom()) -> {ok, Data :: term()} | {error, Reason :: string()}.
+
+lock(Node) ->
+ M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+ rabbit_log:debug("Effective Consul peer discovery configuration: ~p", [M]),
+ case create_session(Node, get_config_key(consul_svc_ttl, M)) of
+ {ok, SessionId} ->
+ TRef = start_session_ttl_updater(SessionId),
+ Now = erlang:system_time(seconds),
+ EndTime = Now + get_config_key(lock_wait_time, M),
+ lock(TRef, SessionId, Now, EndTime);
+ {error, Reason} ->
+ {error, lists:flatten(io_lib:format("Error while creating a session, reason: ~s",
+ [Reason]))}
+ end.
+
+-spec unlock({SessionId :: string(), TRef :: timer:tref()}) -> ok.
+
+unlock({SessionId, TRef}) ->
+ timer:cancel(TRef),
+ rabbit_log:debug("Stopped session renewal"),
+ case release_lock(SessionId) of
+ {ok, true} ->
+ ok;
+ {ok, false} ->
+ {error, lists:flatten(io_lib:format("Error while releasing the lock, session ~s may have been invalidated", [SessionId]))};
+ {error, _} = Err ->
+ Err
+ end.
+
+%%
+%% Implementation
+%%
+
+-spec get_config_key(Key :: atom(), Map :: #{atom() => peer_discovery_config_value()})
+ -> peer_discovery_config_value().
+
+get_config_key(Key, Map) ->
+ ?CONFIG_MODULE:get(Key, ?CONFIG_MAPPING, Map).
+
+-spec get_integer_config_key(Key :: atom(), Map :: #{atom() => peer_discovery_config_value()})
+ -> integer().
+
+get_integer_config_key(Key, Map) ->
+ ?CONFIG_MODULE:get_integer(Key, ?CONFIG_MAPPING, Map).
+
+
+-spec filter_nodes(ConsulResult :: list(), AllowWarning :: atom()) -> list().
+filter_nodes(Nodes, Warn) ->
+ case Warn of
+ true ->
+ lists:filter(fun(Node) ->
+ Checks = maps:get(<<"Checks">>, Node),
+ lists:all(fun(Check) ->
+ lists:member(maps:get(<<"Status">>, Check),
+ [<<"passing">>, <<"warning">>])
+ end,
+ Checks)
+ end,
+ Nodes);
+ false -> Nodes
+ end.
+
+-spec extract_nodes(ConsulResult :: list()) -> list().
+extract_nodes(Data) -> extract_nodes(Data, []).
+
+-spec extract_nodes(ConsulResult :: list(), Nodes :: list())
+ -> list().
+extract_nodes([], Nodes) -> Nodes;
+extract_nodes([H | T], Nodes) ->
+ Service = maps:get(<<"Service">>, H),
+ Value = maps:get(<<"Address">>, Service),
+ NodeName = case ?UTIL_MODULE:as_string(Value) of
+ "" ->
+ NodeData = maps:get(<<"Node">>, H),
+ Node = maps:get(<<"Node">>, NodeData),
+ maybe_add_domain(?UTIL_MODULE:node_name(Node));
+ Address ->
+ ?UTIL_MODULE:node_name(Address)
+ end,
+ extract_nodes(T, lists:merge(Nodes, [NodeName])).
+
+-spec maybe_add_acl(QArgs :: list()) -> list().
+maybe_add_acl(QArgs) ->
+ M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+ case get_config_key(consul_acl_token, M) of
+ "undefined" -> QArgs;
+ ACL -> lists:append(QArgs, [{"X-Consul-Token", ACL}])
+ end.
+
+-spec list_nodes_query_args() -> list().
+list_nodes_query_args() ->
+ M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+ list_nodes_query_args(get_config_key(cluster_name, M)).
+
+-spec list_nodes_query_args(ClusterName :: string()) -> list().
+list_nodes_query_args(Cluster) ->
+ ClusterTag = case Cluster of
+ "default" -> [];
+ _ -> [{tag, Cluster}]
+ end,
+ M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+ list_nodes_query_args(ClusterTag, get_config_key(consul_include_nodes_with_warnings, M)).
+
+-spec list_nodes_query_args(Args :: list(), AllowWarn :: atom()) -> list().
+list_nodes_query_args(Value, Warn) ->
+ case Warn of
+ true -> Value;
+ false -> [passing | Value]
+ end.
+
+-spec registration_body() -> {ok, Body :: binary()} | {error, atom()}.
+registration_body() ->
+ Payload = build_registration_body(),
+ registration_body(rabbit_json:try_encode(Payload)).
+
+-spec registration_body(Response :: {ok, Body :: string()} |
+ {error, Reason :: atom()})
+ -> {ok, Body :: binary()} | {error, Reason :: atom()}.
+registration_body({ok, Body}) ->
+ {ok, rabbit_data_coercion:to_binary(Body)};
+registration_body({error, Reason}) ->
+ rabbit_log:error("Error serializing the request body: ~p",
+ [Reason]),
+ {error, Reason}.
+
+
+-spec build_registration_body() -> list().
+build_registration_body() ->
+ Payload1 = registration_body_add_id(),
+ Payload2 = registration_body_add_name(Payload1),
+ Payload3 = registration_body_maybe_add_address(Payload2),
+ Payload4 = registration_body_add_port(Payload3),
+ Payload5 = registration_body_maybe_add_check(Payload4),
+ Payload6 = registration_body_maybe_add_tag(Payload5),
+ registration_body_maybe_add_meta(Payload6).
+
+-spec registration_body_add_id() -> list().
+registration_body_add_id() ->
+ [{'ID', rabbit_data_coercion:to_atom(service_id())}].
+
+-spec registration_body_add_name(Payload :: list()) -> list().
+registration_body_add_name(Payload) ->
+ M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+ Name = rabbit_data_coercion:to_atom(get_config_key(consul_svc, M)),
+ lists:append(Payload, [{'Name', Name}]).
+
+-spec registration_body_maybe_add_address(Payload :: list())
+ -> list().
+registration_body_maybe_add_address(Payload) ->
+ registration_body_maybe_add_address(Payload, service_address()).
+
+-spec registration_body_maybe_add_address(Payload :: list(), string())
+ -> list().
+registration_body_maybe_add_address(Payload, "undefined") -> Payload;
+registration_body_maybe_add_address(Payload, Address) ->
+ lists:append(Payload, [{'Address', rabbit_data_coercion:to_atom(Address)}]).
+
+registration_body_maybe_add_check(Payload) ->
+ M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+ TTL = get_config_key(consul_svc_ttl, M),
+ registration_body_maybe_add_check(Payload, TTL).
+
+-spec registration_body_maybe_add_check(Payload :: list(),
+ TTL :: integer() | undefined)
+ -> list().
+registration_body_maybe_add_check(Payload, undefined) ->
+ case registration_body_maybe_add_deregister([]) of
+ [{'DeregisterCriticalServiceAfter', _}]->
+ rabbit_log:warning("Can't use Consul's service deregistration feature without " ++
+ "using TTL. The parameter will be ignored"),
+ Payload;
+
+ _ -> Payload
+ end;
+registration_body_maybe_add_check(Payload, TTL) ->
+ CheckItems = [{'Notes', rabbit_data_coercion:to_atom(?CONSUL_CHECK_NOTES)},
+ {'TTL', rabbit_data_coercion:to_atom(service_ttl(TTL))},
+ {'Status', 'passing'}],
+ Check = [{'Check', registration_body_maybe_add_deregister(CheckItems)}],
+ lists:append(Payload, Check).
+
+-spec registration_body_add_port(Payload :: list()) -> list().
+registration_body_add_port(Payload) ->
+ M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+ lists:append(Payload,
+ [{'Port', get_config_key(consul_svc_port, M)}]).
+
+registration_body_maybe_add_deregister(Payload) ->
+ M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+ Deregister = get_config_key(consul_deregister_after, M),
+ registration_body_maybe_add_deregister(Payload, Deregister).
+
+-spec registration_body_maybe_add_deregister(Payload :: list(),
+ TTL :: integer() | undefined)
+ -> list().
+registration_body_maybe_add_deregister(Payload, undefined) -> Payload;
+registration_body_maybe_add_deregister(Payload, Deregister_After) ->
+ Deregister = {'DeregisterCriticalServiceAfter',
+ rabbit_data_coercion:to_atom(service_ttl(Deregister_After))},
+ Payload ++ [Deregister].
+
+-spec registration_body_maybe_add_tag(Payload :: list()) -> list().
+registration_body_maybe_add_tag(Payload) ->
+ M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+ Value = get_config_key(cluster_name, M),
+ Tags = ?UTIL_MODULE:as_list(get_config_key(consul_svc_tags, M)),
+ registration_body_maybe_add_tag(Payload, Value, Tags).
+
+-spec registration_body_maybe_add_tag(Payload :: list(),
+ ClusterName :: string(),
+ Tags :: list())
+ -> list().
+registration_body_maybe_add_tag(Payload, "default", []) -> Payload;
+registration_body_maybe_add_tag(Payload, "default", Tags) ->
+ lists:append(Payload, [{'Tags', [rabbit_data_coercion:to_atom(X) || X <- Tags]}]);
+registration_body_maybe_add_tag(Payload, Cluster, []) ->
+ lists:append(Payload, [{'Tags', [rabbit_data_coercion:to_atom(Cluster)]}]);
+registration_body_maybe_add_tag(Payload, Cluster, Tags) ->
+ lists:append(Payload, [{'Tags', [rabbit_data_coercion:to_atom(Cluster)] ++ [rabbit_data_coercion:to_atom(X) || X <- Tags]}]).
+
+
+-spec registration_body_maybe_add_meta(Payload :: list()) -> list().
+registration_body_maybe_add_meta(Payload) ->
+ M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+ ClusterName = get_config_key(cluster_name, M),
+ Meta = ?UTIL_MODULE:as_list(get_config_key(consul_svc_meta, M)),
+ registration_body_maybe_add_meta(Payload, ClusterName, Meta).
+
+-spec registration_body_maybe_add_meta(Payload :: list(),
+ ClusterName :: string(),
+ Meta :: list()) -> list().
+registration_body_maybe_add_meta(Payload, "default", []) ->
+ Payload;
+registration_body_maybe_add_meta(Payload, "default", Meta) ->
+ lists:append(Payload, [{<<"meta">>, Meta}]);
+registration_body_maybe_add_meta(Payload, _ClusterName, []) ->
+ Payload;
+registration_body_maybe_add_meta(Payload, ClusterName, Meta) ->
+ Merged = maps:to_list(maps:merge(#{<<"cluster">> => rabbit_data_coercion:to_binary(ClusterName)}, maps:from_list(Meta))),
+ lists:append(Payload, [{<<"meta">>, Merged}]).
+
+
+-spec validate_addr_parameters(false | true, false | true) -> false | true.
+validate_addr_parameters(false, true) ->
+ rabbit_log:warning("The parameter CONSUL_SVC_ADDR_NODENAME" ++
+ " can be used only if CONSUL_SVC_ADDR_AUTO is true." ++
+ " CONSUL_SVC_ADDR_NODENAME value will be ignored."),
+ false;
+validate_addr_parameters(_, _) ->
+ true.
+
+
+-spec service_address() -> string().
+service_address() ->
+ M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+ validate_addr_parameters(get_config_key(consul_svc_addr_auto, M),
+ get_config_key(consul_svc_addr_nodename, M)),
+ service_address(get_config_key(consul_svc_addr, M),
+ get_config_key(consul_svc_addr_auto, M),
+ get_config_key(consul_svc_addr_nic, M),
+ get_config_key(consul_svc_addr_nodename, M)).
+
+
+-spec service_address(Static :: string(),
+ Auto :: boolean(),
+ AutoNIC :: string(),
+ FromNodename :: boolean()) -> string().
+service_address(_, true, "undefined", FromNodename) ->
+ rabbit_peer_discovery_util:node_hostname(FromNodename);
+service_address(Value, false, "undefined", _) ->
+ Value;
+service_address(_, true, NIC, _) ->
+ %% TODO: support IPv6
+ {ok, Addr} = rabbit_peer_discovery_util:nic_ipv4(NIC),
+ Addr;
+%% this combination makes no sense but this is what rabbitmq-autocluster
+%% and this plugin have been allowing for a couple of years, so we keep
+%% this clause around for backwards compatibility.
+%% See rabbitmq/rabbitmq-peer-discovery-consul#12 for details.
+service_address(_, false, NIC, _) ->
+ {ok, Addr} = rabbit_peer_discovery_util:nic_ipv4(NIC),
+ Addr.
+
+
+-spec service_id() -> string().
+service_id() ->
+ M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+ service_id(get_config_key(consul_svc, M),
+ service_address()).
+
+-spec service_id(Name :: string(), Address :: string()) -> string().
+service_id(Service, "undefined") -> Service;
+service_id(Service, Address) ->
+ string:join([Service, Address], ":").
+
+-spec service_ttl(TTL :: integer()) -> string().
+service_ttl(Value) ->
+ rabbit_peer_discovery_util:as_string(Value) ++ "s".
+
+-spec maybe_add_domain(Domain :: atom()) -> atom().
+maybe_add_domain(Value) ->
+ M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+ case get_config_key(consul_use_longname, M) of
+ true ->
+ rabbit_data_coercion:to_atom(string:join([atom_to_list(Value),
+ "node",
+ get_config_key(consul_domain, M)],
+ "."));
+ false -> Value
+ end.
+
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Let Consul know that this node is still around
+%% @end
+%%--------------------------------------------------------------------
+
+-spec send_health_check_pass() -> ok.
+
+send_health_check_pass() ->
+ Service = string:join(["service", service_id()], ":"),
+ M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+ rabbit_log:debug("Running Consul health check"),
+ case rabbit_peer_discovery_httpc:put(get_config_key(consul_scheme, M),
+ get_config_key(consul_host, M),
+ get_integer_config_key(consul_port, M),
+ rabbit_peer_discovery_httpc:build_path([v1, agent, check, pass, Service]),
+ [],
+ maybe_add_acl([]),
+ []) of
+ {ok, []} -> ok;
+ {error, "429"} ->
+ %% Too Many Requests, see https://www.consul.io/docs/agent/checks.html
+ rabbit_log:warning("Consul responded to a health check with 429 Too Many Requests"),
+ ok;
+ {error, "500"} ->
+ rabbit_log:warning("Consul responded to a health check with a 500 status, will wait and try re-registering"),
+ maybe_re_register(wait_for_list_nodes()),
+ ok;
+ {error, Reason} ->
+ rabbit_log:error("Error running Consul health check: ~p",
+ [Reason]),
+ ok
+ end.
+
+maybe_re_register({error, Reason}) ->
+ rabbit_log:error("Internal error in Consul while updating health check. "
+ "Cannot obtain list of nodes registered in Consul either: ~p",
+ [Reason]);
+maybe_re_register({ok, {Members, _NodeType}}) ->
+ maybe_re_register(Members);
+maybe_re_register({ok, Members}) ->
+ maybe_re_register(Members);
+maybe_re_register(Members) ->
+ case lists:member(node(), Members) of
+ true ->
+ rabbit_log:error("Internal error in Consul while updating health check",
+ []);
+ false ->
+ rabbit_log:error("Internal error in Consul while updating health check, "
+ "node is not registered. Re-registering", []),
+ register()
+ end.
+
+wait_for_list_nodes() ->
+ wait_for_list_nodes(60).
+
+wait_for_list_nodes(N) ->
+ case {list_nodes(), N} of
+ {Reply, 0} ->
+ Reply;
+ {{ok, _} = Reply, _} ->
+ Reply;
+ {{error, _}, _} ->
+ timer:sleep(1000),
+ wait_for_list_nodes(N - 1)
+ end.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Create a session to be acquired for a common key
+%% @end
+%%--------------------------------------------------------------------
+-spec create_session(string(), pos_integer()) -> {ok, string()} | {error, Reason::string()}.
+create_session(Name, TTL) ->
+ case consul_session_create([], maybe_add_acl([]),
+ [{'Name', Name},
+ {'TTL', rabbit_data_coercion:to_atom(service_ttl(TTL))}]) of
+ {ok, Response} ->
+ {ok, get_session_id(Response)};
+ {error, _} = Err ->
+ Err
+ end.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Create session
+%% @end
+%%--------------------------------------------------------------------
+-spec consul_session_create(Query, Headers, Body) -> {ok, string()} | {error, any()} when
+ Query :: list(),
+ Headers :: [{string(), string()}],
+ Body :: term().
+consul_session_create(Query, Headers, Body) ->
+ M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+ case serialize_json_body(Body) of
+ {ok, Serialized} ->
+ rabbit_peer_discovery_httpc:put(get_config_key(consul_scheme, M),
+ get_config_key(consul_host, M),
+ get_integer_config_key(consul_port, M),
+ "v1/session/create",
+ Query,
+ Headers,
+ Serialized);
+ {error, _} = Err ->
+ Err
+ end.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Process the result of JSON encoding the request body payload,
+%% returning the body as a binary() value or the error returned by
+%% the JSON serialization library.
+%% @end
+%%--------------------------------------------------------------------
+-spec serialize_json_body(term()) -> {ok, Payload :: binary()} | {error, atom()}.
+serialize_json_body([]) -> {ok, []};
+serialize_json_body(Payload) ->
+ case rabbit_json:try_encode(Payload) of
+ {ok, Body} -> {ok, Body};
+ {error, Reason} -> {error, Reason}
+ end.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Extract session ID from Consul response
+%% @end
+%%--------------------------------------------------------------------
+-spec get_session_id(term()) -> string().
+get_session_id(#{<<"ID">> := ID}) -> binary:bin_to_list(ID).
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Start periodically renewing an existing session ttl
+%% @end
+%%--------------------------------------------------------------------
+-spec start_session_ttl_updater(string()) -> ok.
+start_session_ttl_updater(SessionId) ->
+ M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+ Interval = get_config_key(consul_svc_ttl, M),
+ rabbit_log:debug("Starting session renewal"),
+ {ok, TRef} = timer:apply_interval(Interval * 500, ?MODULE,
+ session_ttl_update_callback, [SessionId]),
+ TRef.
+
+%%
+%% @doc
+%% Tries to acquire lock. If the lock is held by someone else, waits until it
+%% is released, or too much time has passed
+%% @end
+-spec lock(timer:tref(), string(), pos_integer(), pos_integer()) -> {ok, string()} | {error, string()}.
+lock(TRef, _, Now, EndTime) when EndTime < Now ->
+ timer:cancel(TRef),
+ {error, "Acquiring lock taking too long, bailing out"};
+lock(TRef, SessionId, _, EndTime) ->
+ case acquire_lock(SessionId) of
+ {ok, true} ->
+ {ok, {SessionId, TRef}};
+ {ok, false} ->
+ case get_lock_status() of
+ {ok, {SessionHeld, ModifyIndex}} ->
+ Wait = max(EndTime - erlang:system_time(seconds), 0),
+ case wait_for_lock_release(SessionHeld, ModifyIndex, Wait) of
+ ok ->
+ lock(TRef, SessionId, erlang:system_time(seconds), EndTime);
+ {error, Reason} ->
+ timer:cancel(TRef),
+ {error, lists:flatten(io_lib:format("Error waiting for lock release, reason: ~s",[Reason]))}
+ end;
+ {error, Reason} ->
+ timer:cancel(TRef),
+ {error, lists:flatten(io_lib:format("Error obtaining lock status, reason: ~s", [Reason]))}
+ end;
+ {error, Reason} ->
+ timer:cancel(TRef),
+ {error, lists:flatten(io_lib:format("Error while acquiring lock, reason: ~s", [Reason]))}
+ end.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Acquire session for a key
+%% @end
+%%--------------------------------------------------------------------
+-spec acquire_lock(string()) -> {ok, any()} | {error, string()}.
+acquire_lock(SessionId) ->
+ consul_kv_write(startup_lock_path(), [{acquire, SessionId}], maybe_add_acl([]), []).
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Release a previously acquired lock held by a given session
+%% @end
+%%--------------------------------------------------------------------
+-spec release_lock(string()) -> {ok, any()} | {error, string()}.
+release_lock(SessionId) ->
+ consul_kv_write(startup_lock_path(), [{release, SessionId}], maybe_add_acl([]), []).
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Write KV store key value
+%% @end
+%%--------------------------------------------------------------------
+-spec consul_kv_write(Path, Query, Headers, Body) -> {ok, any()} | {error, string()} when
+ Path :: string(),
+ Query :: [{string(), string()}],
+ Headers :: [{string(), string()}],
+ Body :: term().
+consul_kv_write(Path, Query, Headers, Body) ->
+ M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+ case serialize_json_body(Body) of
+ {ok, Serialized} ->
+ rabbit_peer_discovery_httpc:put(get_config_key(consul_scheme, M),
+ get_config_key(consul_host, M),
+ get_integer_config_key(consul_port, M),
+ "v1/kv/" ++ Path,
+ Query,
+ Headers,
+ Serialized);
+ {error, _} = Err ->
+ Err
+ end.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Read KV store key value
+%% @end
+%%--------------------------------------------------------------------
+-spec consul_kv_read(Path, Query, Headers) -> {ok, term()} | {error, string()} when
+ Path :: string(),
+ Query :: [{string(), string()}],
+ Headers :: [{string(), string()}].
+consul_kv_read(Path, Query, Headers) ->
+ M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+ rabbit_peer_discovery_httpc:get(get_config_key(consul_scheme, M),
+ get_config_key(consul_host, M),
+ get_integer_config_key(consul_port, M),
+ "v1/kv/" ++ Path,
+ Query,
+ Headers,
+ []).
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Get lock status
+%% XXX: probably makes sense to wrap output in a record to be
+%% more future-proof
+%% @end
+%%--------------------------------------------------------------------
+-spec get_lock_status() -> {ok, term()} | {error, string()}.
+get_lock_status() ->
+ case consul_kv_read(startup_lock_path(), [], maybe_add_acl([])) of
+ {ok, [KeyData | _]} ->
+ SessionHeld = maps:get(<<"Session">>, KeyData, undefined) =/= undefined,
+ ModifyIndex = maps:get(<<"ModifyIndex">>, KeyData),
+ {ok, {SessionHeld, ModifyIndex}};
+ {error, _} = Err ->
+ Err
+ end.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Returns consul path for startup lock
+%% @end
+%%--------------------------------------------------------------------
+-spec startup_lock_path() -> string().
+startup_lock_path() ->
+ base_path() ++ "/" ++ "startup_lock".
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc Return a list of path segments that are the base path for all
+%% consul kv keys related to current cluster.
+%% @end
+%%--------------------------------------------------------------------
+-spec base_path() -> string().
+base_path() ->
+ M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+ Segments = [get_config_key(consul_lock_prefix, M), get_config_key(cluster_name, M)],
+ rabbit_peer_discovery_httpc:build_path(Segments).
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Wait for lock to be released if it has been acquired by another node
+%% @end
+%%--------------------------------------------------------------------
+-spec wait_for_lock_release(atom(), pos_integer(), pos_integer()) -> ok | {error, string()}.
+wait_for_lock_release(false, _, _) -> ok;
+wait_for_lock_release(_, Index, Wait) ->
+ case consul_kv_read(startup_lock_path(),
+ [{index, Index}, {wait, service_ttl(Wait)}],
+ maybe_add_acl([])) of
+ {ok, _} -> ok;
+ {error, _} = Err -> Err
+ end.
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Renew an existing session
+%% @end
+%%--------------------------------------------------------------------
+-spec session_ttl_update_callback(string()) -> string().
+session_ttl_update_callback(SessionId) ->
+ _ = consul_session_renew(SessionId, [], maybe_add_acl([])),
+ SessionId.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Renew session TTL
+%% @end
+%%--------------------------------------------------------------------
+-spec consul_session_renew(string(), [{string(), string()}], [{string(), string()}]) -> {ok, term()} | {error, string()}.
+consul_session_renew(SessionId, Query, Headers) ->
+ M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+ rabbit_peer_discovery_httpc:put(get_config_key(consul_scheme, M),
+ get_config_key(consul_host, M),
+ get_integer_config_key(consul_port, M),
+ rabbit_peer_discovery_httpc:build_path([v1, session, renew, rabbit_data_coercion:to_atom(SessionId)]),
+ Query,
+ Headers,
+ []).