author | Nick Vatamaniuc <vatamane@gmail.com> | 2022-12-01 15:02:24 -0500
---|---|---
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2022-12-02 11:53:21 -0500
commit | 4530f8e1f0613bcb2118aef157f0151507866ddd (patch) |
tree | 5c19498f07cfcd6a0bd7db917e465a1f9c012c61 /src |
parent | f13ceb46ce3c120e1960fa47bfda0a606601900e (diff) |
download | couchdb-4530f8e1f0613bcb2118aef157f0151507866ddd.tar.gz |
Remove all usage of global
`global` has a tendency to create deadlocks [1], and when that happens
replication jobs can't start on any of the nodes in the cluster.

We don't really need strictly consistent locking for replication jobs; the
locking mostly exists to keep replication jobs from thrashing the same
checkpoint docs back and forth between different session IDs. So, remove
`global` to avoid any issues around it, and replace it with `pg` [2] -- the new
(Erlang 23+) process group implementation. (Technically `global` is still
running in the runtime system, as it's started by the `kernel` app; we just
avoid interacting with it and registering any names, so its deadlocks can't
affect us.)
In `pg` we start a replication `scope`, and then in that scope make every RepId
a process `group`. When replication processes spawn, their Pids become
`members` of that group:
```
couch_replicator_pg (scope):
"a12c+create_target" (RepId group):
[Pid1, Pid2] (members)
...
```
As per the `pg` implementation, groups are created and removed automatically as
members are added to or removed from them, so we don't have to manage group
lifecycles ourselves.
If there are already running Pids in the same group, we avoid starting the job
and fail, just as we did before with `global`. In the even rarer case of a race
condition, when two or more jobs do manage to start, each job does a membership
check before every checkpoint, and one of the jobs then stops to yield to the
other. For simplicity, the job running on the lowest lexicographically sorted
node name is the one that survives.
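Below is a condensed sketch of those two checks, mirroring the
`couch_replicator_pg` module added in the diff below (the recursive
`no_other_nodes/2` helper is collapsed into a list comprehension here):

```erlang
% Members of a RepId group, sorted by node name for a deterministic order.
pids({Base, Ext}) ->
    Group = iolist_to_binary([Base, Ext]),
    NodePids = [{node(P), P} || P <- pg:get_members(couch_replicator_pg, Group)],
    {_, Pids} = lists:unzip(lists:usort(NodePids)),
    Pids.

% Before starting: refuse if the same RepId already runs on another node.
should_start(RepId, Node) when is_atom(Node) ->
    case [P || P <- pids(RepId), node(P) =/= Node] of
        [OtherPid | _] -> {no, OtherPid};
        [] -> yes
    end.

% Before each checkpoint: only the first (lowest sorted node) pid keeps going.
should_run(RepId, Pid) when is_pid(Pid) ->
    case pids(RepId) of
        [OtherPid | _] when OtherPid =/= Pid -> {no, OtherPid};
        _ -> yes
    end.
```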
[1] https://github.com/erlang/otp/issues/6524
[2] https://www.erlang.org/doc/man/pg.html
Diffstat (limited to 'src')
6 files changed, 238 insertions, 32 deletions
```diff
diff --git a/src/couch_event/src/couch_event_listener.erl b/src/couch_event/src/couch_event_listener.erl
index 40f1a5c65..22bf23327 100644
--- a/src/couch_event/src/couch_event_listener.erl
+++ b/src/couch_event/src/couch_event_listener.erl
@@ -175,14 +175,8 @@ do_terminate(Reason, #st{module = Module, state = State}) ->
     end,
     erlang:exit(Status).
 
-where({global, Name}) -> global:whereis_name(Name);
 where({local, Name}) -> whereis(Name).
 
-name_register({global, Name} = GN) ->
-    case global:register_name(Name, self()) of
-        yes -> true;
-        no -> {false, where(GN)}
-    end;
 name_register({local, Name} = LN) ->
     try register(Name, self()) of
         true -> true
diff --git a/src/couch_replicator/src/couch_replicator_pg.erl b/src/couch_replicator/src/couch_replicator_pg.erl
new file mode 100644
index 000000000..25937ec15
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_pg.erl
@@ -0,0 +1,153 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Use pg process groups to reduce the chance of duplicate replication jobs
+% running on the same cluster.
+%
+% A custom replicator pg group is started via start_link/0. Then, replication
+% jobs check if they would be leaders before starting. If, by chance, two jobs
+% with the same RepId start anyway, then replication jobs would do an extra
+% check before each checkpoint. If the they are not leaders any longer, they
+% should stop running. The "leader" is just the first sorted element in the
+% [node(Pid), ...] list.
+
+-module(couch_replicator_pg).
+
+-export([
+    start_link/0,
+    join/2,
+    leave/2,
+    pids/1,
+    should_start/2,
+    should_run/2
+]).
+
+% Start a custom pg group. Should be called from the replication supervisor.
+%
+start_link() ->
+    pg:start_link(?MODULE).
+
+% Join a replication job pid to a RepId group
+%
+join({_, _} = RepId, Pid) when is_pid(Pid) ->
+    pg:join(?MODULE, id(RepId), Pid).
+
+% Leave a replication RepId group. This doesn't have to be called explicitly as
+% the processes are monitored and automatically removed by pg. It may be nice,
+% to call it from terminate/2 to speed things along a bit and clear the group
+% quicker.
+%
+leave({_, _} = RepId, Pid) when is_pid(Pid) ->
+    pg:leave(?MODULE, id(RepId), Pid).
+
+% Determine if a replication job should start on a particular node. If it
+% should, return `yes`, otherwise return `{no, OtherPid}`. `OtherPid` is
+% the pid of the replication job that is already running.
+%
+should_start({_, _} = RepId, Node) when is_atom(Node) ->
+    no_other_nodes(Node, pids(RepId)).
+
+% Determine if the replication job should keep running as the main job for that
+% RepId. If it is, return yes, otherwise return `{no, OtherPid}`. `OtherPid` is
+% the pid of the replication job that should stay running instead of this one.
+%
+should_run({_, _} = RepId, Pid) when is_pid(Pid) ->
+    case pids(RepId) of
+        [OtherPid | _] when OtherPid =/= Pid -> {no, OtherPid};
+        _ -> yes
+    end.
+
+% Sort all the pids by node first to get some deterministic order. For all we
+% know, pids may already sort that way, but we're just making it explicit here
+% in case it somehow changes in the future.
+%
+pids({_, _} = RepId) ->
+    NodePids = [{node(P), P} || P <- pg:get_members(?MODULE, id(RepId))],
+    {_, Pids} = lists:unzip(lists:usort(NodePids)),
+    Pids.
+
+id({Base, Ext}) ->
+    iolist_to_binary([Base, Ext]).
+
+no_other_nodes(_, []) ->
+    yes;
+no_other_nodes(Node, [Pid | _]) when Node =/= node(Pid) ->
+    {no, Pid};
+no_other_nodes(Node, [Pid | Pids]) when Node =:= node(Pid) ->
+    no_other_nodes(Node, Pids).
+
+-ifdef(TEST).
+
+-include_lib("couch/include/couch_eunit.hrl").
+
+couch_replicator_pg_test_() ->
+    {
+        foreach,
+        fun setup/0,
+        fun teardown/1,
+        [
+            ?TDEF_FE(t_start_stop),
+            ?TDEF_FE(t_join_leave),
+            ?TDEF_FE(t_should_start),
+            ?TDEF_FE(t_should_run)
+        ]
+    }.
+
+setup() ->
+    {ok, PGPid} = start_link(),
+    PGPid.
+
+teardown(PGPid) when is_pid(PGPid) ->
+    ?assertEqual(ok, gen_server:stop(PGPid)).
+
+t_start_stop(PGPid) ->
+    ?assert(is_process_alive(PGPid)),
+    ?assertEqual([], pg:which_groups(?MODULE)).
+
+t_join_leave(_) ->
+    RepId = {"a", "+b"},
+    ?assertEqual([], pids(RepId)),
+    Pid = self(),
+    ?assertEqual(ok, join(RepId, Pid)),
+    ?assertEqual([id(RepId)], pg:which_groups(?MODULE)),
+    ?assertEqual([Pid], pids(RepId)),
+    ?assertEqual(ok, leave(RepId, Pid)),
+    ?assertEqual(not_joined, leave(RepId, Pid)),
+    ?assertEqual([], pids(RepId)),
+    ?assertEqual([], pg:which_groups(?MODULE)).
+
+t_should_start(_) ->
+    RepId = {"a", "+b"},
+    ?assertEqual(yes, should_start(RepId, node())),
+    ?assertEqual(yes, should_start(RepId, 'foo@bar.bogus.net')),
+    Pid = self(),
+    ok = join(RepId, Pid),
+    % On the same node we let it start, it will blow up anyway in the
+    % supervisor.
+    ?assertEqual(yes, should_start(RepId, node())),
+    ?assertEqual({no, Pid}, should_start(RepId, 'foo@bar.bogus42.net')).
+
+t_should_run(_) ->
+    RepId = {"a", "+b"},
+    Pid = self(),
+    % This is odd case, somehow a job asks if it should run but it hasn't
+    % registered. We just choose to let it run.
+    ?assertEqual(yes, should_run(RepId, Pid)),
+    ok = join(RepId, Pid),
+    % The only job registered is itself
+    ?assertEqual(yes, should_run(RepId, Pid)),
+    % Let's add <0.0.0> init so it can sort lower
+    InitPid = whereis(init),
+    ok = join(RepId, InitPid),
+    ?assertEqual({no, InitPid}, should_run(RepId, Pid)).
+
+-endif.
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 1d3e70c5a..416220efd 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -81,21 +81,25 @@
     view = nil
 }).
 
-start_link(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
+start_link(#rep{id = Id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
     RepChildId = BaseId ++ Ext,
     Source = couch_replicator_api_wrap:db_uri(Src),
     Target = couch_replicator_api_wrap:db_uri(Tgt),
-    ServerName = {global, {?MODULE, Rep#rep.id}},
-
-    case gen_server:start_link(ServerName, ?MODULE, Rep, []) of
-        {ok, Pid} ->
-            {ok, Pid};
-        {error, Reason} ->
-            couch_log:warning(
-                "failed to start replication `~s` (`~s` -> `~s`)",
-                [RepChildId, Source, Target]
-            ),
-            {error, Reason}
+    case couch_replicator_pg:should_start(Id, node()) of
+        yes ->
+            case gen_server:start_link({local, ?MODULE}, ?MODULE, Rep, []) of
+                {ok, Pid} ->
+                    couch_replicator_pg:join(Id, Pid),
+                    {ok, Pid};
+                {error, Reason} ->
+                    couch_log:warning(
+                        "failed to start replication `~s` (`~s` -> `~s`)",
+                        [RepChildId, Source, Target]
+                    ),
+                    {error, Reason}
+            end;
+        {no, OtherPid} ->
+            {error, {already_started, OtherPid}}
     end.
 
 init(InitArgs) ->
@@ -234,14 +238,19 @@ handle_call({report_seq_done, Seq, StatsInc}, From, State) ->
 handle_cast({sum_stats, Stats}, State) ->
     NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
     {noreply, State#rep_state{stats = NewStats}};
-handle_cast(checkpoint, State) ->
-    case do_checkpoint(State) of
-        {ok, NewState} ->
-            couch_stats:increment_counter([couch_replicator, checkpoints, success]),
-            {noreply, NewState#rep_state{timer = start_timer(State)}};
-        Error ->
-            couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
-            {stop, Error, State}
+handle_cast(checkpoint, #rep_state{rep_details = Rep} = State) ->
+    case couch_replicator_pg:should_run(Rep#rep.id, self()) of
+        yes ->
+            case do_checkpoint(State) of
+                {ok, NewState} ->
+                    couch_stats:increment_counter([couch_replicator, checkpoints, success]),
+                    {noreply, NewState#rep_state{timer = start_timer(State)}};
+                Error ->
+                    couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
+                    {stop, Error, State}
+            end;
+        {no, OtherPid} ->
+            {stop, {shutdown, {duplicate_job, OtherPid}}, State}
     end;
 handle_cast(
     {report_seq, Seq},
@@ -367,6 +376,7 @@ terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
     terminate_cleanup(State1);
 terminate({shutdown, max_backoff}, {error, InitArgs}) ->
     #rep{id = {BaseId, Ext} = RepId} = InitArgs,
+    couch_replicator_pg:leave(RepId, self()),
     couch_stats:increment_counter([couch_replicator, failed_starts]),
     couch_log:warning("Replication `~s` reached max backoff ", [BaseId ++ Ext]),
     couch_replicator_notifier:notify({error, RepId, max_backoff});
@@ -378,6 +388,7 @@ terminate({shutdown, {error, Error}}, {error, Class, Stack, InitArgs}) ->
         doc_id = DocId,
         db_name = DbName
     } = InitArgs,
+    couch_replicator_pg:leave(RepId, self()),
     Source = couch_replicator_api_wrap:db_uri(Source0),
     Target = couch_replicator_api_wrap:db_uri(Target0),
     RepIdStr = BaseId ++ Ext,
@@ -406,6 +417,18 @@ terminate({shutdown, max_backoff}, State) ->
     ),
     terminate_cleanup(State),
     couch_replicator_notifier:notify({error, RepId, max_backoff});
+terminate({shutdown, {duplicate_job, OtherPid}}, State) ->
+    #rep_state{
+        source_name = Source,
+        target_name = Target,
+        rep_details = #rep{id = {BaseId, Ext} = RepId}
+    } = State,
+    couch_log:error(
+        "Replication `~s` (`~s` -> `~s`) with pid ~p was usurped by ~p on node ~p",
+        [BaseId ++ Ext, Source, Target, self(), OtherPid, node(OtherPid)]
+    ),
+    terminate_cleanup(State),
+    couch_replicator_notifier:notify({error, RepId, duplicate_job});
 terminate({shutdown, Reason}, State) ->
     % Unwrap so when reporting we don't have an extra {shutdown, ...} tuple
     % wrapped around the message
@@ -423,7 +446,8 @@ terminate(Reason, State) ->
     terminate_cleanup(State),
     couch_replicator_notifier:notify({error, RepId, Reason}).
 
-terminate_cleanup(State) ->
+terminate_cleanup(#rep_state{rep_details = #rep{id = RepId}} = State) ->
+    couch_replicator_pg:leave(RepId, self()),
     update_task(State),
    couch_replicator_api_wrap:db_close(State#rep_state.source),
     couch_replicator_api_wrap:db_close(State#rep_state.target).
diff --git a/src/couch_replicator/src/couch_replicator_sup.erl b/src/couch_replicator/src/couch_replicator_sup.erl
index 33eee8659..1cf973f71 100644
--- a/src/couch_replicator/src/couch_replicator_sup.erl
+++ b/src/couch_replicator/src/couch_replicator_sup.erl
@@ -22,6 +22,7 @@ init(_Args) ->
     Children = [
         {couch_replication_event, {gen_event, start_link, [{local, couch_replication}]}, permanent,
             brutal_kill, worker, dynamic},
+        {couch_replicator_pg, {couch_replicator_pg, start_link, []}, permanent, 1000, worker, [pg]},
         {couch_replicator_clustering, {couch_replicator_clustering, start_link, []}, permanent,
             brutal_kill, worker, [couch_replicator_clustering]},
         {couch_replicator_connection, {couch_replicator_connection, start_link, []}, permanent,
diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
index 7ba6bc69d..dd6609941 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
@@ -28,7 +28,9 @@ error_reporting_test_() ->
             ?TDEF_FE(t_fail_bulk_get, 15),
             ?TDEF_FE(t_fail_changes_queue),
             ?TDEF_FE(t_fail_changes_manager),
-            ?TDEF_FE(t_fail_changes_reader_proc)
+            ?TDEF_FE(t_fail_changes_reader_proc),
+            ?TDEF_FE(t_dont_start_duplicate_job),
+            ?TDEF_FE(t_stop_duplicate_job)
         ]
     }.
 
@@ -150,6 +152,34 @@ t_fail_changes_reader_proc({_Ctx, {Source, Target}}) ->
     ?assertEqual({changes_reader_died, kapow}, Result),
     couch_replicator_notifier:stop(Listener).
 
+t_dont_start_duplicate_job({_Ctx, {Source, Target}}) ->
+    meck:new(couch_replicator_pg, [passthrough]),
+    Pid = pid_from_another_node(),
+    meck:expect(couch_replicator_pg, should_start, fun(_, _) -> {no, Pid} end),
+    Rep = make_rep(Source, Target),
+    ExpectErr = {error, {already_started, Pid}},
+    ?assertEqual(ExpectErr, couch_replicator_scheduler_job:start_link(Rep)).
+
+t_stop_duplicate_job({_Ctx, {Source, Target}}) ->
+    {ok, RepId} = replicate(Source, Target),
+    wait_target_in_sync(Source, Target),
+    RepPid = couch_replicator_test_helper:get_pid(RepId),
+    {ok, Listener} = rep_result_listener(RepId),
+    Pid = pid_from_another_node(),
+    meck:expect(couch_replicator_pg, should_run, fun(_, _) -> {no, Pid} end),
+    RepPid ! {'$gen_cast', checkpoint},
+    {error, Result} = wait_rep_result(RepId),
+    ?assertEqual(duplicate_job, Result),
+    couch_replicator_notifier:stop(Listener).
+
+pid_from_another_node() ->
+    % Use a Pid serialized from a node named A@1
+    % (A@1)1> term_to_binary(self()).
+    Bin = <<131, 88, 100, 0, 3, 65, 64, 49, 0, 0, 0, 89, 0, 0, 0, 0, 99, 137, 147, 218>>,
+    Pid = binary_to_term(Bin),
+    ?assertEqual('A@1', node(Pid)),
+    Pid.
+
 mock_fail_req(Path, Return) ->
     meck:expect(
         ibrowse,
@@ -216,6 +246,12 @@ wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
     end.
 
 replicate(Source, Target) ->
+    Rep = make_rep(Source, Target),
+    ok = couch_replicator_scheduler:add_job(Rep),
+    couch_replicator_scheduler:reschedule(),
+    {ok, Rep#rep.id}.
+
+make_rep(Source, Target) ->
     RepObject =
         {[
            {<<"source">>, url(Source)},
@@ -227,9 +263,7 @@ replicate(Source, Target) ->
             {<<"connection_timeout">>, 3000}
         ]},
     {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
-    ok = couch_replicator_scheduler:add_job(Rep),
-    couch_replicator_scheduler:reschedule(),
-    {ok, Rep#rep.id}.
+    Rep.
 
 url(DbName) ->
     couch_replicator_test_helper:cluster_db_url(DbName).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl b/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
index f862527f4..e28b4a28b 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
@@ -160,7 +160,7 @@ cluster_db_url(Path) ->
     <<(cluster_url())/binary, "/", Path/binary>>.
 
 get_pid(RepId) ->
-    Pid = global:whereis_name({couch_replicator_scheduler_job, RepId}),
+    [Pid] = couch_replicator_pg:pids(RepId),
     ?assert(is_pid(Pid)),
     Pid.
```