diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2017-09-25 13:00:13 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2017-09-25 15:34:16 -0400 |
commit | 1eaf17890ab8d6bf7504355e4e32aaf2357b1398 (patch) | |
tree | 5aa3a0bc0ffa9b01027922331bf43ddb4468c584 /src/rexi | |
parent | 8d1c7043731fbaa5f4f93243df5144416c946604 (diff) | |
download | couchdb-1eaf17890ab8d6bf7504355e4e32aaf2357b1398.tar.gz |
Do not buffer rexi messages to disconnected nodes
Instead, wait 15 seconds after the last cluster configuration change. If there
were no more changes to the cluster in that time, stop rexi buffers and servers
for nodes which are no longer connected.
Extract and reuse cluster stability check from `couch_replicator_clustering`
and move it to `mem3_cluster` module, so both replicator and rexi can use it.
Users of `mem3_cluster` implement a behavior callback API, then spawn_link
the cluster monitor with their specific period values.
This also simplifies the logic in rexi_server_mon as it no longer needs to
handle `{nodeup, _}` and `{nodedown, _}` messages. On any cluster membership
change it will get a `cluster_unstable` message. It then immediately spawns new
servers and buffers if needed. Only when the cluster has stabilized will it stop
servers and buffers for disconnected nodes. The idea is to allow for short
periods of disconnects between nodes before throwing away all the buffered
messages.
Diffstat (limited to 'src/rexi')
-rw-r--r-- | src/rexi/src/rexi_server_mon.erl | 84 |
1 files changed, 64 insertions, 20 deletions
diff --git a/src/rexi/src/rexi_server_mon.erl b/src/rexi/src/rexi_server_mon.erl index e6b5eb98e..86fecaff6 100644 --- a/src/rexi/src/rexi_server_mon.erl +++ b/src/rexi/src/rexi_server_mon.erl @@ -14,6 +14,7 @@ -module(rexi_server_mon). -behaviour(gen_server). +-behaviour(mem3_cluster). -vsn(1). @@ -32,8 +33,13 @@ code_change/3 ]). +-export([ + cluster_stable/1, + cluster_unstable/1 +]). --define(INTERVAL, 60000). + +-define(CLUSTER_STABILITY_PERIOD_SEC, 15). start_link(ChildMod) -> @@ -45,9 +51,23 @@ status() -> gen_server:call(?MODULE, status). +% Mem3 cluster callbacks + +cluster_unstable(Server) -> + couch_log:notice("~s : cluster unstable", [?MODULE]), + gen_server:cast(Server, cluster_unstable), + Server. + +cluster_stable(Server) -> + gen_server:cast(Server, cluster_stable), + Server. + + +% gen_server callbacks + init(ChildMod) -> - net_kernel:monitor_nodes(true), - erlang:send(self(), check_nodes), + {ok, _Mem3Cluster} = mem3_cluster:start_link(?MODULE, self(), + ?CLUSTER_STABILITY_PERIOD_SEC, ?CLUSTER_STABILITY_PERIOD_SEC), {ok, ChildMod}. @@ -67,24 +87,27 @@ handle_call(Msg, _From, St) -> couch_log:notice("~s ignored_call ~w", [?MODULE, Msg]), {reply, ignored, St}. - -handle_cast(Msg, St) -> - couch_log:notice("~s ignored_cast ~w", [?MODULE, Msg]), - {noreply, St}. - - -handle_info({nodeup, _}, ChildMod) -> +% If cluster is unstable a node was added or just removed. Check if any nodes +% can be started, but do not immediately stop nodes, defer that till cluster +% stabilized. +handle_cast(cluster_unstable, ChildMod) -> + couch_log:notice("~s : cluster unstable", [ChildMod]), start_servers(ChildMod), {noreply, ChildMod}; -handle_info({nodedown, _}, St) -> - {noreply, St}; - -handle_info(check_nodes, ChildMod) -> +% When cluster is stable, start any servers for new nodes and stop servers for +% the ones that disconnected. 
+handle_cast(cluster_stable, ChildMod) -> + couch_log:notice("~s : cluster stable", [ChildMod]), start_servers(ChildMod), - erlang:send_after(?INTERVAL, self(), check_nodes), + stop_servers(ChildMod), {noreply, ChildMod}; +handle_cast(Msg, St) -> + couch_log:notice("~s ignored_cast ~w", [?MODULE, Msg]), + {noreply, St}. + + handle_info(Msg, St) -> couch_log:notice("~s ignored_info ~w", [?MODULE, Msg]), {noreply, St}. @@ -101,13 +124,27 @@ start_servers(ChildMod) -> {ok, _} = start_server(ChildMod, Id) end, missing_servers(ChildMod)). +stop_servers(ChildMod) -> + lists:foreach(fun(Id) -> + ok = stop_server(ChildMod, Id) + end, extra_servers(ChildMod)). + + +server_ids(ChildMod) -> + Nodes = [node() | nodes()], + [list_to_atom(lists:concat([ChildMod, "_", Node])) || Node <- Nodes]. + + +running_servers(ChildMod) -> + [Id || {Id, _, _, _} <- supervisor:which_children(sup_module(ChildMod))]. + missing_servers(ChildMod) -> - ServerIds = [list_to_atom(lists:concat([ChildMod, "_", Node])) - || Node <- [node() | nodes()]], - SupModule = sup_module(ChildMod), - ChildIds = [Id || {Id, _, _, _} <- supervisor:which_children(SupModule)], - ServerIds -- ChildIds. + server_ids(ChildMod) -- running_servers(ChildMod). + + +extra_servers(ChildMod) -> + running_servers(ChildMod) -- server_ids(ChildMod). start_server(ChildMod, ChildId) -> @@ -126,5 +163,12 @@ start_server(ChildMod, ChildId) -> erlang:error(Else) end. + +stop_server(ChildMod, ChildId) -> + SupMod = sup_module(ChildMod), + ok = supervisor:terminate_child(SupMod, ChildId), + ok = supervisor:delete_child(SupMod, ChildId). + + sup_module(ChildMod) -> list_to_atom(lists:concat([ChildMod, "_sup"])). |