author | Nick Vatamaniuc <vatamane@apache.org> | 2021-02-15 22:35:32 -0500
---|---|---
committer | Nick Vatamaniuc <vatamane@apache.org> | 2021-03-01 14:12:19 -0500
commit | 886222dd998fa3de42887bd2e990b869ecc97632 | (patch)
tree | 0482c9d6a4b99c85a316e69104f3eafcf00706d3 |
parent | dae6e130d0abfd83f4aa2245ef7f99b487047e0b | (diff)
download | couchdb-886222dd998fa3de42887bd2e990b869ecc97632.tar.gz |
Fair Share Replication Scheduler Implementation
The fair share replication scheduler allows configuring job priorities per replicator db.

Previously, jobs from all replication dbs were added to the scheduler and run in round-robin order. This update makes it possible to specify the relative priority of jobs from different databases. For example, there could be low-, high-, and default-priority _replicator dbs.
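For illustration, a hypothetical `[replicator.shares]` configuration for such a setup might look like the snippet below. The section name, the default of 100, and the 1-1000 bounds come from the `default.ini` changes in this commit; the extra db names are made up:

```ini
[replicator.shares]
; _replicator keeps the default weight
;_replicator = 100
; jobs from this db get a proportionally higher chance to run
high/_replicator = 900
; jobs from this db get a proportionally lower chance to run
low/_replicator = 10
```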
The original algorithm comes from the [A Fair Share
Scheduler](https://proteusmaster.urcf.drexel.edu/urcfwiki/images/KayLauderFairShare.pdf
"Fair Share Scheduler") paper by Judy Kay and Piers Lauder. A summary of how
the algorithm works is included in the top level comment in the
couch_replicator_share module.
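As a rough illustration of the two core calculations, here is a minimal sketch assuming plain arguments instead of the ETS tables the real couch_replicator_share module uses; the module and function names here are made up:

```erlang
-module(fair_share_sketch).
-export([priority_increment/3, decay/2]).

%% Priority = (Usage * NumJobs) / Shares^2: more accumulated usage or more
%% sibling jobs from the same db push a job further back in the run queue,
%% while more configured shares pull it forward.
priority_increment(Usage, NumJobs, Shares) ->
    trunc((Usage * NumJobs) / (Shares * Shares)).

%% Every cycle the stored priorities and usage are multiplied by a
%% coefficient in [0.0, 1.0] (priority_coeff / usage_coeff), so idle jobs
%% slowly drift back to the front of the queue and old usage is forgotten.
decay(Value, Coeff) when Coeff >= 0.0, Coeff =< 1.0 ->
    trunc(Value * Coeff).
```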
There is minimal modification to the main scheduler logic. Besides the share accounting performed each cycle, the other changes are:
* Running and stopping candidates are now picked based on their priority first, and then on their last_started timestamp (see the sketch after this list).
* When jobs finish executing mid-cycle, their charges are still accounted for. That holds for jobs which terminate normally, are removed by the user, or crash.
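The ordering in the first point can be sketched as follows. This is a simplified illustration assuming jobs are plain `{Id, Priority, LastStarted}` tuples; the actual scheduler builds the equivalent keys from the `#job{}` record and `couch_replicator_share:priority/1`:

```erlang
-module(scheduler_order_sketch).
-export([start_order/1, stop_order/1]).

%% Pending jobs: lowest priority value wins, oldest last_started breaks ties.
start_order(Jobs) ->
    Keyed = [{{Priority, LastStarted}, Id} || {Id, Priority, LastStarted} <- Jobs],
    [Id || {_Key, Id} <- lists:sort(Keyed)].

%% Running jobs to stop: highest priority value first (hence the negation);
%% among equals, the one that has been running the longest goes first.
stop_order(Jobs) ->
    Keyed = [{{-Priority, LastStarted}, Id} || {Id, Priority, LastStarted} <- Jobs],
    [Id || {_Key, Id} <- lists:sort(Keyed)].
```

For example, `start_order([{a, 0, 5}, {b, 0, 3}, {c, 7, 1}])` yields `[b, a, c]`, while `stop_order/1` on the same list yields `[c, b, a]`, so the job that is furthest over its share is the first candidate to stop.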
Other interesting aspects are the interaction with the error back-off mechanism
and how one-shot replications are treated:
* The exponential error back-off mechanism is unaltered and takes precedence
over the priority values. That means unhealthy jobs are rejected and
"penalized" before the priority value is even looked at.
* One-shot replications, once started, are not stopped during each scheduling
cycle unless the operator manually adjusts the `max_jobs` parameter. That
behavior is necessary to preserve the "snapshot" semantics and is retained in
this update.
-rw-r--r-- | rel/overlay/etc/default.ini | 26
-rw-r--r-- | src/couch_replicator/src/couch_replicator_scheduler.erl | 214
-rw-r--r-- | src/couch_replicator/src/couch_replicator_share.erl | 808
-rw-r--r-- | src/couch_replicator/test/eunit/couch_replicator_test.hrl | 35
4 files changed, 1006 insertions, 77 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index 973b1a1fe..6c992d7bd 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -482,6 +482,32 @@ ssl_certificate_max_depth = 3 ; or 403 response this setting is not needed. ;session_refresh_interval_sec = 550 +; Usage coefficient decays historic fair share usage every scheduling +; cycle. The value must be between 0.0 and 1.0. Lower values will +; ensure historic usage decays quicker and higher values means it will +; be remembered longer. +;usage_coeff = 0.5 + +; Priority coefficient decays all the job priorities such that they slowly +; drift towards the front of the run queue. This coefficient defines a maximum +; time window over which this algorithm would operate. For example, if this +; value is too small (0.1), after a few cycles quite a few jobs would end up at +; priority 0, and would render this algorithm useless. The default value of +; 0.98 is picked such that if a job ran for one scheduler cycle, then didn't +; get to run for 7 hours, it would still have priority > 0. 7 hours was picked +; as it was close enought to 8 hours which is the default maximum error backoff +; interval. +;priority_coeff = 0.98 + + +[replicator.shares] +; Fair share configuration section. More shares result in a higher +; chance that jobs from that db get to run. The default value is 100, +; minimum is 1 and maximum is 1000. The configuration may be set even +; if the database does not exit. +;_replicator = 100 + + [log] ; Possible log levels: ; debug diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl index 641443a7c..223cbb834 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler.erl @@ -55,15 +55,10 @@ stats_updater_loop/1 ]). --include("couch_replicator_scheduler.hrl"). -include("couch_replicator.hrl"). -include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -include_lib("couch/include/couch_db.hrl"). -%% types --type event_type() :: added | started | stopped | {crashed, any()}. --type event() :: {Type:: event_type(), When :: erlang:timestamp()}. --type history() :: nonempty_list(event()). %% definitions -define(MAX_BACKOFF_EXPONENT, 10). @@ -78,13 +73,13 @@ -define(DEFAULT_SCHEDULER_INTERVAL, 60000). --record(state, {interval, timer, max_jobs, max_churn, max_history, stats_pid}). --record(job, { - id :: job_id() | '$1' | '_', - rep :: #rep{} | '_', - pid :: undefined | pid() | '$1' | '_', - monitor :: undefined | reference() | '_', - history :: history() | '_' +-record(state, { + interval = ?DEFAULT_SCHEDULER_INTERVAL, + timer, + max_jobs, + max_churn, + max_history, + stats_pid }). 
-record(stats_acc, { @@ -229,6 +224,7 @@ init(_) -> EtsOpts = [named_table, {keypos, #job.id}, {read_concurrency, true}, {write_concurrency, true}], ?MODULE = ets:new(?MODULE, EtsOpts), + ok = couch_replicator_share:init(), ok = config:listen_for_changes(?MODULE, nil), Interval = config:get_integer("replicator", "interval", ?DEFAULT_SCHEDULER_INTERVAL), @@ -290,6 +286,17 @@ handle_cast({set_interval, Interval}, State) when is_integer(Interval), couch_log:notice("~p: interval set to ~B", [?MODULE, Interval]), {noreply, State#state{interval = Interval}}; +handle_cast({update_shares, Key, Shares}, State) when is_binary(Key), + is_integer(Shares), Shares >= 0 -> + couch_log:notice("~p: shares for ~s set to ~B", [?MODULE, Key, Shares]), + couch_replicator_share:update_shares(Key, Shares), + {noreply, State}; + +handle_cast({reset_shares, Key}, State) when is_binary(Key) -> + couch_log:notice("~p: shares for ~s reset to default", [?MODULE, Key]), + couch_replicator_share:reset_shares(Key), + {noreply, State}; + handle_cast({update_job_stats, JobId, Stats}, State) -> case rep_state(JobId) of nil -> @@ -314,6 +321,8 @@ handle_info(reschedule, State) -> handle_info({'DOWN', _Ref, process, Pid, normal}, State) -> {ok, Job} = job_by_pid(Pid), couch_log:notice("~p: Job ~p completed normally", [?MODULE, Job#job.id]), + Interval = State#state.interval, + couch_replicator_share:charge(Job, Interval, os:timestamp()), remove_job_int(Job), update_running_jobs_stats(State#state.stats_pid), {noreply, State}; @@ -324,6 +333,8 @@ handle_info({'DOWN', _Ref, process, Pid, Reason0}, State) -> {shutdown, ShutdownReason} -> ShutdownReason; Other -> Other end, + Interval = State#state.interval, + couch_replicator_share:charge(Job, Interval, os:timestamp()), ok = handle_crashed_job(Job, Reason, State), {noreply, State}; @@ -340,6 +351,7 @@ code_change(_OldVsn, State, _Extra) -> terminate(_Reason, _State) -> + couch_replicator_share:clear(), ok. @@ -369,6 +381,15 @@ handle_config_change("replicator", "max_history", V, _, S) -> ok = gen_server:cast(?MODULE, {set_max_history, list_to_integer(V)}), {ok, S}; +handle_config_change("replicator.shares", Key, deleted, _, S) -> + ok = gen_server:cast(?MODULE, {reset_shares, list_to_binary(Key)}), + {ok, S}; + +handle_config_change("replicator.shares", Key, V, _, S) -> + ok = gen_server:cast(?MODULE, {update_shares, list_to_binary(Key), + list_to_integer(V)}), + {ok, S}; + handle_config_change(_, _, _, _, S) -> {ok, S}. @@ -449,19 +470,19 @@ pending_jobs(0) -> []; pending_jobs(Count) when is_integer(Count), Count > 0 -> - Set0 = gb_sets:new(), % [{LastStart, Job},...] + Set0 = gb_sets:new(), % [{{Priority, LastStart}, Job},...] Now = os:timestamp(), Acc0 = {Set0, Now, Count, health_threshold()}, {Set1, _, _, _} = ets:foldl(fun pending_fold/2, Acc0, ?MODULE), - [Job || {_Started, Job} <- gb_sets:to_list(Set1)]. + [Job || {_PriorityKey, Job} <- gb_sets:to_list(Set1)]. 
pending_fold(#job{pid = Pid}, Acc) when is_pid(Pid) -> Acc; pending_fold(Job, {Set, Now, Count, HealthThreshold}) -> - Set1 = case {not_recently_crashed(Job, Now, HealthThreshold), - gb_sets:size(Set) >= Count} of + Healthy = not_recently_crashed(Job, Now, HealthThreshold), + Set1 = case {Healthy, gb_sets:size(Set) >= Count} of {true, true} -> % Job is healthy but already reached accumulated limit, so might % have to replace one of the accumulated jobs @@ -469,7 +490,7 @@ pending_fold(Job, {Set, Now, Count, HealthThreshold}) -> {true, false} -> % Job is healthy and we haven't reached the limit, so add job % to accumulator - gb_sets:add_element({last_started(Job), Job}, Set); + gb_sets:add_element({start_priority_key(Job), Job}, Set); {false, _} -> % This job is not healthy (has crashed too recently), so skip it. Set @@ -477,24 +498,34 @@ pending_fold(Job, {Set, Now, Count, HealthThreshold}) -> {Set1, Now, Count, HealthThreshold}. -% Replace Job in the accumulator if it is older than youngest job there. -% "oldest" here means one which has been waiting to run the longest. "youngest" -% means the one with most recent activity. The goal is to keep up to Count -% oldest jobs during iteration. For example if there are jobs with these times -% accumulated so far [5, 7, 11], and start time of current job is 6. Then -% 6 < 11 is true, so 11 (youngest) is dropped and 6 inserted resulting in -% [5, 6, 7]. In the end the result might look like [1, 2, 5], for example. +% Replace Job in the accumulator if it has a higher priority (lower priority +% value) than the lowest priority there. Job priority is indexed by +% {FairSharePiority, LastStarted} tuples. If the FairSharePriority is the same +% then last started timestamp is used to pick. The goal is to keep up to Count +% oldest jobs during the iteration. For example, if there are jobs with these +% priorities accumulated so far [5, 7, 11], and the priority of current job is +% 6. Then 6 < 11 is true, so 11 (lower priority) is dropped and 6 is inserted +% resulting in [5, 6, 7]. In the end the result might look like [1, 2, 5], for +% example. +% pending_maybe_replace(Job, Set) -> - Started = last_started(Job), - {Youngest, YoungestJob} = gb_sets:largest(Set), - case Started < Youngest of + Key = start_priority_key(Job), + {LowestPKey, LowestPJob} = gb_sets:largest(Set), + case Key < LowestPKey of true -> - Set1 = gb_sets:delete({Youngest, YoungestJob}, Set), - gb_sets:add_element({Started, Job}, Set1); + Set1 = gb_sets:delete({LowestPKey, LowestPJob}, Set), + gb_sets:add_element({Key, Job}, Set1); false -> Set end. +% Starting priority key is used to order pending jobs such that the ones with a +% lower priority value and start time would sort first, so they would be the +% first to run. +% +start_priority_key(#job{} = Job) -> + {couch_replicator_share:priority(Job#job.id), last_started(Job)}. + start_jobs(Count, State) -> [start_job_int(Job, State) || Job <- pending_jobs(Count)], @@ -509,13 +540,18 @@ stop_jobs(Count, IsContinuous, State) when is_integer(Count) -> Running0 = running_jobs(), ContinuousPred = fun(Job) -> is_continuous(Job) =:= IsContinuous end, Running1 = lists:filter(ContinuousPred, Running0), - Running2 = lists:sort(fun longest_running/2, Running1), - Running3 = lists:sublist(Running2, Count), - length([stop_job_int(Job, State) || Job <- Running3]). 
+ Running2 = [{stop_priority_key(Job), Job} || Job <- Running1], + Running3 = lists:sublist(lists:sort(Running2), Count), + length([stop_job_int(Job, State) || {_SortKey, Job} <- Running3]). -longest_running(#job{} = A, #job{} = B) -> - last_started(A) =< last_started(B). +% Lower priority jobs have higher priority values, so we negate them, that way +% when sorted, they'll come up first. If priorities are equal, jobs are sorted +% by the lowest starting times as jobs with lowest start time have been running +% the longest. +% +stop_priority_key(#job{} = Job) -> + {-couch_replicator_share:priority(Job#job.id), last_started(Job)}. not_recently_crashed(#job{history = History}, Now, HealthThreshold) -> @@ -593,6 +629,7 @@ backoff_micros(CrashCount) -> -spec add_job_int(#job{}) -> boolean(). add_job_int(#job{} = Job) -> + couch_replicator_share:job_added(Job), ets:insert_new(?MODULE, Job). @@ -600,6 +637,9 @@ add_job_int(#job{} = Job) -> maybe_remove_job_int(JobId, State) -> case job_by_id(JobId) of {ok, Job} -> + Now = os:timestamp(), + Interval = State#state.interval, + couch_replicator_share:charge(Job, Interval, Now), ok = stop_job_int(Job, State), true = remove_job_int(Job), couch_stats:increment_counter([couch_replicator, jobs, removes]), @@ -657,6 +697,7 @@ stop_job_int(#job{} = Job, State) -> -spec remove_job_int(#job{}) -> true. remove_job_int(#job{} = Job) -> + couch_replicator_share:job_removed(Job), ets:delete(?MODULE, Job#job.id). @@ -733,7 +774,8 @@ reset_job_process(#job{} = Job) -> -spec reschedule(#state{}) -> ok. -reschedule(State) -> +reschedule(#state{interval = Interval} = State) -> + couch_replicator_share:update(running_jobs(), Interval, os:timestamp()), StopCount = stop_excess_jobs(State, running_job_count()), rotate_jobs(State, StopCount), update_running_jobs_stats(State#state.stats_pid). 
@@ -1035,7 +1077,8 @@ longest_running_test() -> J0 = testjob([crashed()]), J1 = testjob([started(1)]), J2 = testjob([started(2)]), - Sort = fun(Jobs) -> lists:sort(fun longest_running/2, Jobs) end, + SortFun = fun(A, B) -> last_started(A) =< last_started(B) end, + Sort = fun(Jobs) -> lists:sort(SortFun, Jobs) end, ?assertEqual([], Sort([])), ?assertEqual([J1], Sort([J1])), ?assertEqual([J1, J2], Sort([J2, J1])), @@ -1381,11 +1424,12 @@ t_if_excess_is_trimmed_rotation_still_happens() -> t_if_transient_job_crashes_it_gets_removed() -> ?_test(begin Pid = mock_pid(), + Rep = continuous_rep(), Job = #job{ id = job1, pid = Pid, history = [added()], - rep = #rep{db_name = null, options = [{continuous, true}]} + rep = Rep#rep{db_name = null} }, setup_jobs([Job]), ?assertEqual(1, ets:info(?MODULE, size)), @@ -1399,15 +1443,20 @@ t_if_transient_job_crashes_it_gets_removed() -> t_if_permanent_job_crashes_it_stays_in_ets() -> ?_test(begin Pid = mock_pid(), + Rep = continuous_rep(), Job = #job{ id = job1, pid = Pid, history = [added()], - rep = #rep{db_name = <<"db1">>, options = [{continuous, true}]} + rep = Rep#rep{db_name = <<"db1">>} }, setup_jobs([Job]), ?assertEqual(1, ets:info(?MODULE, size)), - State = #state{max_jobs =1, max_history = 3, stats_pid = self()}, + State = #state{ + max_jobs = 1, + max_history = 3, + stats_pid = self() + }, {noreply, State} = handle_info({'DOWN', r1, process, Pid, failed}, State), ?assertEqual(1, ets:info(?MODULE, size)), @@ -1419,21 +1468,11 @@ t_if_permanent_job_crashes_it_stays_in_ets() -> t_existing_jobs() -> ?_test(begin - Rep = #rep{ - id = job1, - db_name = <<"db">>, - source = <<"s">>, - target = <<"t">>, - options = [{continuous, true}] - }, + Rep0 = continuous_rep(<<"s">>, <<"t">>), + Rep = Rep0#rep{id = job1, db_name = <<"db">>}, setup_jobs([#job{id = Rep#rep.id, rep = Rep}]), - NewRep = #rep{ - id = Rep#rep.id, - db_name = <<"db">>, - source = <<"s">>, - target = <<"t">>, - options = [{continuous, true}] - }, + NewRep0 = continuous_rep(<<"s">>, <<"t">>), + NewRep = NewRep0#rep{id = Rep#rep.id, db_name = <<"db">>}, ?assert(existing_replication(NewRep)), ?assertNot(existing_replication(NewRep#rep{source = <<"s1">>})), ?assertNot(existing_replication(NewRep#rep{target = <<"t1">>})), @@ -1443,15 +1482,12 @@ t_existing_jobs() -> t_job_summary_running() -> ?_test(begin + Rep = rep(<<"s">>, <<"t">>), Job = #job{ id = job1, pid = mock_pid(), history = [added()], - rep = #rep{ - db_name = <<"db1">>, - source = <<"s">>, - target = <<"t">> - } + rep = Rep#rep{db_name = <<"db1">>} }, setup_jobs([Job]), Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), @@ -1472,7 +1508,7 @@ t_job_summary_pending() -> id = job1, pid = undefined, history = [stopped(20), started(10), added()], - rep = #rep{source = <<"s">>, target = <<"t">>} + rep = rep(<<"s">>, <<"t">>) }, setup_jobs([Job]), Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), @@ -1492,7 +1528,7 @@ t_job_summary_crashing_once() -> Job = #job{ id = job1, history = [crashed(?DEFAULT_HEALTH_THRESHOLD_SEC + 1), started(0)], - rep = #rep{source = <<"s">>, target = <<"t">>} + rep = rep(<<"s">>, <<"t">>) }, setup_jobs([Job]), Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), @@ -1508,7 +1544,7 @@ t_job_summary_crashing_many_times() -> Job = #job{ id = job1, history = [crashed(4), started(3), crashed(2), started(1)], - rep = #rep{source = <<"s">>, target = <<"t">>} + rep = rep(<<"s">>, <<"t">>) }, setup_jobs([Job]), Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), @@ -1521,19 +1557,18 @@ 
t_job_summary_crashing_many_times() -> t_job_summary_proxy_fields() -> ?_test(begin + Src = #httpdb{ + url = "https://s", + proxy_url = "http://u:p@sproxy:12" + }, + Tgt = #httpdb{ + url = "http://t", + proxy_url = "socks5://u:p@tproxy:34" + }, Job = #job{ id = job1, history = [started(10), added()], - rep = #rep{ - source = #httpdb{ - url = "https://s", - proxy_url = "http://u:p@sproxy:12" - }, - target = #httpdb{ - url = "http://t", - proxy_url = "socks5://u:p@tproxy:34" - } - } + rep = rep(Src, Tgt) }, setup_jobs([Job]), Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), @@ -1548,6 +1583,8 @@ t_job_summary_proxy_fields() -> setup_all() -> catch ets:delete(?MODULE), + meck:expect(config, get, 1, []), + meck:expect(config, get, 2, undefined), meck:expect(couch_log, notice, 2, ok), meck:expect(couch_log, warning, 2, ok), meck:expect(couch_log, error, 2, ok), @@ -1555,10 +1592,13 @@ setup_all() -> meck:expect(couch_stats, increment_counter, 1, ok), meck:expect(couch_stats, update_gauge, 2, ok), Pid = mock_pid(), - meck:expect(couch_replicator_scheduler_sup, start_child, 1, {ok, Pid}). + meck:expect(couch_replicator_scheduler_sup, start_child, 1, {ok, Pid}), + couch_replicator_share:init(). + teardown_all(_) -> + couch_replicator_share:clear(), catch ets:delete(?MODULE), meck:unload(). @@ -1567,7 +1607,8 @@ setup() -> meck:reset([ couch_log, couch_replicator_scheduler_sup, - couch_stats + couch_stats, + config ]). @@ -1621,13 +1662,32 @@ mock_state(MaxJobs, MaxChurn) -> }. +rep() -> + #rep{options = [], user_ctx = #user_ctx{}}. + + +rep(Src, Tgt) -> + Rep = rep(), + Rep#rep{source = Src, target = Tgt}. + + +continuous_rep() -> + #rep{options = [{continuous, true}], user_ctx = #user_ctx{}}. + + +continuous_rep(Src, Tgt) -> + Rep = continuous_rep(), + Rep#rep{source = Src, target = Tgt}. + + + continuous(Id) when is_integer(Id) -> Started = Id, Hist = [stopped(Started+1), started(Started), added()], #job{ id = Id, history = Hist, - rep = #rep{options = [{continuous, true}]} + rep = continuous_rep() }. @@ -1637,7 +1697,7 @@ continuous_running(Id) when is_integer(Id) -> #job{ id = Id, history = [started(Started), added()], - rep = #rep{options = [{continuous, true}]}, + rep = continuous_rep(), pid = Pid, monitor = monitor(process, Pid) }. @@ -1646,7 +1706,7 @@ continuous_running(Id) when is_integer(Id) -> oneshot(Id) when is_integer(Id) -> Started = Id, Hist = [stopped(Started + 1), started(Started), added()], - #job{id = Id, history = Hist, rep = #rep{options = []}}. + #job{id = Id, history = Hist, rep = rep()}. oneshot_running(Id) when is_integer(Id) -> @@ -1655,7 +1715,7 @@ oneshot_running(Id) when is_integer(Id) -> #job{ id = Id, history = [started(Started), added()], - rep = #rep{options = []}, + rep = rep(), pid = Pid, monitor = monitor(process, Pid) }. diff --git a/src/couch_replicator/src/couch_replicator_share.erl b/src/couch_replicator/src/couch_replicator_share.erl new file mode 100644 index 000000000..1f5d00cf3 --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_share.erl @@ -0,0 +1,808 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +% License for the specific language governing permissions and limitations under +% the License. + +% This module implements the "Fair Share" algorithm by Judy Kay and Piers +% Lauder [1] and applies it to the scheduling of replication jobs. +% +% The main idea is _replicator dbs can have a configurable number of "shares" +% assigned to them. Shares is an abstract quantity from 1 to 1000. The default +% is 100. Jobs from _replicator databases with more shares get proportionally a +% higher chance to run than those from databases with a lower number of shares. +% +% Every scheduler cycle running jobs are "charged" based on how much time they +% spent running during that cycle. At the end of the cycle the accumulated +% charges for each job, the number of shares configured, and the total number +% of jobs in the pending queue from the same _replicator db, are used to +% calculate new priority values for all the jobs. To match the algorithm from +% the paper, jobs with lower priority values are the ones at the front of the +% run queue and have a higher chance of running. +% +% Here is how charges, shares, and number of sibling jobs affect the +% priority value: +% +% 1) Jobs from dbs with higher configured shares get assigned lower +% priority values and so stay closer to the front of the queue. +% +% 2) Jobs from dbs with many other jobs (many siblings) get assigned a +% higher priority value, so they get pushed further down the queue +% and have a lower chance of running. +% +% 3) Jobs which run longer accumulate more charges and get assigned a +% higher priority value and get to wait longer to run. +% +% In order to prevent job starvation, all job priorities are periodicaly +% decayed (decreased). This effectively moves all the jobs towards the front of +% the run queue. So, in effect, there are two competing processes: one +% uniformly moves all jobs to the front, and the other throws them back in +% proportion to those factors mentioned above. The speed of this uniform +% priority decay is controlled by the priority_coeff parameter. +% +% In order to prevent jobs from low shares dbs from "cheating" by getting +% deleted and immediately re-added, charges are accumulated using a +% historically decayed usage value. The speed of the usage decay is controlled +% by the `usage_coeff = 0.5` parameter. +% +% [1] : https://proteusmaster.urcf.drexel.edu/urcfwiki/images/KayLauderFairShare.pdf + + +-module(couch_replicator_share). + +-export([ + init/0, + clear/0, + update_shares/2, + reset_shares/1, + job_added/1, + job_removed/1, + update/3, + priority/1, + charge/3 +]). + + +-include_lib("couch/include/couch_db.hrl"). +-include("couch_replicator.hrl"). + + +% Usage coefficient decays historic usage every scheduling cycle. For example, +% the usage value for a job running 1 minute is 60000000 (i.e microseconds / +% minute), then if the job stops running it will take about 26 cycles (minutes) +% for it to decay to 0 and the system to "forget" about it completely: +% +% trunc(60000000 * math:pow(0.5, 26)) = 0 +% +-define(DEFAULT_USAGE_COEFF, 0.5). + +% Priority coefficient decays all the job priorities such that they slowly +% drift towards the front of the run queue. This coefficient defines a maximum +% time window over which this algorithm would operate. For example, if this +% value is too small (0.1), after a few cycles quite a few jobs would end up at +% priority 0, and would render this algorithm useless. 
The default value of +% 0.98 is picked such that if a job ran for one scheduler cycle, then didn't +% get to run for 7 hours, it would still have priority > 0. 7 hours was picked +% as it was close enought to 8 hours which is the default maximum error backoff +% interval. +% +% Example calculation: +% shares = 100 +% usage after 1 minute cycle run = 60000000 +% initial priority = 60000000 / (100 * 100) = 6000 +% trunc(6000 * math:pow(0.98, 431)) = 0 +% 431 / 60 ~= 7 hrs +% +-define(DEFAULT_PRIORITY_COEFF, 0.98). + + +-define(MIN_SHARES, 1). +-define(MAX_SHARES, 1000). +-define(DEFAULT_SHARES, 100). + +-define(SHARES, couch_replicator_shares). +-define(PRIORITIES, couch_replicator_priorities). +-define(USAGE, couch_replicator_usage). +-define(CHARGES, couch_replicator_stopped_usage). +-define(NUM_JOBS, couch_replicator_num_jobs). + + +init() -> + EtsOpts = [named_table, public], + ?SHARES = ets:new(?SHARES, EtsOpts), % {Key, Shares} + ?PRIORITIES = ets:new(?PRIORITIES, EtsOpts), % {JobId, Priority} + ?USAGE = ets:new(?USAGE, EtsOpts), % {Key, Usage} + ?CHARGES = ets:new(?CHARGES, EtsOpts), % {Key, Charges} + ?NUM_JOBS = ets:new(?NUM_JOBS, EtsOpts), % {Key, NumJobs} + lists:foreach(fun({K, V}) -> update_shares(K, V) end, get_config_shares()). + + +clear() -> + Tables = [?SHARES, ?PRIORITIES, ?USAGE, ?CHARGES, ?NUM_JOBS], + lists:foreach(fun(T) -> catch ets:delete(T) end, Tables). + + +% This should be called when user updates the replicator.shares config section +% +update_shares(Key, Shares) when is_integer(Shares) -> + ets:insert(?SHARES, {Key, bounded(Shares, ?MIN_SHARES, ?MAX_SHARES)}). + + +% Called when the config value is deleted and shares are reset to the default +% value. +reset_shares(Key) -> + ets:delete(?SHARES, Key). + + +job_added(#job{} = Job) -> + Key = key(Job), + % If the entry is not present {Key, 0} is used as the default + ets:update_counter(?NUM_JOBS, Key, 1, {Key, 0}), + % Update job's priority as if it ran during one scheduler cycle. This is so + % new jobs don't get to be at priority 0 (highest). + update_priority(Job). + + +job_removed(#job{} = Job) -> + Key = key(Job), + ets:delete(?PRIORITIES, Job#job.id), + case ets:update_counter(?NUM_JOBS, Key, -1, {Key, 0}) of + N when is_integer(N), N =< 0 -> + ets:delete(?NUM_JOBS, Key); + N when is_integer(N), N > 0 -> + ok + end, + ok. + + +% This is the main algorithm update function. It should be called during each +% rescheduling cycle with a list of running jobs, the interval from the +% scheduler (in milliseconds), and the current timestamp. +% +% This function does all three main steps as described in [1]. +% +% 1. Update usage from all the charges in the last scheduling cycle +% +% 2. Uniformly decay all job priorities +% +% 3. Update priorities for all the running jobs based on usage and number of +% sibling jobs. +% +update(RunningJobs, Interval, {_, _, _} = Now) -> + lists:foreach(fun(Job) -> charge(Job, Interval, Now) end, RunningJobs), + update_usage(), + decay_priorities(), + lists:foreach(fun(Job) -> update_priority(Job) end, RunningJobs). + + +priority(JobId) -> + % Not found means it was removed because it's value was 0 + case ets:lookup(?PRIORITIES, JobId) of + [{_, Priority}] -> Priority; + [] -> 0 + end. 
+ + +charge(#job{pid = undefined}, _, _) -> + 0; + +charge(#job{} = Job, Interval, {_, _, _} = Now) when is_integer(Interval) -> + Key = key(Job), + Charges = job_charges(Job, Interval, Now), + % If the entry is not present {Key, 0} is used as the default + ets:update_counter(?CHARGES, Key, Charges, {Key, 0}). + + +usage(Key) -> + case ets:lookup(?USAGE, Key) of + [{_, Usage}] -> Usage; + [] -> 0 + end. + + +num_jobs(Key) -> + case ets:lookup(?NUM_JOBS, Key) of + [{_, NumJobs}] -> NumJobs; + [] -> 0 + end. + + +shares(Key) -> + case ets:lookup(?SHARES, Key) of + [{_, Shares}] -> Shares; + [] -> ?DEFAULT_SHARES + end. + + +% In [1] this described in the "Decay of Process Priorities" section +% +decay_priorities() -> + decay(?PRIORITIES, priority_coeff()), + % If priority becomes 0, it's removed. When looking it up, if it + % is missing we assume it is 0 + clear_zero(?PRIORITIES). + + +% This is the main part of the alrgorithm. In [1] it is described in the +% "Priority Adjustment" section. +% +update_priority(#job{} = Job) -> + Id = Job#job.id, + Key = key(Job), + Shares = shares(Key), + Priority = (usage(Key) * num_jobs(Key)) / (Shares * Shares), + % If the entry is not present {Id, 0} is used as the default + ets:update_counter(?PRIORITIES, Id, trunc(Priority), {Id, 0}). + + +% This is the "User-Level Scheduling" part from [1] +% +update_usage() -> + decay(?USAGE, usage_coeff()), + clear_zero(?USAGE), + ets:foldl(fun({Key, Charges}, _) -> + % If the entry is not present {Key, 0} is used as the default + ets:update_counter(?USAGE, Key, Charges, {Key, 0}) + end, 0, ?CHARGES), + % Start each interval with a fresh charges table + ets:delete_all_objects(?CHARGES). + + +% Private helper functions + +decay(Ets, Coeff) when is_atom(Ets) -> + % Use trunc to ensure the result stays an integer in order for + % ets:update_counter to work properly. It throws a badarg otherwise. + Head = {'$1', '$2'}, + Result = {{'$1', {trunc, {'*', '$2', {const, Coeff}}}}}, + ets:select_replace(Ets, [{Head, [], [Result]}]). + + +clear_zero(Ets) when is_atom(Ets) -> + % Coefficents bound to the [0.0, 1.0] range, and the `trunc` call in + % decay/2 ensures we can directly match on 0, as opposed needing to do =< 0 + % with a guard. + ets:select_delete(Ets, [{{'_', 0}, [], [true]}]). + + +key(#job{} = Job) -> + Rep = Job#job.rep, + case is_binary(Rep#rep.db_name) of + true -> mem3:dbname(Rep#rep.db_name); + false -> (Rep#rep.user_ctx)#user_ctx.name + end. + + +% Jobs are charged based on the amount of time the job was running during the +% last scheduling interval. The time units used are microseconds in order to +% have a large enough usage values so that when priority is calculated the +% rounded value won't be rounded off to 0 easily. The formula for the priority +% calculation is: +% +% Priority = (Usage * NumJobs) / Shares^2 +% +% Then in the worst case of a single job in the db, running only for one +% second,for one job, with 1000 (max) shares, the priority would be: +% +% 1000000 * 1 / (1000^2) = 1 +% +job_charges(#job{} = Job, IntervalMSec, {_, _, _} = Now) -> + TimeRunning = timer:now_diff(Now, last_started(Job)), + IntervalUSec = IntervalMSec * 1000, + bounded(TimeRunning, 0, IntervalUSec). + + +last_started(#job{} = Job) -> + case lists:keyfind(started, 1, Job#job.history) of + false -> {0, 0, 0}; % In case user set too low of a max history + {started, When} -> When + end. + + +bounded(Val, Min, Max) -> + max(Min, min(Max, Val)). 
+ + +% Config helper functions + +get_config_shares() -> + lists:map(fun({K, V}) -> + {list_to_binary(K), int_val(V, ?DEFAULT_SHARES)} + end, config:get("replicator.shares")). + + +priority_coeff() -> + % This is the K2 coefficient from [1] + Default = ?DEFAULT_PRIORITY_COEFF, + Val = float_val(config:get("replicator", "priority_coeff"), Default), + bounded(Val, 0.0, 1.0). + + +usage_coeff() -> + % This is the K1 coefficient from [1] + Default = ?DEFAULT_USAGE_COEFF, + Val = float_val(config:get("replicator", "usage_coeff"), Default), + bounded(Val, 0.0, 1.0). + + +int_val(Str, Default) when is_list(Str) -> + try list_to_integer(Str) of + Val -> Val + catch + error:badarg -> + Default + end. + + +float_val(undefined, Default) -> + Default; + +float_val(Str, Default) when is_list(Str) -> + try list_to_float(Str) of + Val -> Val + catch + error:badarg -> + Default + end. + + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch_replicator/test/eunit/couch_replicator_test.hrl"). + + +-define(DB1, <<"db1">>). +-define(DB2, <<"db2">>). +-define(DB3, <<"db3">>). +-define(J1, <<"j1">>). +-define(J2, <<"j2">>). +-define(J3, <<"j3">>). + + +fair_share_test_() -> + { + setup, + fun setup_all/0, + fun teardown_all/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(init_works), + ?TDEF_FE(shares_are_updated_and_reset), + ?TDEF_FE(jobs_are_added_and_removed), + ?TDEF_FE(can_fetch_job_priority), + ?TDEF_FE(jobs_are_charged), + ?TDEF_FE(usage_is_updated), + ?TDEF_FE(priority_coefficient_works), + ?TDEF_FE(priority_decays_when_jobs_stop_running), + ?TDEF_FE(priority_increases_when_jobs_run), + ?TDEF_FE(two_dbs_equal_shares_equal_number_of_jobs), + ?TDEF_FE(two_dbs_unequal_shares_equal_number_of_jobs), + ?TDEF_FE(two_dbs_equal_shares_unequal_number_of_jobs), + ?TDEF_FE(two_dbs_unequal_shares_unequal_number_of_jobs), + ?TDEF_FE(three_dbs_equal_shares_equal_number_of_jobs), + ?TDEF_FE(three_dbs_unequal_shares_equal_number_of_jobs), + ?TDEF_FE(three_dbs_equal_shares_unequal_number_of_jobs), + ?TDEF_FE(three_dbs_unequal_shares_unequal_number_of_jobs) + ] + } + }. + + +setup_all() -> + test_util:start_couch(). + + +teardown_all(Ctx) -> + config_delete("priority_coeff"), + config_delete("usage_coeff"), + config_shares_delete(), + test_util:stop_couch(Ctx). + + +setup() -> + init(), + ok. + + +teardown(_) -> + clear(), + config_delete("priority_coeff"), + config_delete("usage_coeff"), + config_shares_delete(). + + +init_works(_)-> + Tables = [?SHARES, ?PRIORITIES, ?USAGE, ?CHARGES, ?NUM_JOBS], + [?assert(is_list(ets:info(T))) || T <- Tables], + ?assertEqual(#{}, tab2map(?SHARES)), + + clear(), + [?assertEqual(undefined, ets:info(T)) || T <- Tables], + + config_share_set("db1", "200"), + init(), + ?assertEqual(200, shares(?DB1)), + ?assertEqual(#{?DB1 => 200}, tab2map(?SHARES)). + + +shares_are_updated_and_reset(_) -> + ?assertEqual(#{}, tab2map(?SHARES)), + + update_shares(?DB1, 42), + ?assertEqual(42, shares(?DB1)), + + reset_shares(?DB1), + ?assertEqual(100, shares(?DB1)), + ?assertEqual(#{}, tab2map(?SHARES)), + + % min shares + update_shares(?DB1, 0), + ?assertEqual(1, shares(?DB1)), + + % max shares + update_shares(?DB1, 1001), + ?assertEqual(1000, shares(?DB1)). 
+ + +jobs_are_added_and_removed(_) -> + job_added(job(?J1, ?DB1)), + ?assertEqual(1, num_jobs(?DB1)), + ?assertEqual(#{?J1 => 0}, tab2map(?PRIORITIES)), + + job_added(job(?J2, ?DB1)), + ?assertEqual(2, num_jobs(?DB1)), + ?assertEqual(#{?J1 => 0, ?J2 => 0}, tab2map(?PRIORITIES)), + + job_added(job(?J3, ?DB2)), + ?assertEqual(1, num_jobs(?DB2)), + ?assertEqual(#{?J1 => 0, ?J2 => 0, ?J3 => 0}, tab2map(?PRIORITIES)), + + job_removed(job(?J1, ?DB1)), + ?assertEqual(1, num_jobs(?DB1)), + ?assertEqual(#{?J2 => 0, ?J3 => 0}, tab2map(?PRIORITIES)), + + job_removed(job(?J3, ?DB2)), + ?assertEqual(0, num_jobs(?DB2)), + ?assertEqual(0, priority(?J3)), + + job_removed(job(?J2, ?DB1)), + ?assertEqual(0, num_jobs(?DB2)), + ?assertEqual(#{}, tab2map(?NUM_JOBS)), + ?assertEqual(0, priority(?J2)), + ?assertEqual(#{}, tab2map(?PRIORITIES)). + + +can_fetch_job_priority(_) -> + job_added(job(?J1, ?DB1)), + ?assertEqual(0, priority(?J1)), + + ets:insert(?PRIORITIES, {?J1, 42}), + ?assertEqual(42, priority(?J1)), + + ets:delete(?PRIORITIES, ?J1), + ?assertEqual(0, priority(?J1)). + + +jobs_are_charged(_) -> + Job1 = running_job(?J1, ?DB1), + job_added(Job1), + ?assertEqual(#{}, tab2map(?CHARGES)), + + charge(Job1, 1000, {0, 1, 0}), + ?assertEqual(#{?DB1 => 1000000}, tab2map(?CHARGES)), + + % Stopped jobs are not charged + charge(stop(Job1), 1000, {0, 1, 0}), + ?assertEqual(#{?DB1 => 1000000}, tab2map(?CHARGES)), + + % Only charge up to one interval's worth even if job ran longer + charge(Job1, 1000, {0, 5, 0}), + ?assertEqual(#{?DB1 => 2000000}, tab2map(?CHARGES)), + + % Charges are accumulated from jobs in same db + Job2 = running_job(?J2, ?DB1), + job_added(Job2), + charge(Job2, 1000, {0, 0, 1}), + ?assertEqual(#{?DB1 => 2000001}, tab2map(?CHARGES)), + + % Charges are not cleared if jobs are removed + job_removed(Job1), + job_removed(Job2), + ?assertEqual(#{?DB1 => 2000001}, tab2map(?CHARGES)). + + +usage_is_updated(_) -> + Job = running_job(?J1, ?DB1), + job_added(Job), + + charge(Job, 60000, {0, 60, 0}), + update_usage(), + ?assertEqual(60000000, usage(?DB1)), + + % Charges table is cleared after usage is updated + ?assertEqual(#{}, tab2map(?CHARGES)), + + % Check that usage decay works + config_set("usage_coeff", "0.2"), + update_usage(), + ?assertEqual(12000000, usage(?DB1)), + + config_set("usage_coeff", "0.5"), + update_usage(), + ?assertEqual(6000000, usage(?DB1)), + + % Check that function both decays and updates from charges + charge(Job, 60000, {0, 60, 0}), + update_usage(), + ?assertEqual(63000000, usage(?DB1)), + + % Usage eventually decays to 0 and is removed from the table + [update_usage() || _ <- lists:seq(1, 100)], + ?assertEqual(0, usage(?DB1)), + ?assertEqual(#{}, tab2map(?USAGE)). + + +priority_coefficient_works(_) -> + job_added(job(?J1, ?DB1)), + ets:insert(?PRIORITIES, {?J1, 1000}), + + config_set("priority_coeff", "0.8"), + decay_priorities(), + ?assertEqual(800, priority(?J1)), + + config_set("priority_coeff", "0.5"), + decay_priorities(), + ?assertEqual(400, priority(?J1)), + + % If non-float junk value is set then the default is used + config_set("priority_coeff", "junk"), + decay_priorities(), + ?assertEqual(392, priority(?J1)), + + % Clipped to 1.0 max + config_set("priority_coeff", "1.1"), + decay_priorities(), + ?assertEqual(392, priority(?J1)), + + % Clipped to 0.0 min and removed when =< 0 + config_set("priority_coeff", "-0.1"), + decay_priorities(), + ?assertEqual(0, priority(?J1)), + ?assertEqual(#{}, tab2map(?PRIORITIES)). 
+ + +priority_decays_when_jobs_stop_running(_) -> + Job = running_job(?J1, ?DB1), + job_added(Job), + + % Ran for one cycle then stop + {[], Pending} = reschedule(1, {[Job], []}), + + % Priority is non-0 initially + ?assert(priority(?J1) > 0), + + % Priority decays to 0 after some cycles + [reschedule(0, {[], Pending}) || _ <- lists:seq(1, 500)], + ?assertEqual(0, priority(?J1)). + + +priority_increases_when_jobs_run(_) -> + Job = running_job(?J1, ?DB1), + job_added(Job), + + Running = [Job], + reschedule(0, {Running, []}), + P1 = priority(?J1), + ?assert(P1 > 0), + + % Priority increases + reschedule(0, {Running, []}), + P2 = priority(?J1), + ?assert(P2 > P1), + + % Additive priority increase is balanced out by priority decay + [reschedule(0, {Running, []}) || _ <- lists:seq(1, 500)], + Pn = priority(?J1), + ?assert(Pn > P2), + + reschedule(0, {Running, []}), + Pm = priority(?J1), + ?assertEqual(Pn, Pm). + + +two_dbs_equal_shares_equal_number_of_jobs(_) -> + update_shares(?DB1, 100), + update_shares(?DB2, 100), + Jobs = jobs(#{?DB1 => {25, 75}, ?DB2 => {25, 75}}), + #{?DB1 := Db1, ?DB2 := Db2} = run_scheduler(1000, 10, Jobs), + ?assert(49 =< Db1 andalso Db1 =< 51), + ?assert(49 =< Db2 andalso Db2 =< 51). + + +two_dbs_unequal_shares_equal_number_of_jobs(_) -> + update_shares(?DB1, 100), + update_shares(?DB1, 900), + Jobs = jobs(#{?DB1 => {25, 75}, ?DB2 => {25, 75}}), + #{?DB1 := Db1, ?DB2 := Db2} = run_scheduler(1000, 10, Jobs), + ?assert(89 =< Db1 andalso Db1 =< 91), + ?assert(9 =< Db2 andalso Db2 =< 11). + + +two_dbs_equal_shares_unequal_number_of_jobs(_) -> + update_shares(?DB1, 100), + update_shares(?DB2, 100), + Jobs = jobs(#{?DB1 => {25, 25}, ?DB2 => {25, 125}}), + #{?DB1 := Db1, ?DB2 := Db2} = run_scheduler(1000, 10, Jobs), + ?assert(49 =< Db1 andalso Db1 =< 51), + ?assert(49 =< Db2 andalso Db2 =< 51). + + +two_dbs_unequal_shares_unequal_number_of_jobs(_) -> + update_shares(?DB1, 1), + update_shares(?DB2, 100), + Jobs = jobs(#{?DB1 => {25, 25}, ?DB2 => {25, 125}}), + #{?DB1 := Db1, ?DB2 := Db2} = run_scheduler(1000, 10, Jobs), + ?assert(0 =< Db1 andalso Db1 =< 2), + ?assert(98 =< Db2 andalso Db2 =< 100). + + +three_dbs_equal_shares_equal_number_of_jobs(_) -> + update_shares(?DB1, 100), + update_shares(?DB2, 100), + update_shares(?DB3, 100), + Jobs = jobs(#{?DB1 => {25, 75}, ?DB2 => {25, 75}, ?DB3 => {25, 75}}), + #{?DB1 := Db1, ?DB2 := Db2, ?DB3 := Db3} = run_scheduler(1000, 10, Jobs), + ?assert(32 =< Db1 andalso Db1 =< 34), + ?assert(32 =< Db2 andalso Db2 =< 34), + ?assert(32 =< Db3 andalso Db3 =< 34). + + +three_dbs_unequal_shares_equal_number_of_jobs(_) -> + update_shares(?DB1, 100), + update_shares(?DB2, 700), + update_shares(?DB3, 200), + Jobs = jobs(#{?DB1 => {25, 75}, ?DB2 => {25, 75}, ?DB3 => {25, 75}}), + #{?DB1 := Db1, ?DB2 := Db2, ?DB3 := Db3} = run_scheduler(1000, 10, Jobs), + ?assert(9 =< Db1 andalso Db1 =< 11), + ?assert(69 =< Db2 andalso Db2 =< 71), + ?assert(19 =< Db3 andalso Db3 =< 21). + + +three_dbs_equal_shares_unequal_number_of_jobs(_) -> + update_shares(?DB1, 100), + update_shares(?DB2, 100), + update_shares(?DB3, 100), + Jobs = jobs(#{?DB1 => {25, 25}, ?DB2 => {25, 100}, ?DB3 => {25, 75}}), + #{?DB1 := Db1, ?DB2 := Db2, ?DB3 := Db3} = run_scheduler(1000, 10, Jobs), + ?assert(32 =< Db1 andalso Db1 =< 34), + ?assert(32 =< Db2 andalso Db2 =< 34), + ?assert(32 =< Db3 andalso Db3 =< 34). 
+ + +three_dbs_unequal_shares_unequal_number_of_jobs(_) -> + update_shares(?DB1, 1000), + update_shares(?DB2, 100), + update_shares(?DB3, 1), + Jobs = jobs(#{?DB1 => {25, 100}, ?DB2 => {25, 125}, ?DB3 => {25, 875}}), + #{?DB1 := Db1, ?DB2 := Db2, ?DB3 := Db3} = run_scheduler(1000, 10, Jobs), + ?assert(87 =< Db1 andalso Db1 =< 89), + ?assert(9 =< Db2 andalso Db2 =< 11), + ?assert(2 =< Db3 andalso Db3 =< 4). + + +config_set(K, V) -> + config:set("replicator", K, V, _Persist = false). + + +config_delete(K) -> + config:delete("replicator", K, _Persist = false). + + +config_share_set(K, V) -> + config:set("replicator.shares", K, V, _Persist = false). + + +config_shares_delete() -> + [config:delete("replicator.shares", K, _Persist = false) || + {K, _} <- config:get("replicator.shares")]. + + +tab2map(T) when is_atom(T) -> + maps:from_list(ets:tab2list(T)). + + +job(rand, Db) -> + job(rand:uniform(1 bsl 59), Db); + +job(Id, Db) -> + Job = #job{ + id = Id, + rep = #rep{ + db_name = Db, + user_ctx = #user_ctx{} + } + }, + stop(Job). + + +running_job(Id, Db) -> + run(job(Id, Db)). + + +run(#job{} = Job) -> + Job#job{ + pid = list_to_pid("<0.9999.999>"), + history = [{started, {0, 0, 0}}, {added, {0, 0, 0}}] + }. + + +stop(#job{} = Job) -> + Job#job{ + pid = undefined, + history = [{added, {0, 0, 0}}] + }. + + +% Simple scheduler simulator. Start and stop N jobs and do the +% accounting steps. Return a new list of running and pending jobs. If +% N is 0 then jobs which were running stay running and jobs were +% pending stay pending. +% +reschedule(N, {Running, Pending}) -> + update(Running, 60000, {0, 60, 0}), + + RunPr = [{priority(Job#job.id), Job} || Job <- Running], + StopPr = [{priority(Job#job.id), Job} || Job <- Pending], + + {_, Running1} = lists:unzip(lists:reverse(lists:sort(RunPr))), + {_, Pending1} = lists:unzip(lists:sort(StopPr)), + + ToStop = lists:sublist(Running1, N), + ToStart = lists:sublist(Pending1, N), + + Running2 = [run(Job) || Job <- ToStart] ++ Running1 -- ToStop, + Pending2 = [stop(Job) || Job <- ToStop] ++ Pending1 -- ToStart, + + {Running2, Pending2}. + + +% Run a few scheduling cycles and calculate usage percentage for each db +% +run_scheduler(Cycles, Churn, Jobs0) -> + Acc0 = {#{}, Jobs0}, + + {Sum, _} = lists:foldl(fun(_CycleCnt, {UsageAcc, {Running, _} = Jobs}) -> + UsageAcc1 = lists:foldl(fun(#job{} = Job, Acc) -> + Db = Job#job.rep#rep.db_name, + maps:update_with(Db, fun(V) -> V + 1 end, 0, Acc) + end, UsageAcc, Running), + {UsageAcc1, reschedule(Churn, Jobs)} + end, Acc0, lists:seq(1, Cycles)), + + Total = maps:fold(fun(_, V, Acc) -> Acc + V end, 0, Sum), + maps:map(fun(_Db, V) -> round(V / Total * 100) end, Sum). + + +% Dbs = #{Db => {RunningCount, PendingCount} +% +jobs(#{} = Dbs) -> + maps:fold(fun(Db, {RCnt, PCnt}, {Running, Pending}) -> + RJobs = [running_job(rand, Db) || _ <- lists:seq(1, RCnt)], + PJobs = [job(rand, Db) || _ <- lists:seq(1, PCnt)], + [job_added(Job) || Job <- RJobs ++ PJobs], + {Running ++ RJobs, Pending ++ PJobs} + end, {[], []}, Dbs). + + +-endif. diff --git a/src/couch_replicator/test/eunit/couch_replicator_test.hrl b/src/couch_replicator/test/eunit/couch_replicator_test.hrl new file mode 100644 index 000000000..6db97ec2b --- /dev/null +++ b/src/couch_replicator/test/eunit/couch_replicator_test.hrl @@ -0,0 +1,35 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + +% Borrowed from fabric2_test.hrl + +% Some test modules do not use with, so squash the unused fun compiler warning +-compile([{nowarn_unused_function, [{with, 1}]}]). + + +-define(TDEF(Name), {atom_to_list(Name), fun Name/1}). +-define(TDEF(Name, Timeout), {atom_to_list(Name), Timeout, fun Name/1}). + +-define(TDEF_FE(Name), fun(Arg) -> {atom_to_list(Name), ?_test(Name(Arg))} end). +-define(TDEF_FE(Name, Timeout), fun(Arg) -> {atom_to_list(Name), {timeout, Timeout, ?_test(Name(Arg))}} end). + + +with(Tests) -> + fun(ArgsTuple) -> + lists:map(fun + ({Name, Fun}) -> + {Name, ?_test(Fun(ArgsTuple))}; + ({Name, Timeout, Fun}) -> + {Name, {timeout, Timeout, ?_test(Fun(ArgsTuple))}} + end, Tests) + end. |