author     Nick Vatamaniuc <vatamane@apache.org>              2021-02-15 22:35:32 -0500
committer  Nick Vatamaniuc <nickva@users.noreply.github.com>  2021-03-11 13:13:03 -0500
commit     ab38c2dd3f429ce8f347fa4f04e2815b810a8a19 (patch)
tree       11d35dd32da4713ba178f04dd10bc01d77136b33
parent     04e1d64f5f7f951a0dcda399354fbd8cfa934181 (diff)
download   couchdb-ab38c2dd3f429ce8f347fa4f04e2815b810a8a19.tar.gz
Fair Share Replication Scheduler Implementation
The fair share replication scheduler allows configuring job priorities per _replicator db.

Previously, jobs from all the replication dbs would be added to the scheduler and run in round-robin order. This update makes it possible to specify the relative priority of jobs from different databases. For example, there could be low, high and default priority _replicator dbs.

The original algorithm comes from the [A Fair Share Scheduler](https://proteusmaster.urcf.drexel.edu/urcfwiki/images/KayLauderFairShare.pdf "Fair Share Scheduler") paper by Judy Kay and Piers Lauder. A summary of how the algorithm works is included in the top-level comment in the couch_replicator_share module.

There is minimal modification to the main scheduler logic. Besides the share accounting logic each cycle, the other changes are:

 * Running and stopping candidates are now picked based on the priority first, and then on their last_started timestamp.

 * When jobs finish executing mid-cycle, their charges are accounted for. That holds for jobs which terminate normally, are removed by the user, or crash.

Other interesting aspects are the interaction with the error back-off mechanism and how one-shot replications are treated:

 * The exponential error back-off mechanism is unaltered and takes precedence over the priority values. That means unhealthy jobs are rejected and "penalized" before the priority value is even looked at.

 * One-shot replications, once started, are not stopped during each scheduling cycle unless the operator manually adjusts the `max_jobs` parameter. That behavior is necessary to preserve the "snapshot" semantics and is retained in this update.
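As a rough illustration of the accounting (the numbers below are hypothetical), every scheduling cycle each job's priority is recomputed from its db's decayed usage, the number of sibling jobs in that db, and the configured shares:

    Priority = (Usage * NumJobs) / Shares^2

    % e.g. two dbs, each with 2 jobs that accumulated 60000000 usec of usage:
    %   900 shares: 60000000 * 2 / (900 * 900) ~= 148
    %    20 shares: 60000000 * 2 / (20 * 20)    = 300000
    % Lower values sort to the front of the run queue, so the 900-share db's
    % jobs get to run first.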
-rw-r--r--  rel/overlay/etc/default.ini                                   26
-rw-r--r--  src/couch_replicator/src/couch_replicator_scheduler.erl      214
-rw-r--r--  src/couch_replicator/src/couch_replicator_share.erl          808
-rw-r--r--  src/couch_replicator/test/eunit/couch_replicator_test.hrl     35
4 files changed, 1006 insertions, 77 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 973b1a1fe..6c992d7bd 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -482,6 +482,32 @@ ssl_certificate_max_depth = 3
; or 403 response this setting is not needed.
;session_refresh_interval_sec = 550
+; Usage coefficient decays historic fair share usage every scheduling
+; cycle. The value must be between 0.0 and 1.0. Lower values will
+; ensure historic usage decays quicker and higher values mean it will
+; be remembered longer.
+;usage_coeff = 0.5
+
+; Priority coefficient decays all the job priorities such that they slowly
+; drift towards the front of the run queue. This coefficient defines a maximum
+; time window over which this algorithm would operate. For example, if this
+; value is too small (0.1), after a few cycles quite a few jobs would end up at
+; priority 0, which would render this algorithm useless. The default value of
+; 0.98 is picked such that if a job ran for one scheduler cycle, then didn't
+; get to run for 7 hours, it would still have priority > 0. 7 hours was picked
+; as it is close enough to 8 hours, which is the default maximum error backoff
+; interval.
+;priority_coeff = 0.98
+
+
+[replicator.shares]
+; Fair share configuration section. More shares result in a higher
+; chance that jobs from that db get to run. The default value is 100,
+; minimum is 1 and maximum is 1000. The configuration may be set even
+; if the database does not exist.
+;_replicator = 100
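+; For example (the db names below are hypothetical), additional _replicator
+; dbs can be given more or fewer shares than the default:
+;high/_replicator = 900
+;low/_replicator = 20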
+
+
[log]
; Possible log levels:
; debug
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index 641443a7c..223cbb834 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -55,15 +55,10 @@
stats_updater_loop/1
]).
--include("couch_replicator_scheduler.hrl").
-include("couch_replicator.hrl").
-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-include_lib("couch/include/couch_db.hrl").
-%% types
--type event_type() :: added | started | stopped | {crashed, any()}.
--type event() :: {Type:: event_type(), When :: erlang:timestamp()}.
--type history() :: nonempty_list(event()).
%% definitions
-define(MAX_BACKOFF_EXPONENT, 10).
@@ -78,13 +73,13 @@
-define(DEFAULT_SCHEDULER_INTERVAL, 60000).
--record(state, {interval, timer, max_jobs, max_churn, max_history, stats_pid}).
--record(job, {
- id :: job_id() | '$1' | '_',
- rep :: #rep{} | '_',
- pid :: undefined | pid() | '$1' | '_',
- monitor :: undefined | reference() | '_',
- history :: history() | '_'
+-record(state, {
+ interval = ?DEFAULT_SCHEDULER_INTERVAL,
+ timer,
+ max_jobs,
+ max_churn,
+ max_history,
+ stats_pid
}).
-record(stats_acc, {
@@ -229,6 +224,7 @@ init(_) ->
EtsOpts = [named_table, {keypos, #job.id}, {read_concurrency, true},
{write_concurrency, true}],
?MODULE = ets:new(?MODULE, EtsOpts),
+ ok = couch_replicator_share:init(),
ok = config:listen_for_changes(?MODULE, nil),
Interval = config:get_integer("replicator", "interval",
?DEFAULT_SCHEDULER_INTERVAL),
@@ -290,6 +286,17 @@ handle_cast({set_interval, Interval}, State) when is_integer(Interval),
couch_log:notice("~p: interval set to ~B", [?MODULE, Interval]),
{noreply, State#state{interval = Interval}};
+handle_cast({update_shares, Key, Shares}, State) when is_binary(Key),
+ is_integer(Shares), Shares >= 0 ->
+ couch_log:notice("~p: shares for ~s set to ~B", [?MODULE, Key, Shares]),
+ couch_replicator_share:update_shares(Key, Shares),
+ {noreply, State};
+
+handle_cast({reset_shares, Key}, State) when is_binary(Key) ->
+ couch_log:notice("~p: shares for ~s reset to default", [?MODULE, Key]),
+ couch_replicator_share:reset_shares(Key),
+ {noreply, State};
+
handle_cast({update_job_stats, JobId, Stats}, State) ->
case rep_state(JobId) of
nil ->
@@ -314,6 +321,8 @@ handle_info(reschedule, State) ->
handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
{ok, Job} = job_by_pid(Pid),
couch_log:notice("~p: Job ~p completed normally", [?MODULE, Job#job.id]),
+ Interval = State#state.interval,
+ couch_replicator_share:charge(Job, Interval, os:timestamp()),
remove_job_int(Job),
update_running_jobs_stats(State#state.stats_pid),
{noreply, State};
@@ -324,6 +333,8 @@ handle_info({'DOWN', _Ref, process, Pid, Reason0}, State) ->
{shutdown, ShutdownReason} -> ShutdownReason;
Other -> Other
end,
+ Interval = State#state.interval,
+ couch_replicator_share:charge(Job, Interval, os:timestamp()),
ok = handle_crashed_job(Job, Reason, State),
{noreply, State};
@@ -340,6 +351,7 @@ code_change(_OldVsn, State, _Extra) ->
terminate(_Reason, _State) ->
+ couch_replicator_share:clear(),
ok.
@@ -369,6 +381,15 @@ handle_config_change("replicator", "max_history", V, _, S) ->
ok = gen_server:cast(?MODULE, {set_max_history, list_to_integer(V)}),
{ok, S};
+handle_config_change("replicator.shares", Key, deleted, _, S) ->
+ ok = gen_server:cast(?MODULE, {reset_shares, list_to_binary(Key)}),
+ {ok, S};
+
+handle_config_change("replicator.shares", Key, V, _, S) ->
+ ok = gen_server:cast(?MODULE, {update_shares, list_to_binary(Key),
+ list_to_integer(V)}),
+ {ok, S};
+
handle_config_change(_, _, _, _, S) ->
{ok, S}.
@@ -449,19 +470,19 @@ pending_jobs(0) ->
[];
pending_jobs(Count) when is_integer(Count), Count > 0 ->
- Set0 = gb_sets:new(), % [{LastStart, Job},...]
+ Set0 = gb_sets:new(), % [{{Priority, LastStart}, Job},...]
Now = os:timestamp(),
Acc0 = {Set0, Now, Count, health_threshold()},
{Set1, _, _, _} = ets:foldl(fun pending_fold/2, Acc0, ?MODULE),
- [Job || {_Started, Job} <- gb_sets:to_list(Set1)].
+ [Job || {_PriorityKey, Job} <- gb_sets:to_list(Set1)].
pending_fold(#job{pid = Pid}, Acc) when is_pid(Pid) ->
Acc;
pending_fold(Job, {Set, Now, Count, HealthThreshold}) ->
- Set1 = case {not_recently_crashed(Job, Now, HealthThreshold),
- gb_sets:size(Set) >= Count} of
+ Healthy = not_recently_crashed(Job, Now, HealthThreshold),
+ Set1 = case {Healthy, gb_sets:size(Set) >= Count} of
{true, true} ->
% Job is healthy but already reached accumulated limit, so might
% have to replace one of the accumulated jobs
@@ -469,7 +490,7 @@ pending_fold(Job, {Set, Now, Count, HealthThreshold}) ->
{true, false} ->
% Job is healthy and we haven't reached the limit, so add job
% to accumulator
- gb_sets:add_element({last_started(Job), Job}, Set);
+ gb_sets:add_element({start_priority_key(Job), Job}, Set);
{false, _} ->
% This job is not healthy (has crashed too recently), so skip it.
Set
@@ -477,24 +498,34 @@ pending_fold(Job, {Set, Now, Count, HealthThreshold}) ->
{Set1, Now, Count, HealthThreshold}.
-% Replace Job in the accumulator if it is older than youngest job there.
-% "oldest" here means one which has been waiting to run the longest. "youngest"
-% means the one with most recent activity. The goal is to keep up to Count
-% oldest jobs during iteration. For example if there are jobs with these times
-% accumulated so far [5, 7, 11], and start time of current job is 6. Then
-% 6 < 11 is true, so 11 (youngest) is dropped and 6 inserted resulting in
-% [5, 6, 7]. In the end the result might look like [1, 2, 5], for example.
+% Replace Job in the accumulator if it has a higher priority (lower priority
+% value) than the lowest priority job there. Jobs are keyed by
+% {FairSharePriority, LastStarted} tuples. If the FairSharePriority is the
+% same, the last started timestamp is used to break the tie. The goal is to
+% keep up to Count of the highest priority jobs during the iteration. For
+% example, if the priorities accumulated so far are [5, 7, 11], and the
+% priority of the current job is 6, then 6 < 11 is true, so 11 (the lowest
+% priority) is dropped and 6 is inserted, resulting in [5, 6, 7]. In the end
+% the result might look like [1, 2, 5], for example.
+%
pending_maybe_replace(Job, Set) ->
- Started = last_started(Job),
- {Youngest, YoungestJob} = gb_sets:largest(Set),
- case Started < Youngest of
+ Key = start_priority_key(Job),
+ {LowestPKey, LowestPJob} = gb_sets:largest(Set),
+ case Key < LowestPKey of
true ->
- Set1 = gb_sets:delete({Youngest, YoungestJob}, Set),
- gb_sets:add_element({Started, Job}, Set1);
+ Set1 = gb_sets:delete({LowestPKey, LowestPJob}, Set),
+ gb_sets:add_element({Key, Job}, Set1);
false ->
Set
end.
+% The starting priority key is used to order pending jobs such that the ones
+% with a lower priority value and an earlier start time sort first, so they
+% are the first to run.
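+% For example, the key {0, {1500, 0, 0}} sorts before {2, {100, 0, 0}}: the
+% fair share priority dominates, and the last started timestamp only breaks
+% ties between equal priorities.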
+%
+start_priority_key(#job{} = Job) ->
+ {couch_replicator_share:priority(Job#job.id), last_started(Job)}.
+
start_jobs(Count, State) ->
[start_job_int(Job, State) || Job <- pending_jobs(Count)],
@@ -509,13 +540,18 @@ stop_jobs(Count, IsContinuous, State) when is_integer(Count) ->
Running0 = running_jobs(),
ContinuousPred = fun(Job) -> is_continuous(Job) =:= IsContinuous end,
Running1 = lists:filter(ContinuousPred, Running0),
- Running2 = lists:sort(fun longest_running/2, Running1),
- Running3 = lists:sublist(Running2, Count),
- length([stop_job_int(Job, State) || Job <- Running3]).
+ Running2 = [{stop_priority_key(Job), Job} || Job <- Running1],
+ Running3 = lists:sublist(lists:sort(Running2), Count),
+ length([stop_job_int(Job, State) || {_SortKey, Job} <- Running3]).
-longest_running(#job{} = A, #job{} = B) ->
- last_started(A) =< last_started(B).
+% Lower priority jobs have higher priority values, so we negate them; that
+% way, when sorted, they come up first. If priorities are equal, jobs are
+% sorted by their start times, as the jobs with the lowest start times have
+% been running the longest.
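+% For example, a job with fair share priority 500 gets the key
+% {-500, LastStarted} and sorts before a priority 100 job's {-100, LastStarted},
+% so the lower priority (higher value) job is stopped first.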
+%
+stop_priority_key(#job{} = Job) ->
+ {-couch_replicator_share:priority(Job#job.id), last_started(Job)}.
not_recently_crashed(#job{history = History}, Now, HealthThreshold) ->
@@ -593,6 +629,7 @@ backoff_micros(CrashCount) ->
-spec add_job_int(#job{}) -> boolean().
add_job_int(#job{} = Job) ->
+ couch_replicator_share:job_added(Job),
ets:insert_new(?MODULE, Job).
@@ -600,6 +637,9 @@ add_job_int(#job{} = Job) ->
maybe_remove_job_int(JobId, State) ->
case job_by_id(JobId) of
{ok, Job} ->
+ Now = os:timestamp(),
+ Interval = State#state.interval,
+ couch_replicator_share:charge(Job, Interval, Now),
ok = stop_job_int(Job, State),
true = remove_job_int(Job),
couch_stats:increment_counter([couch_replicator, jobs, removes]),
@@ -657,6 +697,7 @@ stop_job_int(#job{} = Job, State) ->
-spec remove_job_int(#job{}) -> true.
remove_job_int(#job{} = Job) ->
+ couch_replicator_share:job_removed(Job),
ets:delete(?MODULE, Job#job.id).
@@ -733,7 +774,8 @@ reset_job_process(#job{} = Job) ->
-spec reschedule(#state{}) -> ok.
-reschedule(State) ->
+reschedule(#state{interval = Interval} = State) ->
+ couch_replicator_share:update(running_jobs(), Interval, os:timestamp()),
StopCount = stop_excess_jobs(State, running_job_count()),
rotate_jobs(State, StopCount),
update_running_jobs_stats(State#state.stats_pid).
@@ -1035,7 +1077,8 @@ longest_running_test() ->
J0 = testjob([crashed()]),
J1 = testjob([started(1)]),
J2 = testjob([started(2)]),
- Sort = fun(Jobs) -> lists:sort(fun longest_running/2, Jobs) end,
+ SortFun = fun(A, B) -> last_started(A) =< last_started(B) end,
+ Sort = fun(Jobs) -> lists:sort(SortFun, Jobs) end,
?assertEqual([], Sort([])),
?assertEqual([J1], Sort([J1])),
?assertEqual([J1, J2], Sort([J2, J1])),
@@ -1381,11 +1424,12 @@ t_if_excess_is_trimmed_rotation_still_happens() ->
t_if_transient_job_crashes_it_gets_removed() ->
?_test(begin
Pid = mock_pid(),
+ Rep = continuous_rep(),
Job = #job{
id = job1,
pid = Pid,
history = [added()],
- rep = #rep{db_name = null, options = [{continuous, true}]}
+ rep = Rep#rep{db_name = null}
},
setup_jobs([Job]),
?assertEqual(1, ets:info(?MODULE, size)),
@@ -1399,15 +1443,20 @@ t_if_transient_job_crashes_it_gets_removed() ->
t_if_permanent_job_crashes_it_stays_in_ets() ->
?_test(begin
Pid = mock_pid(),
+ Rep = continuous_rep(),
Job = #job{
id = job1,
pid = Pid,
history = [added()],
- rep = #rep{db_name = <<"db1">>, options = [{continuous, true}]}
+ rep = Rep#rep{db_name = <<"db1">>}
},
setup_jobs([Job]),
?assertEqual(1, ets:info(?MODULE, size)),
- State = #state{max_jobs =1, max_history = 3, stats_pid = self()},
+ State = #state{
+ max_jobs = 1,
+ max_history = 3,
+ stats_pid = self()
+ },
{noreply, State} = handle_info({'DOWN', r1, process, Pid, failed},
State),
?assertEqual(1, ets:info(?MODULE, size)),
@@ -1419,21 +1468,11 @@ t_if_permanent_job_crashes_it_stays_in_ets() ->
t_existing_jobs() ->
?_test(begin
- Rep = #rep{
- id = job1,
- db_name = <<"db">>,
- source = <<"s">>,
- target = <<"t">>,
- options = [{continuous, true}]
- },
+ Rep0 = continuous_rep(<<"s">>, <<"t">>),
+ Rep = Rep0#rep{id = job1, db_name = <<"db">>},
setup_jobs([#job{id = Rep#rep.id, rep = Rep}]),
- NewRep = #rep{
- id = Rep#rep.id,
- db_name = <<"db">>,
- source = <<"s">>,
- target = <<"t">>,
- options = [{continuous, true}]
- },
+ NewRep0 = continuous_rep(<<"s">>, <<"t">>),
+ NewRep = NewRep0#rep{id = Rep#rep.id, db_name = <<"db">>},
?assert(existing_replication(NewRep)),
?assertNot(existing_replication(NewRep#rep{source = <<"s1">>})),
?assertNot(existing_replication(NewRep#rep{target = <<"t1">>})),
@@ -1443,15 +1482,12 @@ t_existing_jobs() ->
t_job_summary_running() ->
?_test(begin
+ Rep = rep(<<"s">>, <<"t">>),
Job = #job{
id = job1,
pid = mock_pid(),
history = [added()],
- rep = #rep{
- db_name = <<"db1">>,
- source = <<"s">>,
- target = <<"t">>
- }
+ rep = Rep#rep{db_name = <<"db1">>}
},
setup_jobs([Job]),
Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
@@ -1472,7 +1508,7 @@ t_job_summary_pending() ->
id = job1,
pid = undefined,
history = [stopped(20), started(10), added()],
- rep = #rep{source = <<"s">>, target = <<"t">>}
+ rep = rep(<<"s">>, <<"t">>)
},
setup_jobs([Job]),
Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
@@ -1492,7 +1528,7 @@ t_job_summary_crashing_once() ->
Job = #job{
id = job1,
history = [crashed(?DEFAULT_HEALTH_THRESHOLD_SEC + 1), started(0)],
- rep = #rep{source = <<"s">>, target = <<"t">>}
+ rep = rep(<<"s">>, <<"t">>)
},
setup_jobs([Job]),
Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
@@ -1508,7 +1544,7 @@ t_job_summary_crashing_many_times() ->
Job = #job{
id = job1,
history = [crashed(4), started(3), crashed(2), started(1)],
- rep = #rep{source = <<"s">>, target = <<"t">>}
+ rep = rep(<<"s">>, <<"t">>)
},
setup_jobs([Job]),
Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
@@ -1521,19 +1557,18 @@ t_job_summary_crashing_many_times() ->
t_job_summary_proxy_fields() ->
?_test(begin
+ Src = #httpdb{
+ url = "https://s",
+ proxy_url = "http://u:p@sproxy:12"
+ },
+ Tgt = #httpdb{
+ url = "http://t",
+ proxy_url = "socks5://u:p@tproxy:34"
+ },
Job = #job{
id = job1,
history = [started(10), added()],
- rep = #rep{
- source = #httpdb{
- url = "https://s",
- proxy_url = "http://u:p@sproxy:12"
- },
- target = #httpdb{
- url = "http://t",
- proxy_url = "socks5://u:p@tproxy:34"
- }
- }
+ rep = rep(Src, Tgt)
},
setup_jobs([Job]),
Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
@@ -1548,6 +1583,8 @@ t_job_summary_proxy_fields() ->
setup_all() ->
catch ets:delete(?MODULE),
+ meck:expect(config, get, 1, []),
+ meck:expect(config, get, 2, undefined),
meck:expect(couch_log, notice, 2, ok),
meck:expect(couch_log, warning, 2, ok),
meck:expect(couch_log, error, 2, ok),
@@ -1555,10 +1592,13 @@ setup_all() ->
meck:expect(couch_stats, increment_counter, 1, ok),
meck:expect(couch_stats, update_gauge, 2, ok),
Pid = mock_pid(),
- meck:expect(couch_replicator_scheduler_sup, start_child, 1, {ok, Pid}).
+ meck:expect(couch_replicator_scheduler_sup, start_child, 1, {ok, Pid}),
+ couch_replicator_share:init().
+
teardown_all(_) ->
+ couch_replicator_share:clear(),
catch ets:delete(?MODULE),
meck:unload().
@@ -1567,7 +1607,8 @@ setup() ->
meck:reset([
couch_log,
couch_replicator_scheduler_sup,
- couch_stats
+ couch_stats,
+ config
]).
@@ -1621,13 +1662,32 @@ mock_state(MaxJobs, MaxChurn) ->
}.
+rep() ->
+ #rep{options = [], user_ctx = #user_ctx{}}.
+
+
+rep(Src, Tgt) ->
+ Rep = rep(),
+ Rep#rep{source = Src, target = Tgt}.
+
+
+continuous_rep() ->
+ #rep{options = [{continuous, true}], user_ctx = #user_ctx{}}.
+
+
+continuous_rep(Src, Tgt) ->
+ Rep = continuous_rep(),
+ Rep#rep{source = Src, target = Tgt}.
+
+
+
continuous(Id) when is_integer(Id) ->
Started = Id,
Hist = [stopped(Started+1), started(Started), added()],
#job{
id = Id,
history = Hist,
- rep = #rep{options = [{continuous, true}]}
+ rep = continuous_rep()
}.
@@ -1637,7 +1697,7 @@ continuous_running(Id) when is_integer(Id) ->
#job{
id = Id,
history = [started(Started), added()],
- rep = #rep{options = [{continuous, true}]},
+ rep = continuous_rep(),
pid = Pid,
monitor = monitor(process, Pid)
}.
@@ -1646,7 +1706,7 @@ continuous_running(Id) when is_integer(Id) ->
oneshot(Id) when is_integer(Id) ->
Started = Id,
Hist = [stopped(Started + 1), started(Started), added()],
- #job{id = Id, history = Hist, rep = #rep{options = []}}.
+ #job{id = Id, history = Hist, rep = rep()}.
oneshot_running(Id) when is_integer(Id) ->
@@ -1655,7 +1715,7 @@ oneshot_running(Id) when is_integer(Id) ->
#job{
id = Id,
history = [started(Started), added()],
- rep = #rep{options = []},
+ rep = rep(),
pid = Pid,
monitor = monitor(process, Pid)
}.
diff --git a/src/couch_replicator/src/couch_replicator_share.erl b/src/couch_replicator/src/couch_replicator_share.erl
new file mode 100644
index 000000000..1f5d00cf3
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_share.erl
@@ -0,0 +1,808 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% This module implements the "Fair Share" algorithm by Judy Kay and Piers
+% Lauder [1] and applies it to the scheduling of replication jobs.
+%
+% The main idea is that _replicator dbs can have a configurable number of
+% "shares" assigned to them. Shares are an abstract quantity from 1 to 1000.
+% The default is 100. Jobs from _replicator databases with more shares get a
+% proportionally higher chance to run than those from databases with fewer
+% shares.
+%
+% Every scheduler cycle, running jobs are "charged" based on how much time they
+% spent running during that cycle. At the end of the cycle the accumulated
+% charges for each job, the number of shares configured, and the total number
+% of jobs in the pending queue from the same _replicator db, are used to
+% calculate new priority values for all the jobs. To match the algorithm from
+% the paper, jobs with lower priority values are the ones at the front of the
+% run queue and have a higher chance of running.
+%
+% Here is how charges, shares, and number of sibling jobs affect the
+% priority value:
+%
+% 1) Jobs from dbs with higher configured shares get assigned lower
+% priority values and so stay closer to the front of the queue.
+%
+% 2) Jobs from dbs with many other jobs (many siblings) get assigned a
+% higher priority value, so they get pushed further down the queue
+% and have a lower chance of running.
+%
+% 3) Jobs which run longer accumulate more charges and get assigned a
+% higher priority value and get to wait longer to run.
+%
+% In order to prevent job starvation, all job priorities are periodically
+% decayed (decreased). This effectively moves all the jobs towards the front of
+% the run queue. So, in effect, there are two competing processes: one
+% uniformly moves all jobs to the front, and the other throws them back in
+% proportion to those factors mentioned above. The speed of this uniform
+% priority decay is controlled by the priority_coeff parameter.
+%
+% In order to prevent jobs from low-share dbs from "cheating" by getting
+% deleted and immediately re-added, charges are accumulated using a
+% historically decayed usage value. The speed of the usage decay is controlled
+% by the `usage_coeff = 0.5` parameter.
+%
+% [1] : https://proteusmaster.urcf.drexel.edu/urcfwiki/images/KayLauderFairShare.pdf
+
+
+-module(couch_replicator_share).
+
+-export([
+ init/0,
+ clear/0,
+ update_shares/2,
+ reset_shares/1,
+ job_added/1,
+ job_removed/1,
+ update/3,
+ priority/1,
+ charge/3
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator.hrl").
+
+
+% Usage coefficient decays historic usage every scheduling cycle. For example,
+% the usage value for a job running 1 minute is 60000000 (i.e. microseconds per
+% minute); if the job then stops running, it will take about 26 cycles (minutes)
+% for it to decay to 0 and for the system to "forget" about it completely:
+%
+% trunc(60000000 * math:pow(0.5, 26)) = 0
+%
+-define(DEFAULT_USAGE_COEFF, 0.5).
+
+% Priority coefficient decays all the job priorities such that they slowly
+% drift towards the front of the run queue. This coefficient defines a maximum
+% time window over which this algorithm would operate. For example, if this
+% value is too small (0.1), after a few cycles quite a few jobs would end up at
+% priority 0, which would render this algorithm useless. The default value of
+% 0.98 is picked such that if a job ran for one scheduler cycle, then didn't
+% get to run for 7 hours, it would still have priority > 0. 7 hours was picked
+% as it is close enough to 8 hours, which is the default maximum error backoff
+% interval.
+%
+% Example calculation:
+% shares = 100
+% usage after 1 minute cycle run = 60000000
+% initial priority = 60000000 / (100 * 100) = 6000
+% trunc(6000 * math:pow(0.98, 431)) = 0
+% 431 / 60 ~= 7 hrs
+%
+-define(DEFAULT_PRIORITY_COEFF, 0.98).
+
+
+-define(MIN_SHARES, 1).
+-define(MAX_SHARES, 1000).
+-define(DEFAULT_SHARES, 100).
+
+-define(SHARES, couch_replicator_shares).
+-define(PRIORITIES, couch_replicator_priorities).
+-define(USAGE, couch_replicator_usage).
+-define(CHARGES, couch_replicator_stopped_usage).
+-define(NUM_JOBS, couch_replicator_num_jobs).
+
+
+init() ->
+ EtsOpts = [named_table, public],
+ ?SHARES = ets:new(?SHARES, EtsOpts), % {Key, Shares}
+ ?PRIORITIES = ets:new(?PRIORITIES, EtsOpts), % {JobId, Priority}
+ ?USAGE = ets:new(?USAGE, EtsOpts), % {Key, Usage}
+ ?CHARGES = ets:new(?CHARGES, EtsOpts), % {Key, Charges}
+ ?NUM_JOBS = ets:new(?NUM_JOBS, EtsOpts), % {Key, NumJobs}
+ lists:foreach(fun({K, V}) -> update_shares(K, V) end, get_config_shares()).
+
+
+clear() ->
+ Tables = [?SHARES, ?PRIORITIES, ?USAGE, ?CHARGES, ?NUM_JOBS],
+ lists:foreach(fun(T) -> catch ets:delete(T) end, Tables).
+
+
+% This should be called when the user updates the replicator.shares config
+% section
+%
+update_shares(Key, Shares) when is_integer(Shares) ->
+ ets:insert(?SHARES, {Key, bounded(Shares, ?MIN_SHARES, ?MAX_SHARES)}).
+
+
+% Called when the config value is deleted and shares are reset to the default
+% value.
+reset_shares(Key) ->
+ ets:delete(?SHARES, Key).
+
+
+job_added(#job{} = Job) ->
+ Key = key(Job),
+ % If the entry is not present {Key, 0} is used as the default
+ ets:update_counter(?NUM_JOBS, Key, 1, {Key, 0}),
+ % Update job's priority as if it ran during one scheduler cycle. This is so
+ % new jobs don't get to be at priority 0 (highest).
+ update_priority(Job).
+
+
+job_removed(#job{} = Job) ->
+ Key = key(Job),
+ ets:delete(?PRIORITIES, Job#job.id),
+ case ets:update_counter(?NUM_JOBS, Key, -1, {Key, 0}) of
+ N when is_integer(N), N =< 0 ->
+ ets:delete(?NUM_JOBS, Key);
+ N when is_integer(N), N > 0 ->
+ ok
+ end,
+ ok.
+
+
+% This is the main algorithm update function. It should be called during each
+% rescheduling cycle with a list of running jobs, the interval from the
+% scheduler (in milliseconds), and the current timestamp.
+%
+% This function does all three main steps as described in [1].
+%
+% 1. Update usage from all the charges in the last scheduling cycle
+%
+% 2. Uniformly decay all job priorities
+%
+% 3. Update priorities for all the running jobs based on usage and number of
+% sibling jobs.
+%
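+% For example, couch_replicator_scheduler:reschedule/1 calls this once per
+% cycle as: update(running_jobs(), Interval, os:timestamp()).
+%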
+update(RunningJobs, Interval, {_, _, _} = Now) ->
+ lists:foreach(fun(Job) -> charge(Job, Interval, Now) end, RunningJobs),
+ update_usage(),
+ decay_priorities(),
+ lists:foreach(fun(Job) -> update_priority(Job) end, RunningJobs).
+
+
+priority(JobId) ->
+ % Not found means it was removed because its value was 0
+ case ets:lookup(?PRIORITIES, JobId) of
+ [{_, Priority}] -> Priority;
+ [] -> 0
+ end.
+
+
+charge(#job{pid = undefined}, _, _) ->
+ 0;
+
+charge(#job{} = Job, Interval, {_, _, _} = Now) when is_integer(Interval) ->
+ Key = key(Job),
+ Charges = job_charges(Job, Interval, Now),
+ % If the entry is not present {Key, 0} is used as the default
+ ets:update_counter(?CHARGES, Key, Charges, {Key, 0}).
+
+
+usage(Key) ->
+ case ets:lookup(?USAGE, Key) of
+ [{_, Usage}] -> Usage;
+ [] -> 0
+ end.
+
+
+num_jobs(Key) ->
+ case ets:lookup(?NUM_JOBS, Key) of
+ [{_, NumJobs}] -> NumJobs;
+ [] -> 0
+ end.
+
+
+shares(Key) ->
+ case ets:lookup(?SHARES, Key) of
+ [{_, Shares}] -> Shares;
+ [] -> ?DEFAULT_SHARES
+ end.
+
+
+% In [1] this is described in the "Decay of Process Priorities" section
+%
+decay_priorities() ->
+ decay(?PRIORITIES, priority_coeff()),
+ % If priority becomes 0, it's removed. When looking it up, if it
+ % is missing we assume it is 0
+ clear_zero(?PRIORITIES).
+
+
+% This is the main part of the algorithm. In [1] it is described in the
+% "Priority Adjustment" section.
+%
+update_priority(#job{} = Job) ->
+ Id = Job#job.id,
+ Key = key(Job),
+ Shares = shares(Key),
+ Priority = (usage(Key) * num_jobs(Key)) / (Shares * Shares),
+ % If the entry is not present {Id, 0} is used as the default
+ ets:update_counter(?PRIORITIES, Id, trunc(Priority), {Id, 0}).
+
+
+% This is the "User-Level Scheduling" part from [1]
+%
+update_usage() ->
+ decay(?USAGE, usage_coeff()),
+ clear_zero(?USAGE),
+ ets:foldl(fun({Key, Charges}, _) ->
+ % If the entry is not present {Key, 0} is used as the default
+ ets:update_counter(?USAGE, Key, Charges, {Key, 0})
+ end, 0, ?CHARGES),
+ % Start each interval with a fresh charges table
+ ets:delete_all_objects(?CHARGES).
+
+
+% Private helper functions
+
+decay(Ets, Coeff) when is_atom(Ets) ->
+ % Use trunc to ensure the result stays an integer in order for
+ % ets:update_counter to work properly. It throws a badarg otherwise.
+ Head = {'$1', '$2'},
+ Result = {{'$1', {trunc, {'*', '$2', {const, Coeff}}}}},
+ ets:select_replace(Ets, [{Head, [], [Result]}]).
+
+
+clear_zero(Ets) when is_atom(Ets) ->
+ % Coefficients are bound to the [0.0, 1.0] range, and the `trunc` call in
+ % decay/2 ensures we can directly match on 0, as opposed to needing to do
+ % =< 0 with a guard.
+ ets:select_delete(Ets, [{{'_', 0}, [], [true]}]).
+
+
+key(#job{} = Job) ->
+ Rep = Job#job.rep,
+ case is_binary(Rep#rep.db_name) of
+ true -> mem3:dbname(Rep#rep.db_name);
+ false -> (Rep#rep.user_ctx)#user_ctx.name
+ end.
+
+
+% Jobs are charged based on the amount of time the job was running during the
+% last scheduling interval. The time units used are microseconds in order to
+% have large enough usage values so that the calculated priority won't be
+% rounded off to 0 easily. The formula for the priority
+% calculation is:
+%
+% Priority = (Usage * NumJobs) / Shares^2
+%
+% Then, in the worst case of a single job in the db, running for only one
+% second, with 1000 (max) shares, the priority would be:
+%
+% 1000000 * 1 / (1000^2) = 1
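+%
+% At the other extreme, with a 60000 msec interval, a job whose last `started`
+% event was 45 seconds before Now is charged 45000000 usec, while a job that
+% started before the interval began is capped at 60000000 usec.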
+%
+job_charges(#job{} = Job, IntervalMSec, {_, _, _} = Now) ->
+ TimeRunning = timer:now_diff(Now, last_started(Job)),
+ IntervalUSec = IntervalMSec * 1000,
+ bounded(TimeRunning, 0, IntervalUSec).
+
+
+last_started(#job{} = Job) ->
+ case lists:keyfind(started, 1, Job#job.history) of
+ false -> {0, 0, 0}; % In case the user set the max history too low
+ {started, When} -> When
+ end.
+
+
+bounded(Val, Min, Max) ->
+ max(Min, min(Max, Val)).
+
+
+% Config helper functions
+
+get_config_shares() ->
+ lists:map(fun({K, V}) ->
+ {list_to_binary(K), int_val(V, ?DEFAULT_SHARES)}
+ end, config:get("replicator.shares")).
+
+
+priority_coeff() ->
+ % This is the K2 coefficient from [1]
+ Default = ?DEFAULT_PRIORITY_COEFF,
+ Val = float_val(config:get("replicator", "priority_coeff"), Default),
+ bounded(Val, 0.0, 1.0).
+
+
+usage_coeff() ->
+ % This is the K1 coefficient from [1]
+ Default = ?DEFAULT_USAGE_COEFF,
+ Val = float_val(config:get("replicator", "usage_coeff"), Default),
+ bounded(Val, 0.0, 1.0).
+
+
+int_val(Str, Default) when is_list(Str) ->
+ try list_to_integer(Str) of
+ Val -> Val
+ catch
+ error:badarg ->
+ Default
+ end.
+
+
+float_val(undefined, Default) ->
+ Default;
+
+float_val(Str, Default) when is_list(Str) ->
+ try list_to_float(Str) of
+ Val -> Val
+ catch
+ error:badarg ->
+ Default
+ end.
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch_replicator/test/eunit/couch_replicator_test.hrl").
+
+
+-define(DB1, <<"db1">>).
+-define(DB2, <<"db2">>).
+-define(DB3, <<"db3">>).
+-define(J1, <<"j1">>).
+-define(J2, <<"j2">>).
+-define(J3, <<"j3">>).
+
+
+fair_share_test_() ->
+ {
+ setup,
+ fun setup_all/0,
+ fun teardown_all/1,
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(init_works),
+ ?TDEF_FE(shares_are_updated_and_reset),
+ ?TDEF_FE(jobs_are_added_and_removed),
+ ?TDEF_FE(can_fetch_job_priority),
+ ?TDEF_FE(jobs_are_charged),
+ ?TDEF_FE(usage_is_updated),
+ ?TDEF_FE(priority_coefficient_works),
+ ?TDEF_FE(priority_decays_when_jobs_stop_running),
+ ?TDEF_FE(priority_increases_when_jobs_run),
+ ?TDEF_FE(two_dbs_equal_shares_equal_number_of_jobs),
+ ?TDEF_FE(two_dbs_unequal_shares_equal_number_of_jobs),
+ ?TDEF_FE(two_dbs_equal_shares_unequal_number_of_jobs),
+ ?TDEF_FE(two_dbs_unequal_shares_unequal_number_of_jobs),
+ ?TDEF_FE(three_dbs_equal_shares_equal_number_of_jobs),
+ ?TDEF_FE(three_dbs_unequal_shares_equal_number_of_jobs),
+ ?TDEF_FE(three_dbs_equal_shares_unequal_number_of_jobs),
+ ?TDEF_FE(three_dbs_unequal_shares_unequal_number_of_jobs)
+ ]
+ }
+ }.
+
+
+setup_all() ->
+ test_util:start_couch().
+
+
+teardown_all(Ctx) ->
+ config_delete("priority_coeff"),
+ config_delete("usage_coeff"),
+ config_shares_delete(),
+ test_util:stop_couch(Ctx).
+
+
+setup() ->
+ init(),
+ ok.
+
+
+teardown(_) ->
+ clear(),
+ config_delete("priority_coeff"),
+ config_delete("usage_coeff"),
+ config_shares_delete().
+
+
+init_works(_)->
+ Tables = [?SHARES, ?PRIORITIES, ?USAGE, ?CHARGES, ?NUM_JOBS],
+ [?assert(is_list(ets:info(T))) || T <- Tables],
+ ?assertEqual(#{}, tab2map(?SHARES)),
+
+ clear(),
+ [?assertEqual(undefined, ets:info(T)) || T <- Tables],
+
+ config_share_set("db1", "200"),
+ init(),
+ ?assertEqual(200, shares(?DB1)),
+ ?assertEqual(#{?DB1 => 200}, tab2map(?SHARES)).
+
+
+shares_are_updated_and_reset(_) ->
+ ?assertEqual(#{}, tab2map(?SHARES)),
+
+ update_shares(?DB1, 42),
+ ?assertEqual(42, shares(?DB1)),
+
+ reset_shares(?DB1),
+ ?assertEqual(100, shares(?DB1)),
+ ?assertEqual(#{}, tab2map(?SHARES)),
+
+ % min shares
+ update_shares(?DB1, 0),
+ ?assertEqual(1, shares(?DB1)),
+
+ % max shares
+ update_shares(?DB1, 1001),
+ ?assertEqual(1000, shares(?DB1)).
+
+
+jobs_are_added_and_removed(_) ->
+ job_added(job(?J1, ?DB1)),
+ ?assertEqual(1, num_jobs(?DB1)),
+ ?assertEqual(#{?J1 => 0}, tab2map(?PRIORITIES)),
+
+ job_added(job(?J2, ?DB1)),
+ ?assertEqual(2, num_jobs(?DB1)),
+ ?assertEqual(#{?J1 => 0, ?J2 => 0}, tab2map(?PRIORITIES)),
+
+ job_added(job(?J3, ?DB2)),
+ ?assertEqual(1, num_jobs(?DB2)),
+ ?assertEqual(#{?J1 => 0, ?J2 => 0, ?J3 => 0}, tab2map(?PRIORITIES)),
+
+ job_removed(job(?J1, ?DB1)),
+ ?assertEqual(1, num_jobs(?DB1)),
+ ?assertEqual(#{?J2 => 0, ?J3 => 0}, tab2map(?PRIORITIES)),
+
+ job_removed(job(?J3, ?DB2)),
+ ?assertEqual(0, num_jobs(?DB2)),
+ ?assertEqual(0, priority(?J3)),
+
+ job_removed(job(?J2, ?DB1)),
+ ?assertEqual(0, num_jobs(?DB2)),
+ ?assertEqual(#{}, tab2map(?NUM_JOBS)),
+ ?assertEqual(0, priority(?J2)),
+ ?assertEqual(#{}, tab2map(?PRIORITIES)).
+
+
+can_fetch_job_priority(_) ->
+ job_added(job(?J1, ?DB1)),
+ ?assertEqual(0, priority(?J1)),
+
+ ets:insert(?PRIORITIES, {?J1, 42}),
+ ?assertEqual(42, priority(?J1)),
+
+ ets:delete(?PRIORITIES, ?J1),
+ ?assertEqual(0, priority(?J1)).
+
+
+jobs_are_charged(_) ->
+ Job1 = running_job(?J1, ?DB1),
+ job_added(Job1),
+ ?assertEqual(#{}, tab2map(?CHARGES)),
+
+ charge(Job1, 1000, {0, 1, 0}),
+ ?assertEqual(#{?DB1 => 1000000}, tab2map(?CHARGES)),
+
+ % Stopped jobs are not charged
+ charge(stop(Job1), 1000, {0, 1, 0}),
+ ?assertEqual(#{?DB1 => 1000000}, tab2map(?CHARGES)),
+
+ % Only charge up to one interval's worth even if job ran longer
+ charge(Job1, 1000, {0, 5, 0}),
+ ?assertEqual(#{?DB1 => 2000000}, tab2map(?CHARGES)),
+
+ % Charges are accumulated from jobs in same db
+ Job2 = running_job(?J2, ?DB1),
+ job_added(Job2),
+ charge(Job2, 1000, {0, 0, 1}),
+ ?assertEqual(#{?DB1 => 2000001}, tab2map(?CHARGES)),
+
+ % Charges are not cleared if jobs are removed
+ job_removed(Job1),
+ job_removed(Job2),
+ ?assertEqual(#{?DB1 => 2000001}, tab2map(?CHARGES)).
+
+
+usage_is_updated(_) ->
+ Job = running_job(?J1, ?DB1),
+ job_added(Job),
+
+ charge(Job, 60000, {0, 60, 0}),
+ update_usage(),
+ ?assertEqual(60000000, usage(?DB1)),
+
+ % Charges table is cleared after usage is updated
+ ?assertEqual(#{}, tab2map(?CHARGES)),
+
+ % Check that usage decay works
+ config_set("usage_coeff", "0.2"),
+ update_usage(),
+ ?assertEqual(12000000, usage(?DB1)),
+
+ config_set("usage_coeff", "0.5"),
+ update_usage(),
+ ?assertEqual(6000000, usage(?DB1)),
+
+ % Check that function both decays and updates from charges
+ charge(Job, 60000, {0, 60, 0}),
+ update_usage(),
+ ?assertEqual(63000000, usage(?DB1)),
+
+ % Usage eventually decays to 0 and is removed from the table
+ [update_usage() || _ <- lists:seq(1, 100)],
+ ?assertEqual(0, usage(?DB1)),
+ ?assertEqual(#{}, tab2map(?USAGE)).
+
+
+priority_coefficient_works(_) ->
+ job_added(job(?J1, ?DB1)),
+ ets:insert(?PRIORITIES, {?J1, 1000}),
+
+ config_set("priority_coeff", "0.8"),
+ decay_priorities(),
+ ?assertEqual(800, priority(?J1)),
+
+ config_set("priority_coeff", "0.5"),
+ decay_priorities(),
+ ?assertEqual(400, priority(?J1)),
+
+ % If non-float junk value is set then the default is used
+ config_set("priority_coeff", "junk"),
+ decay_priorities(),
+ ?assertEqual(392, priority(?J1)),
+
+ % Clipped to 1.0 max
+ config_set("priority_coeff", "1.1"),
+ decay_priorities(),
+ ?assertEqual(392, priority(?J1)),
+
+ % Clipped to 0.0 min and removed when =< 0
+ config_set("priority_coeff", "-0.1"),
+ decay_priorities(),
+ ?assertEqual(0, priority(?J1)),
+ ?assertEqual(#{}, tab2map(?PRIORITIES)).
+
+
+priority_decays_when_jobs_stop_running(_) ->
+ Job = running_job(?J1, ?DB1),
+ job_added(Job),
+
+ % Ran for one cycle then stop
+ {[], Pending} = reschedule(1, {[Job], []}),
+
+ % Priority is non-0 initially
+ ?assert(priority(?J1) > 0),
+
+ % Priority decays to 0 after some cycles
+ [reschedule(0, {[], Pending}) || _ <- lists:seq(1, 500)],
+ ?assertEqual(0, priority(?J1)).
+
+
+priority_increases_when_jobs_run(_) ->
+ Job = running_job(?J1, ?DB1),
+ job_added(Job),
+
+ Running = [Job],
+ reschedule(0, {Running, []}),
+ P1 = priority(?J1),
+ ?assert(P1 > 0),
+
+ % Priority increases
+ reschedule(0, {Running, []}),
+ P2 = priority(?J1),
+ ?assert(P2 > P1),
+
+ % Additive priority increase is balanced out by priority decay
+ [reschedule(0, {Running, []}) || _ <- lists:seq(1, 500)],
+ Pn = priority(?J1),
+ ?assert(Pn > P2),
+
+ reschedule(0, {Running, []}),
+ Pm = priority(?J1),
+ ?assertEqual(Pn, Pm).
+
+
+two_dbs_equal_shares_equal_number_of_jobs(_) ->
+ update_shares(?DB1, 100),
+ update_shares(?DB2, 100),
+ Jobs = jobs(#{?DB1 => {25, 75}, ?DB2 => {25, 75}}),
+ #{?DB1 := Db1, ?DB2 := Db2} = run_scheduler(1000, 10, Jobs),
+ ?assert(49 =< Db1 andalso Db1 =< 51),
+ ?assert(49 =< Db2 andalso Db2 =< 51).
+
+
+two_dbs_unequal_shares_equal_number_of_jobs(_) ->
+ update_shares(?DB1, 900),
+ update_shares(?DB2, 100),
+ Jobs = jobs(#{?DB1 => {25, 75}, ?DB2 => {25, 75}}),
+ #{?DB1 := Db1, ?DB2 := Db2} = run_scheduler(1000, 10, Jobs),
+ ?assert(89 =< Db1 andalso Db1 =< 91),
+ ?assert(9 =< Db2 andalso Db2 =< 11).
+
+
+two_dbs_equal_shares_unequal_number_of_jobs(_) ->
+ update_shares(?DB1, 100),
+ update_shares(?DB2, 100),
+ Jobs = jobs(#{?DB1 => {25, 25}, ?DB2 => {25, 125}}),
+ #{?DB1 := Db1, ?DB2 := Db2} = run_scheduler(1000, 10, Jobs),
+ ?assert(49 =< Db1 andalso Db1 =< 51),
+ ?assert(49 =< Db2 andalso Db2 =< 51).
+
+
+two_dbs_unequal_shares_unequal_number_of_jobs(_) ->
+ update_shares(?DB1, 1),
+ update_shares(?DB2, 100),
+ Jobs = jobs(#{?DB1 => {25, 25}, ?DB2 => {25, 125}}),
+ #{?DB1 := Db1, ?DB2 := Db2} = run_scheduler(1000, 10, Jobs),
+ ?assert(0 =< Db1 andalso Db1 =< 2),
+ ?assert(98 =< Db2 andalso Db2 =< 100).
+
+
+three_dbs_equal_shares_equal_number_of_jobs(_) ->
+ update_shares(?DB1, 100),
+ update_shares(?DB2, 100),
+ update_shares(?DB3, 100),
+ Jobs = jobs(#{?DB1 => {25, 75}, ?DB2 => {25, 75}, ?DB3 => {25, 75}}),
+ #{?DB1 := Db1, ?DB2 := Db2, ?DB3 := Db3} = run_scheduler(1000, 10, Jobs),
+ ?assert(32 =< Db1 andalso Db1 =< 34),
+ ?assert(32 =< Db2 andalso Db2 =< 34),
+ ?assert(32 =< Db3 andalso Db3 =< 34).
+
+
+three_dbs_unequal_shares_equal_number_of_jobs(_) ->
+ update_shares(?DB1, 100),
+ update_shares(?DB2, 700),
+ update_shares(?DB3, 200),
+ Jobs = jobs(#{?DB1 => {25, 75}, ?DB2 => {25, 75}, ?DB3 => {25, 75}}),
+ #{?DB1 := Db1, ?DB2 := Db2, ?DB3 := Db3} = run_scheduler(1000, 10, Jobs),
+ ?assert(9 =< Db1 andalso Db1 =< 11),
+ ?assert(69 =< Db2 andalso Db2 =< 71),
+ ?assert(19 =< Db3 andalso Db3 =< 21).
+
+
+three_dbs_equal_shares_unequal_number_of_jobs(_) ->
+ update_shares(?DB1, 100),
+ update_shares(?DB2, 100),
+ update_shares(?DB3, 100),
+ Jobs = jobs(#{?DB1 => {25, 25}, ?DB2 => {25, 100}, ?DB3 => {25, 75}}),
+ #{?DB1 := Db1, ?DB2 := Db2, ?DB3 := Db3} = run_scheduler(1000, 10, Jobs),
+ ?assert(32 =< Db1 andalso Db1 =< 34),
+ ?assert(32 =< Db2 andalso Db2 =< 34),
+ ?assert(32 =< Db3 andalso Db3 =< 34).
+
+
+three_dbs_unequal_shares_unequal_number_of_jobs(_) ->
+ update_shares(?DB1, 1000),
+ update_shares(?DB2, 100),
+ update_shares(?DB3, 1),
+ Jobs = jobs(#{?DB1 => {25, 100}, ?DB2 => {25, 125}, ?DB3 => {25, 875}}),
+ #{?DB1 := Db1, ?DB2 := Db2, ?DB3 := Db3} = run_scheduler(1000, 10, Jobs),
+ ?assert(87 =< Db1 andalso Db1 =< 89),
+ ?assert(9 =< Db2 andalso Db2 =< 11),
+ ?assert(2 =< Db3 andalso Db3 =< 4).
+
+
+config_set(K, V) ->
+ config:set("replicator", K, V, _Persist = false).
+
+
+config_delete(K) ->
+ config:delete("replicator", K, _Persist = false).
+
+
+config_share_set(K, V) ->
+ config:set("replicator.shares", K, V, _Persist = false).
+
+
+config_shares_delete() ->
+ [config:delete("replicator.shares", K, _Persist = false) ||
+ {K, _} <- config:get("replicator.shares")].
+
+
+tab2map(T) when is_atom(T) ->
+ maps:from_list(ets:tab2list(T)).
+
+
+job(rand, Db) ->
+ job(rand:uniform(1 bsl 59), Db);
+
+job(Id, Db) ->
+ Job = #job{
+ id = Id,
+ rep = #rep{
+ db_name = Db,
+ user_ctx = #user_ctx{}
+ }
+ },
+ stop(Job).
+
+
+running_job(Id, Db) ->
+ run(job(Id, Db)).
+
+
+run(#job{} = Job) ->
+ Job#job{
+ pid = list_to_pid("<0.9999.999>"),
+ history = [{started, {0, 0, 0}}, {added, {0, 0, 0}}]
+ }.
+
+
+stop(#job{} = Job) ->
+ Job#job{
+ pid = undefined,
+ history = [{added, {0, 0, 0}}]
+ }.
+
+
+% Simple scheduler simulator. Start and stop N jobs and do the
+% accounting steps. Return a new list of running and pending jobs. If
+% N is 0, then jobs which were running stay running and jobs which were
+% pending stay pending.
+%
+reschedule(N, {Running, Pending}) ->
+ update(Running, 60000, {0, 60, 0}),
+
+ RunPr = [{priority(Job#job.id), Job} || Job <- Running],
+ StopPr = [{priority(Job#job.id), Job} || Job <- Pending],
+
+ {_, Running1} = lists:unzip(lists:reverse(lists:sort(RunPr))),
+ {_, Pending1} = lists:unzip(lists:sort(StopPr)),
+
+ ToStop = lists:sublist(Running1, N),
+ ToStart = lists:sublist(Pending1, N),
+
+ Running2 = [run(Job) || Job <- ToStart] ++ Running1 -- ToStop,
+ Pending2 = [stop(Job) || Job <- ToStop] ++ Pending1 -- ToStart,
+
+ {Running2, Pending2}.
+
+
+% Run a few scheduling cycles and calculate usage percentage for each db
+%
+run_scheduler(Cycles, Churn, Jobs0) ->
+ Acc0 = {#{}, Jobs0},
+
+ {Sum, _} = lists:foldl(fun(_CycleCnt, {UsageAcc, {Running, _} = Jobs}) ->
+ UsageAcc1 = lists:foldl(fun(#job{} = Job, Acc) ->
+ Db = Job#job.rep#rep.db_name,
+ maps:update_with(Db, fun(V) -> V + 1 end, 0, Acc)
+ end, UsageAcc, Running),
+ {UsageAcc1, reschedule(Churn, Jobs)}
+ end, Acc0, lists:seq(1, Cycles)),
+
+ Total = maps:fold(fun(_, V, Acc) -> Acc + V end, 0, Sum),
+ maps:map(fun(_Db, V) -> round(V / Total * 100) end, Sum).
+
+
+% Dbs = #{Db => {RunningCount, PendingCount}}
+%
+jobs(#{} = Dbs) ->
+ maps:fold(fun(Db, {RCnt, PCnt}, {Running, Pending}) ->
+ RJobs = [running_job(rand, Db) || _ <- lists:seq(1, RCnt)],
+ PJobs = [job(rand, Db) || _ <- lists:seq(1, PCnt)],
+ [job_added(Job) || Job <- RJobs ++ PJobs],
+ {Running ++ RJobs, Pending ++ PJobs}
+ end, {[], []}, Dbs).
+
+
+-endif.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_test.hrl b/src/couch_replicator/test/eunit/couch_replicator_test.hrl
new file mode 100644
index 000000000..6db97ec2b
--- /dev/null
+++ b/src/couch_replicator/test/eunit/couch_replicator_test.hrl
@@ -0,0 +1,35 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+% Borrowed from fabric2_test.hrl
+
+% Some test modules do not use with, so squash the unused fun compiler warning
+-compile([{nowarn_unused_function, [{with, 1}]}]).
+
+
+-define(TDEF(Name), {atom_to_list(Name), fun Name/1}).
+-define(TDEF(Name, Timeout), {atom_to_list(Name), Timeout, fun Name/1}).
+
+-define(TDEF_FE(Name), fun(Arg) -> {atom_to_list(Name), ?_test(Name(Arg))} end).
+-define(TDEF_FE(Name, Timeout), fun(Arg) -> {atom_to_list(Name), {timeout, Timeout, ?_test(Name(Arg))}} end).
+
+
+with(Tests) ->
+ fun(ArgsTuple) ->
+ lists:map(fun
+ ({Name, Fun}) ->
+ {Name, ?_test(Fun(ArgsTuple))};
+ ({Name, Timeout, Fun}) ->
+ {Name, {timeout, Timeout, ?_test(Fun(ArgsTuple))}}
+ end, Tests)
+ end.