summaryrefslogtreecommitdiff
path: root/src/couch_replicator/src/couch_replicator_clustering.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch_replicator/src/couch_replicator_clustering.erl')
-rw-r--r--src/couch_replicator/src/couch_replicator_clustering.erl269
1 files changed, 0 insertions, 269 deletions
diff --git a/src/couch_replicator/src/couch_replicator_clustering.erl b/src/couch_replicator/src/couch_replicator_clustering.erl
deleted file mode 100644
index 8db320433..000000000
--- a/src/couch_replicator/src/couch_replicator_clustering.erl
+++ /dev/null
@@ -1,269 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-% Maintain cluster membership and stability notifications for replications.
-% On changes to cluster membership, broadcast events to `replication` gen_event.
-% Listeners will get `{cluster, stable}` or `{cluster, unstable}` events.
-%
-% Cluster stability is defined as "there have been no nodes added or removed in
-% last `QuietPeriod` seconds". QuietPeriod value is configurable. To ensure a
-% speedier startup, during initialization there is a shorter StartupPeriod
-% in effect (also configurable).
-%
-% This module is also in charge of calculating ownership of replications based
-% on where their _replicator db documents shards live.
-
--module(couch_replicator_clustering).
-
--behaviour(gen_server).
--behaviour(config_listener).
--behaviour(mem3_cluster).
-
--export([
- start_link/0
-]).
-
--export([
- init/1,
- terminate/2,
- handle_call/3,
- handle_info/2,
- handle_cast/2,
- code_change/3
-]).
-
--export([
- owner/2,
- is_stable/0,
- link_cluster_event_listener/3
-]).
-
-% config_listener callbacks
--export([
- handle_config_change/5,
- handle_config_terminate/3
-]).
-
-% mem3_cluster callbacks
--export([
- cluster_stable/1,
- cluster_unstable/1
-]).
-
--include_lib("couch/include/couch_db.hrl").
--include_lib("mem3/include/mem3.hrl").
-
-% seconds
--define(DEFAULT_QUIET_PERIOD, 60).
-% seconds
--define(DEFAULT_START_PERIOD, 5).
--define(RELISTEN_DELAY, 5000).
-
--record(state, {
- mem3_cluster_pid :: pid(),
- cluster_stable :: boolean()
-}).
-
--spec start_link() -> {ok, pid()} | ignore | {error, term()}.
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-% owner/2 function computes ownership for a {DbName, DocId} tuple
-% `unstable` if cluster is considered to be unstable i.e. it has changed
-% recently, or returns node() which of the owner.
-%
--spec owner(Dbname :: binary(), DocId :: binary()) -> node() | unstable.
-owner(<<"shards/", _/binary>> = DbName, DocId) ->
- case is_stable() of
- false ->
- unstable;
- true ->
- owner_int(DbName, DocId)
- end;
-owner(_DbName, _DocId) ->
- node().
-
--spec is_stable() -> true | false.
-is_stable() ->
- gen_server:call(?MODULE, is_stable).
-
--spec link_cluster_event_listener(atom(), atom(), list()) -> pid().
-link_cluster_event_listener(Mod, Fun, Args) when
- is_atom(Mod), is_atom(Fun), is_list(Args)
-->
- CallbackFun =
- fun
- (Event = {cluster, _}) -> erlang:apply(Mod, Fun, Args ++ [Event]);
- (_) -> ok
- end,
- {ok, Pid} = couch_replicator_notifier:start_link(CallbackFun),
- Pid.
-
-% Mem3 cluster callbacks
-
-cluster_unstable(Server) ->
- ok = gen_server:call(Server, set_unstable),
- couch_replicator_notifier:notify({cluster, unstable}),
- couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
- couch_log:notice("~s : cluster unstable", [?MODULE]),
- Server.
-
-cluster_stable(Server) ->
- ok = gen_server:call(Server, set_stable),
- couch_replicator_notifier:notify({cluster, stable}),
- couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1),
- couch_log:notice("~s : cluster stable", [?MODULE]),
- Server.
-
-% gen_server callbacks
-
-init([]) ->
- ok = config:listen_for_changes(?MODULE, nil),
- Period = abs(
- config:get_integer(
- "replicator",
- "cluster_quiet_period",
- ?DEFAULT_QUIET_PERIOD
- )
- ),
- StartPeriod = abs(
- config:get_integer(
- "replicator",
- "cluster_start_period",
- ?DEFAULT_START_PERIOD
- )
- ),
- couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
- {ok, Mem3Cluster} = mem3_cluster:start_link(
- ?MODULE,
- self(),
- StartPeriod,
- Period
- ),
- {ok, #state{mem3_cluster_pid = Mem3Cluster, cluster_stable = false}}.
-
-terminate(_Reason, _State) ->
- ok.
-
-handle_call(is_stable, _From, #state{cluster_stable = IsStable} = State) ->
- {reply, IsStable, State};
-handle_call(set_stable, _From, State) ->
- {reply, ok, State#state{cluster_stable = true}};
-handle_call(set_unstable, _From, State) ->
- {reply, ok, State#state{cluster_stable = false}}.
-
-handle_cast({set_period, Period}, #state{mem3_cluster_pid = Pid} = State) ->
- ok = mem3_cluster:set_period(Pid, Period),
- {noreply, State}.
-
-handle_info(restart_config_listener, State) ->
- ok = config:listen_for_changes(?MODULE, nil),
- {noreply, State}.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%% Internal functions
-
-handle_config_change("replicator", "cluster_quiet_period", V, _, S) ->
- ok = gen_server:cast(?MODULE, {set_period, list_to_integer(V)}),
- {ok, S};
-handle_config_change(_, _, _, _, S) ->
- {ok, S}.
-
-handle_config_terminate(_, stop, _) ->
- ok;
-handle_config_terminate(_S, _R, _St) ->
- Pid = whereis(?MODULE),
- erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).
-
--spec owner_int(binary(), binary()) -> node().
-owner_int(ShardName, DocId) ->
- DbName = mem3:dbname(ShardName),
- Live = [node() | nodes()],
- Shards = mem3:shards(DbName, DocId),
- Nodes = [N || #shard{node = N} <- Shards, lists:member(N, Live)],
- mem3:owner(DbName, DocId, Nodes).
-
--ifdef(TEST).
-
--include_lib("eunit/include/eunit.hrl").
-
-replicator_clustering_test_() ->
- {
- setup,
- fun setup_all/0,
- fun teardown_all/1,
- {
- foreach,
- fun setup/0,
- fun teardown/1,
- [
- t_stable_callback(),
- t_unstable_callback()
- ]
- }
- }.
-
-t_stable_callback() ->
- ?_test(begin
- ?assertEqual(false, is_stable()),
- cluster_stable(whereis(?MODULE)),
- ?assertEqual(true, is_stable())
- end).
-
-t_unstable_callback() ->
- ?_test(begin
- cluster_stable(whereis(?MODULE)),
- ?assertEqual(true, is_stable()),
- cluster_unstable(whereis(?MODULE)),
- ?assertEqual(false, is_stable())
- end).
-
-setup_all() ->
- meck:expect(couch_log, notice, 2, ok),
- meck:expect(config, get, fun(_, _, Default) -> Default end),
- meck:expect(config, listen_for_changes, 2, ok),
- meck:expect(couch_stats, update_gauge, 2, ok),
- meck:expect(couch_replicator_notifier, notify, 1, ok).
-
-teardown_all(_) ->
- meck:unload().
-
-setup() ->
- meck:reset([
- config,
- couch_log,
- couch_stats,
- couch_replicator_notifier
- ]),
- stop_clustering_process(),
- {ok, Pid} = start_link(),
- Pid.
-
-teardown(Pid) ->
- stop_clustering_process(Pid).
-
-stop_clustering_process() ->
- stop_clustering_process(whereis(?MODULE)).
-
-stop_clustering_process(undefined) ->
- ok;
-stop_clustering_process(Pid) when is_pid(Pid) ->
- Ref = erlang:monitor(process, Pid),
- unlink(Pid),
- exit(Pid, kill),
- receive
- {'DOWN', Ref, _, _, _} -> ok
- end.
-
--endif.