author    Nick Vatamaniuc <vatamane@gmail.com>    2022-12-01 15:02:24 -0500
committer Nick Vatamaniuc <nickva@users.noreply.github.com>    2022-12-02 11:53:21 -0500
commit    4530f8e1f0613bcb2118aef157f0151507866ddd (patch)
tree      5c19498f07cfcd6a0bd7db917e465a1f9c012c61 /src
parent    f13ceb46ce3c120e1960fa47bfda0a606601900e (diff)
download  couchdb-4530f8e1f0613bcb2118aef157f0151507866ddd.tar.gz
Remove all usage of global
Global has a tendency to create deadlocks [1], and when that happens replication jobs can't start on any of the nodes in the cluster. We don't really need strictly consistent locking for replication jobs; it's mostly to avoid replication jobs thrashing the same checkpoint docs back and forth between different session IDs. So, remove global to avoid any issues around it, and replace it with `pg` [2] -- the new (Erlang 23+) process group implementation. (Technically `global` is still running in the runtime system, as it's started by the `kernel` app; we just avoid interacting with it and registering any names, to avoid deadlocks.)

In `pg` we start a replication `scope`, and then in that scope make every RepId a process `group`. When replication processes spawn, their Pids become `members` of that group:

```
couch_replicator_pg (scope):
    "a12c+create_target" (RepId group): [Pid1, Pid2] (members)
    ...
```

As per the `pg` implementation, groups are created and removed automatically as members are added and removed, so we don't have to do anything there. If there are already running Pids in the same group, we avoid starting the job and fail like we did before when we used global. In the even rarer case of a race condition, when 2+ jobs do manage to start, we do a membership check before each checkpoint; one of the jobs then stops to yield to the other. For simplicity, the job running on the lowest lexicographically sorted node name survives.

[1] https://github.com/erlang/otp/issues/6524
[2] https://www.erlang.org/doc/man/pg.html
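To make the scope/group/member layout above concrete, here is a rough Erlang shell sketch. It is not part of the commit; the pids and return values are illustrative, and the group key mirrors the binary produced by `id({Base, Ext})` in the new module below.

```erlang
%% Illustrative shell session: one pg scope for the replicator, one group
%% per RepId, and replication job pids as group members.
1> pg:start_link(couch_replicator_pg).          % the replication scope
{ok,<0.90.0>}
2> pg:join(couch_replicator_pg, <<"a12c+create_target">>, self()).
ok
3> pg:get_members(couch_replicator_pg, <<"a12c+create_target">>).
[<0.88.0>]
4> pg:which_groups(couch_replicator_pg).
[<<"a12c+create_target">>]
5> pg:leave(couch_replicator_pg, <<"a12c+create_target">>, self()).
ok
6> pg:which_groups(couch_replicator_pg).        % empty groups are removed automatically
[]
```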
Diffstat (limited to 'src')
-rw-r--r--  src/couch_event/src/couch_event_listener.erl                                |   6
-rw-r--r--  src/couch_replicator/src/couch_replicator_pg.erl                            | 153
-rw-r--r--  src/couch_replicator/src/couch_replicator_scheduler_job.erl                 |  66
-rw-r--r--  src/couch_replicator/src/couch_replicator_sup.erl                           |   1
-rw-r--r--  src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl  |  42
-rw-r--r--  src/couch_replicator/test/eunit/couch_replicator_test_helper.erl            |   2
6 files changed, 238 insertions, 32 deletions
diff --git a/src/couch_event/src/couch_event_listener.erl b/src/couch_event/src/couch_event_listener.erl
index 40f1a5c65..22bf23327 100644
--- a/src/couch_event/src/couch_event_listener.erl
+++ b/src/couch_event/src/couch_event_listener.erl
@@ -175,14 +175,8 @@ do_terminate(Reason, #st{module = Module, state = State}) ->
end,
erlang:exit(Status).
-where({global, Name}) -> global:whereis_name(Name);
where({local, Name}) -> whereis(Name).
-name_register({global, Name} = GN) ->
- case global:register_name(Name, self()) of
- yes -> true;
- no -> {false, where(GN)}
- end;
name_register({local, Name} = LN) ->
try register(Name, self()) of
true -> true
diff --git a/src/couch_replicator/src/couch_replicator_pg.erl b/src/couch_replicator/src/couch_replicator_pg.erl
new file mode 100644
index 000000000..25937ec15
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_pg.erl
@@ -0,0 +1,153 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Use pg process groups to reduce the chance of duplicate replication jobs
+% running on the same cluster.
+%
+% A custom replicator pg scope is started via start_link/0. Then, replication
+% jobs check if they would be leaders before starting. If, by chance, two jobs
+% with the same RepId start anyway, they do an extra check before each
+% checkpoint. If they are not leaders any longer, they should stop running.
+% The "leader" is just the first sorted element in the [node(Pid), ...] list.
+
+-module(couch_replicator_pg).
+
+-export([
+ start_link/0,
+ join/2,
+ leave/2,
+ pids/1,
+ should_start/2,
+ should_run/2
+]).
+
+% Start a custom pg scope. Should be called from the replication supervisor.
+%
+start_link() ->
+ pg:start_link(?MODULE).
+
+% Join a replication job pid to a RepId group
+%
+join({_, _} = RepId, Pid) when is_pid(Pid) ->
+ pg:join(?MODULE, id(RepId), Pid).
+
+% Leave a replication RepId group. This doesn't have to be called explicitly as
+% the processes are monitored and automatically removed by pg. It may be nice
+% to call it from terminate/2 to speed things along a bit and clear the group
+% quicker.
+%
+leave({_, _} = RepId, Pid) when is_pid(Pid) ->
+ pg:leave(?MODULE, id(RepId), Pid).
+
+% Determine if a replication job should start on a particular node. If it
+% should, return `yes`, otherwise return `{no, OtherPid}`. `OtherPid` is
+% the pid of the replication job that is already running.
+%
+should_start({_, _} = RepId, Node) when is_atom(Node) ->
+ no_other_nodes(Node, pids(RepId)).
+
+% Determine if the replication job should keep running as the main job for that
+% RepId. If it should, return `yes`, otherwise return `{no, OtherPid}`. `OtherPid` is
+% the pid of the replication job that should stay running instead of this one.
+%
+should_run({_, _} = RepId, Pid) when is_pid(Pid) ->
+ case pids(RepId) of
+ [OtherPid | _] when OtherPid =/= Pid -> {no, OtherPid};
+ _ -> yes
+ end.
+
+% Sort all the pids by node first to get some deterministic order. For all we
+% know, pids may already sort that way, but we're just making it explicit here
+% in case it somehow changes in the future.
+%
+pids({_, _} = RepId) ->
+ NodePids = [{node(P), P} || P <- pg:get_members(?MODULE, id(RepId))],
+ {_, Pids} = lists:unzip(lists:usort(NodePids)),
+ Pids.
+
+id({Base, Ext}) ->
+ iolist_to_binary([Base, Ext]).
+
+no_other_nodes(_, []) ->
+ yes;
+no_other_nodes(Node, [Pid | _]) when Node =/= node(Pid) ->
+ {no, Pid};
+no_other_nodes(Node, [Pid | Pids]) when Node =:= node(Pid) ->
+ no_other_nodes(Node, Pids).
+
+-ifdef(TEST).
+
+-include_lib("couch/include/couch_eunit.hrl").
+
+couch_replicator_pg_test_() ->
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(t_start_stop),
+ ?TDEF_FE(t_join_leave),
+ ?TDEF_FE(t_should_start),
+ ?TDEF_FE(t_should_run)
+ ]
+ }.
+
+setup() ->
+ {ok, PGPid} = start_link(),
+ PGPid.
+
+teardown(PGPid) when is_pid(PGPid) ->
+ ?assertEqual(ok, gen_server:stop(PGPid)).
+
+t_start_stop(PGPid) ->
+ ?assert(is_process_alive(PGPid)),
+ ?assertEqual([], pg:which_groups(?MODULE)).
+
+t_join_leave(_) ->
+ RepId = {"a", "+b"},
+ ?assertEqual([], pids(RepId)),
+ Pid = self(),
+ ?assertEqual(ok, join(RepId, Pid)),
+ ?assertEqual([id(RepId)], pg:which_groups(?MODULE)),
+ ?assertEqual([Pid], pids(RepId)),
+ ?assertEqual(ok, leave(RepId, Pid)),
+ ?assertEqual(not_joined, leave(RepId, Pid)),
+ ?assertEqual([], pids(RepId)),
+ ?assertEqual([], pg:which_groups(?MODULE)).
+
+t_should_start(_) ->
+ RepId = {"a", "+b"},
+ ?assertEqual(yes, should_start(RepId, node())),
+ ?assertEqual(yes, should_start(RepId, 'foo@bar.bogus.net')),
+ Pid = self(),
+ ok = join(RepId, Pid),
+ % On the same node we let it start; it will blow up anyway in the
+ % supervisor.
+ ?assertEqual(yes, should_start(RepId, node())),
+ ?assertEqual({no, Pid}, should_start(RepId, 'foo@bar.bogus42.net')).
+
+t_should_run(_) ->
+ RepId = {"a", "+b"},
+ Pid = self(),
+ % This is an odd case: somehow a job asks if it should run but it hasn't
+ % registered. We just choose to let it run.
+ ?assertEqual(yes, should_run(RepId, Pid)),
+ ok = join(RepId, Pid),
+ % The only job registered is itself
+ ?assertEqual(yes, should_run(RepId, Pid)),
+ % Let's add <0.0.0> init so it can sort lower
+ InitPid = whereis(init),
+ ok = join(RepId, InitPid),
+ ?assertEqual({no, InitPid}, should_run(RepId, Pid)).
+
+-endif.
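For orientation, below is a condensed sketch of the intended call pattern for this module. It is not part of the commit and the module/function names (`rep_pg_usage_sketch`, `maybe_start/2`, `checkpoint_gate/1`, `StartFun`) are hypothetical placeholders; the real integration is in the `couch_replicator_scheduler_job` diff that follows.

```erlang
-module(rep_pg_usage_sketch).
-export([maybe_start/2, checkpoint_gate/1]).

%% Gate job start-up on group membership: only start if no other node is
%% already running this RepId, then join the RepId group with the new pid.
%% StartFun stands in for the real gen_server:start_link call.
maybe_start(RepId, StartFun) ->
    case couch_replicator_pg:should_start(RepId, node()) of
        yes ->
            {ok, Pid} = StartFun(),
            ok = couch_replicator_pg:join(RepId, Pid),
            {ok, Pid};
        {no, OtherPid} ->
            {error, {already_started, OtherPid}}
    end.

%% Re-check membership before each checkpoint; if another pid sorts first,
%% this job yields by stopping with a duplicate_job shutdown reason.
checkpoint_gate(RepId) ->
    case couch_replicator_pg:should_run(RepId, self()) of
        yes -> proceed;
        {no, OtherPid} -> {stop, {shutdown, {duplicate_job, OtherPid}}}
    end.
```

Leaving the group on termination (as `terminate_cleanup/1` does below) is optional but clears the group sooner.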
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 1d3e70c5a..416220efd 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -81,21 +81,25 @@
view = nil
}).
-start_link(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
+start_link(#rep{id = Id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
RepChildId = BaseId ++ Ext,
Source = couch_replicator_api_wrap:db_uri(Src),
Target = couch_replicator_api_wrap:db_uri(Tgt),
- ServerName = {global, {?MODULE, Rep#rep.id}},
-
- case gen_server:start_link(ServerName, ?MODULE, Rep, []) of
- {ok, Pid} ->
- {ok, Pid};
- {error, Reason} ->
- couch_log:warning(
- "failed to start replication `~s` (`~s` -> `~s`)",
- [RepChildId, Source, Target]
- ),
- {error, Reason}
+ case couch_replicator_pg:should_start(Id, node()) of
+ yes ->
+ case gen_server:start_link({local, ?MODULE}, ?MODULE, Rep, []) of
+ {ok, Pid} ->
+ couch_replicator_pg:join(Id, Pid),
+ {ok, Pid};
+ {error, Reason} ->
+ couch_log:warning(
+ "failed to start replication `~s` (`~s` -> `~s`)",
+ [RepChildId, Source, Target]
+ ),
+ {error, Reason}
+ end;
+ {no, OtherPid} ->
+ {error, {already_started, OtherPid}}
end.
init(InitArgs) ->
@@ -234,14 +238,19 @@ handle_call({report_seq_done, Seq, StatsInc}, From, State) ->
handle_cast({sum_stats, Stats}, State) ->
NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
{noreply, State#rep_state{stats = NewStats}};
-handle_cast(checkpoint, State) ->
- case do_checkpoint(State) of
- {ok, NewState} ->
- couch_stats:increment_counter([couch_replicator, checkpoints, success]),
- {noreply, NewState#rep_state{timer = start_timer(State)}};
- Error ->
- couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
- {stop, Error, State}
+handle_cast(checkpoint, #rep_state{rep_details = Rep} = State) ->
+ case couch_replicator_pg:should_run(Rep#rep.id, self()) of
+ yes ->
+ case do_checkpoint(State) of
+ {ok, NewState} ->
+ couch_stats:increment_counter([couch_replicator, checkpoints, success]),
+ {noreply, NewState#rep_state{timer = start_timer(State)}};
+ Error ->
+ couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
+ {stop, Error, State}
+ end;
+ {no, OtherPid} ->
+ {stop, {shutdown, {duplicate_job, OtherPid}}, State}
end;
handle_cast(
{report_seq, Seq},
@@ -367,6 +376,7 @@ terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
terminate_cleanup(State1);
terminate({shutdown, max_backoff}, {error, InitArgs}) ->
#rep{id = {BaseId, Ext} = RepId} = InitArgs,
+ couch_replicator_pg:leave(RepId, self()),
couch_stats:increment_counter([couch_replicator, failed_starts]),
couch_log:warning("Replication `~s` reached max backoff ", [BaseId ++ Ext]),
couch_replicator_notifier:notify({error, RepId, max_backoff});
@@ -378,6 +388,7 @@ terminate({shutdown, {error, Error}}, {error, Class, Stack, InitArgs}) ->
doc_id = DocId,
db_name = DbName
} = InitArgs,
+ couch_replicator_pg:leave(RepId, self()),
Source = couch_replicator_api_wrap:db_uri(Source0),
Target = couch_replicator_api_wrap:db_uri(Target0),
RepIdStr = BaseId ++ Ext,
@@ -406,6 +417,18 @@ terminate({shutdown, max_backoff}, State) ->
),
terminate_cleanup(State),
couch_replicator_notifier:notify({error, RepId, max_backoff});
+terminate({shutdown, {duplicate_job, OtherPid}}, State) ->
+ #rep_state{
+ source_name = Source,
+ target_name = Target,
+ rep_details = #rep{id = {BaseId, Ext} = RepId}
+ } = State,
+ couch_log:error(
+ "Replication `~s` (`~s` -> `~s`) with pid ~p was usurped by ~p on node ~p",
+ [BaseId ++ Ext, Source, Target, self(), OtherPid, node(OtherPid)]
+ ),
+ terminate_cleanup(State),
+ couch_replicator_notifier:notify({error, RepId, duplicate_job});
terminate({shutdown, Reason}, State) ->
% Unwrap so when reporting we don't have an extra {shutdown, ...} tuple
% wrapped around the message
@@ -423,7 +446,8 @@ terminate(Reason, State) ->
terminate_cleanup(State),
couch_replicator_notifier:notify({error, RepId, Reason}).
-terminate_cleanup(State) ->
+terminate_cleanup(#rep_state{rep_details = #rep{id = RepId}} = State) ->
+ couch_replicator_pg:leave(RepId, self()),
update_task(State),
couch_replicator_api_wrap:db_close(State#rep_state.source),
couch_replicator_api_wrap:db_close(State#rep_state.target).
diff --git a/src/couch_replicator/src/couch_replicator_sup.erl b/src/couch_replicator/src/couch_replicator_sup.erl
index 33eee8659..1cf973f71 100644
--- a/src/couch_replicator/src/couch_replicator_sup.erl
+++ b/src/couch_replicator/src/couch_replicator_sup.erl
@@ -22,6 +22,7 @@ init(_Args) ->
Children = [
{couch_replication_event, {gen_event, start_link, [{local, couch_replication}]}, permanent,
brutal_kill, worker, dynamic},
+ {couch_replicator_pg, {couch_replicator_pg, start_link, []}, permanent, 1000, worker, [pg]},
{couch_replicator_clustering, {couch_replicator_clustering, start_link, []}, permanent,
brutal_kill, worker, [couch_replicator_clustering]},
{couch_replicator_connection, {couch_replicator_connection, start_link, []}, permanent,
diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
index 7ba6bc69d..dd6609941 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
@@ -28,7 +28,9 @@ error_reporting_test_() ->
?TDEF_FE(t_fail_bulk_get, 15),
?TDEF_FE(t_fail_changes_queue),
?TDEF_FE(t_fail_changes_manager),
- ?TDEF_FE(t_fail_changes_reader_proc)
+ ?TDEF_FE(t_fail_changes_reader_proc),
+ ?TDEF_FE(t_dont_start_duplicate_job),
+ ?TDEF_FE(t_stop_duplicate_job)
]
}.
@@ -150,6 +152,34 @@ t_fail_changes_reader_proc({_Ctx, {Source, Target}}) ->
?assertEqual({changes_reader_died, kapow}, Result),
couch_replicator_notifier:stop(Listener).
+t_dont_start_duplicate_job({_Ctx, {Source, Target}}) ->
+ meck:new(couch_replicator_pg, [passthrough]),
+ Pid = pid_from_another_node(),
+ meck:expect(couch_replicator_pg, should_start, fun(_, _) -> {no, Pid} end),
+ Rep = make_rep(Source, Target),
+ ExpectErr = {error, {already_started, Pid}},
+ ?assertEqual(ExpectErr, couch_replicator_scheduler_job:start_link(Rep)).
+
+t_stop_duplicate_job({_Ctx, {Source, Target}}) ->
+ {ok, RepId} = replicate(Source, Target),
+ wait_target_in_sync(Source, Target),
+ RepPid = couch_replicator_test_helper:get_pid(RepId),
+ {ok, Listener} = rep_result_listener(RepId),
+ Pid = pid_from_another_node(),
+ meck:expect(couch_replicator_pg, should_run, fun(_, _) -> {no, Pid} end),
+ RepPid ! {'$gen_cast', checkpoint},
+ {error, Result} = wait_rep_result(RepId),
+ ?assertEqual(duplicate_job, Result),
+ couch_replicator_notifier:stop(Listener).
+
+pid_from_another_node() ->
+ % Use a Pid serialized from a node named A@1
+ % (A@1)1> term_to_binary(self()).
+ Bin = <<131, 88, 100, 0, 3, 65, 64, 49, 0, 0, 0, 89, 0, 0, 0, 0, 99, 137, 147, 218>>,
+ Pid = binary_to_term(Bin),
+ ?assertEqual('A@1', node(Pid)),
+ Pid.
+
mock_fail_req(Path, Return) ->
meck:expect(
ibrowse,
@@ -216,6 +246,12 @@ wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
end.
replicate(Source, Target) ->
+ Rep = make_rep(Source, Target),
+ ok = couch_replicator_scheduler:add_job(Rep),
+ couch_replicator_scheduler:reschedule(),
+ {ok, Rep#rep.id}.
+
+make_rep(Source, Target) ->
RepObject =
{[
{<<"source">>, url(Source)},
@@ -227,9 +263,7 @@ replicate(Source, Target) ->
{<<"connection_timeout">>, 3000}
]},
{ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
- ok = couch_replicator_scheduler:add_job(Rep),
- couch_replicator_scheduler:reschedule(),
- {ok, Rep#rep.id}.
+ Rep.
url(DbName) ->
couch_replicator_test_helper:cluster_db_url(DbName).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl b/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
index f862527f4..e28b4a28b 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
@@ -160,7 +160,7 @@ cluster_db_url(Path) ->
<<(cluster_url())/binary, "/", Path/binary>>.
get_pid(RepId) ->
- Pid = global:whereis_name({couch_replicator_scheduler_job, RepId}),
+ [Pid] = couch_replicator_pg:pids(RepId),
?assert(is_pid(Pid)),
Pid.