summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler.erl82
1 files changed, 49 insertions, 33 deletions
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index e3dbede83..7fe417a53 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -494,7 +494,10 @@ start_jobs(Count, State) ->
-spec stop_jobs(non_neg_integer(), boolean(), #state{}) -> non_neg_integer().
-stop_jobs(Count, IsContinuous, State) ->
+stop_jobs(Count, _, _) when is_integer(Count), Count =< 0 ->
+ 0;
+
+stop_jobs(Count, IsContinuous, State) when is_integer(Count) ->
Running0 = running_jobs(),
ContinuousPred = fun(Job) -> is_continuous(Job) =:= IsContinuous end,
Running1 = lists:filter(ContinuousPred, Running0),
@@ -723,35 +726,25 @@ reset_job_process(#job{} = Job) ->
-spec reschedule(#state{}) -> ok.
reschedule(State) ->
- Running = running_job_count(),
- Pending = pending_job_count(),
- stop_excess_jobs(State, Running),
- start_pending_jobs(State, Running, Pending),
- rotate_jobs(State, Running, Pending),
- update_running_jobs_stats(State#state.stats_pid),
- ok.
+ StopCount = stop_excess_jobs(State, running_job_count()),
+ rotate_jobs(State, StopCount),
+ update_running_jobs_stats(State#state.stats_pid).
--spec stop_excess_jobs(#state{}, non_neg_integer()) -> ok.
+-spec stop_excess_jobs(#state{}, non_neg_integer()) -> non_neg_integer().
stop_excess_jobs(State, Running) ->
#state{max_jobs=MaxJobs} = State,
- StopCount = Running - MaxJobs,
- if StopCount =< 0 -> ok; true ->
- Stopped = stop_jobs(StopCount, true, State),
- OneshotLeft = StopCount - Stopped,
- if OneshotLeft =< 0 -> ok; true ->
- stop_jobs(OneshotLeft, false, State),
- ok
- end
- end.
+ StopCount = max(0, Running - MaxJobs),
+ Stopped = stop_jobs(StopCount, true, State),
+ OneshotLeft = StopCount - Stopped,
+ stop_jobs(OneshotLeft, false, State),
+ StopCount.
start_pending_jobs(State) ->
- start_pending_jobs(State, running_job_count(), pending_job_count()).
-
-
-start_pending_jobs(State, Running, Pending) ->
#state{max_jobs=MaxJobs} = State,
+ Running = running_job_count(),
+ Pending = pending_job_count(),
if Running < MaxJobs, Pending > 0 ->
start_jobs(MaxJobs - Running, State);
true ->
@@ -759,13 +752,19 @@ start_pending_jobs(State, Running, Pending) ->
end.
--spec rotate_jobs(#state{}, non_neg_integer(), non_neg_integer()) -> ok.
-rotate_jobs(State, Running, Pending) ->
+-spec rotate_jobs(#state{}, non_neg_integer()) -> ok.
+rotate_jobs(State, ChurnSoFar) ->
#state{max_jobs=MaxJobs, max_churn=MaxChurn} = State,
- if Running == MaxJobs, Pending > 0 ->
- RotateCount = lists:min([Pending, Running, MaxChurn]),
- StopCount = stop_jobs(RotateCount, true, State),
- start_jobs(StopCount, State);
+ Running = running_job_count(),
+ Pending = pending_job_count(),
+ % Reduce MaxChurn by the number of already stopped jobs in the
+ % current rescheduling cycle.
+ Churn = max(0, MaxChurn - ChurnSoFar),
+ if Running =< MaxJobs ->
+ StopCount = lists:min([Pending, Running, Churn]),
+ stop_jobs(StopCount, true, State),
+ StartCount = max(0, MaxJobs - running_job_count()),
+ start_jobs(StartCount, State);
true ->
ok
end.
@@ -1047,6 +1046,7 @@ scheduler_test_() ->
t_excess_prefer_continuous_first(),
t_stop_oldest_first(),
t_start_oldest_first(),
+ t_jobs_churn_even_if_not_all_max_jobs_are_running(),
t_dont_stop_if_nothing_pending(),
t_max_churn_limits_number_of_rotated_jobs(),
t_existing_jobs(),
@@ -1056,7 +1056,7 @@ scheduler_test_() ->
t_rotate_continuous_only_if_mixed(),
t_oneshot_dont_get_starting_priority(),
t_oneshot_will_hog_the_scheduler(),
- t_if_excess_is_trimmed_rotation_doesnt_happen(),
+ t_if_excess_is_trimmed_rotation_still_happens(),
t_if_transient_job_crashes_it_gets_removed(),
t_if_permanent_job_crashes_it_stays_in_ets(),
t_job_summary_running(),
@@ -1177,10 +1177,10 @@ t_stop_oldest_first() ->
continuous_running(5)
],
setup_jobs(Jobs),
- reschedule(mock_state(2)),
+ reschedule(mock_state(2, 1)),
?assertEqual({2, 1}, run_stop_count()),
?assertEqual([4], jobs_stopped()),
- reschedule(mock_state(1)),
+ reschedule(mock_state(1, 1)),
?assertEqual([7], jobs_running())
end).
@@ -1192,6 +1192,22 @@ t_start_oldest_first() ->
?assertEqual({1, 2}, run_stop_count()),
?assertEqual([2], jobs_running()),
reschedule(mock_state(2)),
+ ?assertEqual({2, 1}, run_stop_count()),
+ % After rescheduling with max_jobs = 2, 2 was stopped and 5, 7 should
+ % be running.
+ ?assertEqual([2], jobs_stopped())
+ end).
+
+
+t_jobs_churn_even_if_not_all_max_jobs_are_running() ->
+ ?_test(begin
+ setup_jobs([
+ continuous_running(7),
+ continuous(2),
+ continuous(5)
+ ]),
+ reschedule(mock_state(2, 2)),
+ ?assertEqual({2, 1}, run_stop_count()),
?assertEqual([7], jobs_stopped())
end).
@@ -1289,7 +1305,7 @@ t_oneshot_will_hog_the_scheduler() ->
end).
-t_if_excess_is_trimmed_rotation_doesnt_happen() ->
+t_if_excess_is_trimmed_rotation_still_happens() ->
?_test(begin
Jobs = [
continuous(1),
@@ -1298,7 +1314,7 @@ t_if_excess_is_trimmed_rotation_doesnt_happen() ->
],
setup_jobs(Jobs),
reschedule(mock_state(1)),
- ?assertEqual([3], jobs_running())
+ ?assertEqual([1], jobs_running())
end).