summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2019-08-13 16:53:44 -0400
committerNick Vatamaniuc <vatamane@apache.org>2019-08-14 10:08:11 -0400
commitf2bc7ee05c2bc06dd8e67a05aec73d314a168ad2 (patch)
tree78c5172baeb6428f215c5ebc55a10407460f1b5c
parent07600f73d117417d5af26e4c2fb6ebc31f09e570 (diff)
downloadcouchdb-fix-replicator-reschedule.tar.gz
Fix replication rescheduling Running < MaxJobs corner casefix-replicator-reschedule
Previously, when total number of replication jobs exceed `MaxJobs`, if some jobs crashed, additional jobs didn't start immediately to bring the running total up to the `MaxJobs` limit. Then, during rescheduling, the `Running == MaxJobs, Pending > 0` guard would fail and jobs would not rotate. In other words, if at least one job crashed, rotation didn't happen. The fix is to simplify the rotation logic to handle the `Running < MaxJobs` case. First, up to `Churn` number of jobs are stopped, then enough jobs are started to reach the `MaxJobs` limit. The rotation logic case handles the `start_pending_jobs/3` case so there is no need to call that separately before rotation happens.
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler.erl82
1 files changed, 49 insertions, 33 deletions
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index e3dbede83..7fe417a53 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -494,7 +494,10 @@ start_jobs(Count, State) ->
-spec stop_jobs(non_neg_integer(), boolean(), #state{}) -> non_neg_integer().
-stop_jobs(Count, IsContinuous, State) ->
+stop_jobs(Count, _, _) when is_integer(Count), Count =< 0 ->
+ 0;
+
+stop_jobs(Count, IsContinuous, State) when is_integer(Count) ->
Running0 = running_jobs(),
ContinuousPred = fun(Job) -> is_continuous(Job) =:= IsContinuous end,
Running1 = lists:filter(ContinuousPred, Running0),
@@ -723,35 +726,25 @@ reset_job_process(#job{} = Job) ->
-spec reschedule(#state{}) -> ok.
reschedule(State) ->
- Running = running_job_count(),
- Pending = pending_job_count(),
- stop_excess_jobs(State, Running),
- start_pending_jobs(State, Running, Pending),
- rotate_jobs(State, Running, Pending),
- update_running_jobs_stats(State#state.stats_pid),
- ok.
+ StopCount = stop_excess_jobs(State, running_job_count()),
+ rotate_jobs(State, StopCount),
+ update_running_jobs_stats(State#state.stats_pid).
--spec stop_excess_jobs(#state{}, non_neg_integer()) -> ok.
+-spec stop_excess_jobs(#state{}, non_neg_integer()) -> non_neg_integer().
stop_excess_jobs(State, Running) ->
#state{max_jobs=MaxJobs} = State,
- StopCount = Running - MaxJobs,
- if StopCount =< 0 -> ok; true ->
- Stopped = stop_jobs(StopCount, true, State),
- OneshotLeft = StopCount - Stopped,
- if OneshotLeft =< 0 -> ok; true ->
- stop_jobs(OneshotLeft, false, State),
- ok
- end
- end.
+ StopCount = max(0, Running - MaxJobs),
+ Stopped = stop_jobs(StopCount, true, State),
+ OneshotLeft = StopCount - Stopped,
+ stop_jobs(OneshotLeft, false, State),
+ StopCount.
start_pending_jobs(State) ->
- start_pending_jobs(State, running_job_count(), pending_job_count()).
-
-
-start_pending_jobs(State, Running, Pending) ->
#state{max_jobs=MaxJobs} = State,
+ Running = running_job_count(),
+ Pending = pending_job_count(),
if Running < MaxJobs, Pending > 0 ->
start_jobs(MaxJobs - Running, State);
true ->
@@ -759,13 +752,19 @@ start_pending_jobs(State, Running, Pending) ->
end.
--spec rotate_jobs(#state{}, non_neg_integer(), non_neg_integer()) -> ok.
-rotate_jobs(State, Running, Pending) ->
+-spec rotate_jobs(#state{}, non_neg_integer()) -> ok.
+rotate_jobs(State, ChurnSoFar) ->
#state{max_jobs=MaxJobs, max_churn=MaxChurn} = State,
- if Running == MaxJobs, Pending > 0 ->
- RotateCount = lists:min([Pending, Running, MaxChurn]),
- StopCount = stop_jobs(RotateCount, true, State),
- start_jobs(StopCount, State);
+ Running = running_job_count(),
+ Pending = pending_job_count(),
+ % Reduce MaxChurn by the number of already stopped jobs in the
+ % current rescheduling cycle.
+ Churn = max(0, MaxChurn - ChurnSoFar),
+ if Running =< MaxJobs ->
+ StopCount = lists:min([Pending, Running, Churn]),
+ stop_jobs(StopCount, true, State),
+ StartCount = max(0, MaxJobs - running_job_count()),
+ start_jobs(StartCount, State);
true ->
ok
end.
@@ -1047,6 +1046,7 @@ scheduler_test_() ->
t_excess_prefer_continuous_first(),
t_stop_oldest_first(),
t_start_oldest_first(),
+ t_jobs_churn_even_if_not_all_max_jobs_are_running(),
t_dont_stop_if_nothing_pending(),
t_max_churn_limits_number_of_rotated_jobs(),
t_existing_jobs(),
@@ -1056,7 +1056,7 @@ scheduler_test_() ->
t_rotate_continuous_only_if_mixed(),
t_oneshot_dont_get_starting_priority(),
t_oneshot_will_hog_the_scheduler(),
- t_if_excess_is_trimmed_rotation_doesnt_happen(),
+ t_if_excess_is_trimmed_rotation_still_happens(),
t_if_transient_job_crashes_it_gets_removed(),
t_if_permanent_job_crashes_it_stays_in_ets(),
t_job_summary_running(),
@@ -1177,10 +1177,10 @@ t_stop_oldest_first() ->
continuous_running(5)
],
setup_jobs(Jobs),
- reschedule(mock_state(2)),
+ reschedule(mock_state(2, 1)),
?assertEqual({2, 1}, run_stop_count()),
?assertEqual([4], jobs_stopped()),
- reschedule(mock_state(1)),
+ reschedule(mock_state(1, 1)),
?assertEqual([7], jobs_running())
end).
@@ -1192,6 +1192,22 @@ t_start_oldest_first() ->
?assertEqual({1, 2}, run_stop_count()),
?assertEqual([2], jobs_running()),
reschedule(mock_state(2)),
+ ?assertEqual({2, 1}, run_stop_count()),
+ % After rescheduling with max_jobs = 2, 2 was stopped and 5, 7 should
+ % be running.
+ ?assertEqual([2], jobs_stopped())
+ end).
+
+
+t_jobs_churn_even_if_not_all_max_jobs_are_running() ->
+ ?_test(begin
+ setup_jobs([
+ continuous_running(7),
+ continuous(2),
+ continuous(5)
+ ]),
+ reschedule(mock_state(2, 2)),
+ ?assertEqual({2, 1}, run_stop_count()),
?assertEqual([7], jobs_stopped())
end).
@@ -1289,7 +1305,7 @@ t_oneshot_will_hog_the_scheduler() ->
end).
-t_if_excess_is_trimmed_rotation_doesnt_happen() ->
+t_if_excess_is_trimmed_rotation_still_happens() ->
?_test(begin
Jobs = [
continuous(1),
@@ -1298,7 +1314,7 @@ t_if_excess_is_trimmed_rotation_doesnt_happen() ->
],
setup_jobs(Jobs),
reschedule(mock_state(1)),
- ?assertEqual([3], jobs_running())
+ ?assertEqual([1], jobs_running())
end).