Diffstat (limited to 'src/couch_replicator/src/couch_replicator_share.erl')
-rw-r--r--  src/couch_replicator/src/couch_replicator_share.erl  762
1 file changed, 0 insertions, 762 deletions
diff --git a/src/couch_replicator/src/couch_replicator_share.erl b/src/couch_replicator/src/couch_replicator_share.erl
deleted file mode 100644
index 8c9fa029a..000000000
--- a/src/couch_replicator/src/couch_replicator_share.erl
+++ /dev/null
@@ -1,762 +0,0 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

% This module implements the "Fair Share" algorithm by Judy Kay and Piers
% Lauder [1] and applies it to the scheduling of replication jobs.
%
% The main idea is that _replicator dbs can have a configurable number of
% "shares" assigned to them. Shares are an abstract quantity from 1 to 1000.
% The default is 100. Jobs from _replicator databases with more shares get a
% proportionally higher chance to run than those from databases with fewer
% shares.
%
% Every scheduler cycle, running jobs are "charged" based on how much time
% they spent running during that cycle. At the end of the cycle, the
% accumulated charges for each job, the number of shares configured, and the
% total number of jobs in the pending queue from the same _replicator db are
% used to calculate new priority values for all the jobs. To match the
% algorithm from the paper, jobs with lower priority values are the ones at
% the front of the run queue and have a higher chance of running.
%
% Here is how charges, shares, and the number of sibling jobs affect the
% priority value:
%
%  1) Jobs from dbs with higher configured shares get assigned lower
%     priority values and so stay closer to the front of the queue.
%
%  2) Jobs from dbs with many other jobs (many siblings) get assigned a
%     higher priority value, so they get pushed further down the queue
%     and have a lower chance of running.
%
%  3) Jobs which run longer accumulate more charges, get assigned a
%     higher priority value, and have to wait longer to run.
%
% In order to prevent job starvation, all job priorities are periodically
% decayed (decreased). This effectively moves all the jobs towards the front
% of the run queue. So, in effect, there are two competing processes: one
% uniformly moves all jobs to the front, and the other throws them back in
% proportion to the factors mentioned above. The speed of this uniform
% priority decay is controlled by the priority_coeff parameter.
%
% In order to prevent jobs from low-shares dbs from "cheating" by getting
% deleted and immediately re-added, charges are accumulated using a
% historically decayed usage value. The speed of the usage decay is controlled
% by the `usage_coeff = 0.5` parameter.
%
% [1] : https://proteusmaster.urcf.drexel.edu/urcfwiki/images/KayLauderFairShare.pdf

-module(couch_replicator_share).

-export([
    init/0,
    clear/0,
    update_shares/2,
    reset_shares/1,
    job_added/1,
    job_removed/1,
    update/3,
    priority/1,
    charge/3
]).

-include_lib("couch/include/couch_db.hrl").
-include("couch_replicator.hrl").
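% As a hypothetical worked example of the interplay above (the numbers are
% illustrative, not from a real cluster): a db with 100 shares and 3 jobs
% which accumulated one minute (60000000 usec) of usage gets priority
%
%   1> trunc((60000000 * 3) / (100 * 100)).
%   18000
%
% while a db with 1000 shares and a single job with the same usage gets
% trunc(60000000 / (1000 * 1000)) = 60, so its job sorts much closer to the
% front of the run queue.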
% The usage coefficient decays historic usage every scheduling cycle. For
% example, the usage value for a job running for 1 minute is 60000000 (i.e.
% microseconds per minute). If the job then stops running, it will take about
% 26 cycles (minutes) for that value to decay to 0 and for the system to
% "forget" about it completely:
%
%   trunc(60000000 * math:pow(0.5, 26)) = 0
%
-define(DEFAULT_USAGE_COEFF, 0.5).

% The priority coefficient decays all the job priorities such that they slowly
% drift towards the front of the run queue. This coefficient defines a maximum
% time window over which this algorithm would operate. For example, if this
% value is too small (0.1), after a few cycles quite a few jobs would end up
% at priority 0, which would render this algorithm useless. The default value
% of 0.98 is picked such that if a job ran for one scheduler cycle, then
% didn't get to run for 7 hours, it would still have priority > 0. 7 hours was
% picked as it is close enough to 8 hours, which is the default maximum error
% backoff interval.
%
% Example calculation:
%   shares = 100
%   usage after 1 minute cycle run = 60000000
%   initial priority = 60000000 / (100 * 100) = 6000
%   trunc(6000 * math:pow(0.98, 431)) = 0
%   431 / 60 ~= 7 hrs
%
-define(DEFAULT_PRIORITY_COEFF, 0.98).

-define(MIN_SHARES, 1).
-define(MAX_SHARES, 1000).
-define(DEFAULT_SHARES, 100).

-define(SHARES, couch_replicator_shares).
-define(PRIORITIES, couch_replicator_priorities).
-define(USAGE, couch_replicator_usage).
-define(CHARGES, couch_replicator_stopped_usage).
-define(NUM_JOBS, couch_replicator_num_jobs).

init() ->
    EtsOpts = [named_table, public],
    % {Key, Shares}
    ?SHARES = ets:new(?SHARES, EtsOpts),
    % {JobId, Priority}
    ?PRIORITIES = ets:new(?PRIORITIES, EtsOpts),
    % {Key, Usage}
    ?USAGE = ets:new(?USAGE, EtsOpts),
    % {Key, Charges}
    ?CHARGES = ets:new(?CHARGES, EtsOpts),
    % {Key, NumJobs}
    ?NUM_JOBS = ets:new(?NUM_JOBS, EtsOpts),
    lists:foreach(fun({K, V}) -> update_shares(K, V) end, get_config_shares()).

clear() ->
    Tables = [?SHARES, ?PRIORITIES, ?USAGE, ?CHARGES, ?NUM_JOBS],
    lists:foreach(fun(T) -> catch ets:delete(T) end, Tables).

% This should be called when the user updates the replicator.shares config
% section.
%
update_shares(Key, Shares) when is_integer(Shares) ->
    ets:insert(?SHARES, {Key, bounded(Shares, ?MIN_SHARES, ?MAX_SHARES)}).

% Called when the config value is deleted and shares are reset to the default
% value.
reset_shares(Key) ->
    ets:delete(?SHARES, Key).

job_added(#job{} = Job) ->
    Key = key(Job),
    % If the entry is not present, {Key, 0} is used as the default
    ets:update_counter(?NUM_JOBS, Key, 1, {Key, 0}),
    % Update the job's priority as if it ran during one scheduler cycle. This
    % is so new jobs don't get to be at priority 0 (highest).
    update_priority(Job).

job_removed(#job{} = Job) ->
    Key = key(Job),
    ets:delete(?PRIORITIES, Job#job.id),
    case ets:update_counter(?NUM_JOBS, Key, -1, {Key, 0}) of
        N when is_integer(N), N =< 0 ->
            ets:delete(?NUM_JOBS, Key);
        N when is_integer(N), N > 0 ->
            ok
    end,
    ok.
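% Note that job_added/1 and job_removed/1 above rely on the default-object
% behaviour of ets:update_counter/4: if the key is absent, the default object
% is inserted first and then updated. A minimal shell sketch (the table name
% and key below are made up for illustration):
%
%   1> T = ets:new(t, [public]).
%   2> ets:update_counter(T, k, 1, {k, 0}).
%   1
%   3> ets:update_counter(T, k, -1, {k, 0}).
%   0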
% This is the main algorithm update function. It should be called during each
% rescheduling cycle with a list of running jobs, the interval from the
% scheduler (in milliseconds), and the current timestamp.
%
% This function performs all three main steps as described in [1]:
%
%  1. Update usage from all the charges in the last scheduling cycle.
%
%  2. Uniformly decay all job priorities.
%
%  3. Update priorities for all the running jobs based on usage and the
%     number of sibling jobs.
%
update(RunningJobs, Interval, {_, _, _} = Now) ->
    lists:foreach(fun(Job) -> charge(Job, Interval, Now) end, RunningJobs),
    update_usage(),
    decay_priorities(),
    lists:foreach(fun(Job) -> update_priority(Job) end, RunningJobs).

priority(JobId) ->
    % Not found means it was removed because its value was 0
    case ets:lookup(?PRIORITIES, JobId) of
        [{_, Priority}] -> Priority;
        [] -> 0
    end.

charge(#job{pid = undefined}, _, _) ->
    0;
charge(#job{} = Job, Interval, {_, _, _} = Now) when is_integer(Interval) ->
    Key = key(Job),
    Charges = job_charges(Job, Interval, Now),
    % If the entry is not present, {Key, 0} is used as the default
    ets:update_counter(?CHARGES, Key, Charges, {Key, 0}).

usage(Key) ->
    case ets:lookup(?USAGE, Key) of
        [{_, Usage}] -> Usage;
        [] -> 0
    end.

num_jobs(Key) ->
    case ets:lookup(?NUM_JOBS, Key) of
        [{_, NumJobs}] -> NumJobs;
        [] -> 0
    end.

shares(Key) ->
    case ets:lookup(?SHARES, Key) of
        [{_, Shares}] -> Shares;
        [] -> ?DEFAULT_SHARES
    end.

% In [1] this is described in the "Decay of Process Priorities" section.
%
decay_priorities() ->
    decay(?PRIORITIES, priority_coeff()),
    % If a priority becomes 0, it is removed. When looking it up, if it
    % is missing we assume it is 0.
    clear_zero(?PRIORITIES).

% This is the main part of the algorithm. In [1] it is described in the
% "Priority Adjustment" section.
%
update_priority(#job{} = Job) ->
    Id = Job#job.id,
    Key = key(Job),
    Shares = shares(Key),
    Priority = (usage(Key) * num_jobs(Key)) / (Shares * Shares),
    % If the entry is not present, {Id, 0} is used as the default
    ets:update_counter(?PRIORITIES, Id, trunc(Priority), {Id, 0}).

% This is the "User-Level Scheduling" part from [1].
%
update_usage() ->
    decay(?USAGE, usage_coeff()),
    clear_zero(?USAGE),
    ets:foldl(
        fun({Key, Charges}, _) ->
            % If the entry is not present, {Key, 0} is used as the default
            ets:update_counter(?USAGE, Key, Charges, {Key, 0})
        end,
        0,
        ?CHARGES
    ),
    % Start each interval with a fresh charges table
    ets:delete_all_objects(?CHARGES).

% Private helper functions

decay(Ets, Coeff) when is_atom(Ets) ->
    % Use trunc to ensure the result stays an integer in order for
    % ets:update_counter to work properly. It throws a badarg otherwise.
    Head = {'$1', '$2'},
    Result = {{'$1', {trunc, {'*', '$2', {const, Coeff}}}}},
    ets:select_replace(Ets, [{Head, [], [Result]}]).

clear_zero(Ets) when is_atom(Ets) ->
    ets:select_delete(Ets, [{{'_', '$1'}, [{'=<', '$1', 0}], [true]}]).

key(#job{} = Job) ->
    Rep = Job#job.rep,
    case is_binary(Rep#rep.db_name) of
        true -> mem3:dbname(Rep#rep.db_name);
        false -> (Rep#rep.user_ctx)#user_ctx.name
    end.

% Jobs are charged based on the amount of time the job was running during the
% last scheduling interval. The time units used are microseconds in order to
% have large enough usage values so that the calculated priority won't easily
% be rounded off to 0. The formula for the priority calculation is:
%
%   Priority = (Usage * NumJobs) / Shares^2
%
% Then, in the worst case of a single job in a db, running for only one
% second, with 1000 (max) shares, the priority would be:
%
%   1000000 * 1 / (1000^2) = 1
%
job_charges(#job{} = Job, IntervalMSec, {_, _, _} = Now) ->
    TimeRunning = timer:now_diff(Now, last_started(Job)),
    IntervalUSec = IntervalMSec * 1000,
    bounded(TimeRunning, 0, IntervalUSec).
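% As an illustrative check of the clamping above (the numbers are made up):
% with a 60000 msec scheduler interval, a job whose last `started` timestamp
% is 90 seconds in the past is only charged one full interval's worth of
% usage, since bounded/3 expands to max/min:
%
%   1> max(0, min(60000 * 1000, 90 * 1000000)).
%   60000000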
last_started(#job{} = Job) ->
    case lists:keyfind(started, 1, Job#job.history) of
        % In case the user set the max history length too low
        false -> {0, 0, 0};
        {started, When} -> When
    end.

bounded(Val, Min, Max) ->
    max(Min, min(Max, Val)).

% Config helper functions

get_config_shares() ->
    lists:map(
        fun({K, V}) ->
            {list_to_binary(K), int_val(V, ?DEFAULT_SHARES)}
        end,
        config:get("replicator.shares")
    ).

priority_coeff() ->
    % This is the K2 coefficient from [1]
    Default = ?DEFAULT_PRIORITY_COEFF,
    Val = float_val(config:get("replicator", "priority_coeff"), Default),
    bounded(Val, 0.0, 1.0).

usage_coeff() ->
    % This is the K1 coefficient from [1]
    Default = ?DEFAULT_USAGE_COEFF,
    Val = float_val(config:get("replicator", "usage_coeff"), Default),
    bounded(Val, 0.0, 1.0).

int_val(Str, Default) when is_list(Str) ->
    try list_to_integer(Str) of
        Val -> Val
    catch
        error:badarg ->
            Default
    end.

float_val(undefined, Default) ->
    Default;
float_val(Str, Default) when is_list(Str) ->
    try list_to_float(Str) of
        Val -> Val
    catch
        error:badarg ->
            Default
    end.

-ifdef(TEST).

-include_lib("eunit/include/eunit.hrl").
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch_replicator/test/eunit/couch_replicator_test.hrl").

-define(DB1, <<"db1">>).
-define(DB2, <<"db2">>).
-define(DB3, <<"db3">>).
-define(J1, <<"j1">>).
-define(J2, <<"j2">>).
-define(J3, <<"j3">>).

fair_share_test_() ->
    {
        setup,
        fun setup_all/0,
        fun teardown_all/1,
        {
            foreach,
            fun setup/0,
            fun teardown/1,
            [
                ?TDEF_FE(init_works),
                ?TDEF_FE(shares_are_updated_and_reset),
                ?TDEF_FE(jobs_are_added_and_removed),
                ?TDEF_FE(can_fetch_job_priority),
                ?TDEF_FE(jobs_are_charged),
                ?TDEF_FE(usage_is_updated),
                ?TDEF_FE(priority_coefficient_works),
                ?TDEF_FE(priority_decays_when_jobs_stop_running),
                ?TDEF_FE(priority_increases_when_jobs_run),
                ?TDEF_FE(two_dbs_equal_shares_equal_number_of_jobs),
                ?TDEF_FE(two_dbs_unequal_shares_equal_number_of_jobs),
                ?TDEF_FE(two_dbs_equal_shares_unequal_number_of_jobs),
                ?TDEF_FE(two_dbs_unequal_shares_unequal_number_of_jobs),
                ?TDEF_FE(three_dbs_equal_shares_equal_number_of_jobs),
                ?TDEF_FE(three_dbs_unequal_shares_equal_number_of_jobs),
                ?TDEF_FE(three_dbs_equal_shares_unequal_number_of_jobs),
                ?TDEF_FE(three_dbs_unequal_shares_unequal_number_of_jobs)
            ]
        }
    }.

setup_all() ->
    test_util:start_couch().

teardown_all(Ctx) ->
    config_delete("priority_coeff"),
    config_delete("usage_coeff"),
    config_shares_delete(),
    test_util:stop_couch(Ctx).

setup() ->
    init(),
    ok.

teardown(_) ->
    clear(),
    config_delete("priority_coeff"),
    config_delete("usage_coeff"),
    config_shares_delete().

init_works(_) ->
    Tables = [?SHARES, ?PRIORITIES, ?USAGE, ?CHARGES, ?NUM_JOBS],
    [?assert(is_list(ets:info(T))) || T <- Tables],
    ?assertEqual(#{}, tab2map(?SHARES)),

    clear(),
    [?assertEqual(undefined, ets:info(T)) || T <- Tables],

    config_share_set("db1", "200"),
    init(),
    ?assertEqual(200, shares(?DB1)),
    ?assertEqual(#{?DB1 => 200}, tab2map(?SHARES)).

shares_are_updated_and_reset(_) ->
    ?assertEqual(#{}, tab2map(?SHARES)),

    update_shares(?DB1, 42),
    ?assertEqual(42, shares(?DB1)),

    reset_shares(?DB1),
    ?assertEqual(100, shares(?DB1)),
    ?assertEqual(#{}, tab2map(?SHARES)),

    % min shares
    update_shares(?DB1, 0),
    ?assertEqual(1, shares(?DB1)),

    % max shares
    update_shares(?DB1, 1001),
    ?assertEqual(1000, shares(?DB1)).
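% The two tests above drive shares through the same "replicator.shares"
% config section an operator would edit. A hypothetical local.ini fragment
% matching the init_works/1 test:
%
%   [replicator.shares]
%   db1 = 200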
jobs_are_added_and_removed(_) ->
    job_added(job(?J1, ?DB1)),
    ?assertEqual(1, num_jobs(?DB1)),
    ?assertEqual(#{?J1 => 0}, tab2map(?PRIORITIES)),

    job_added(job(?J2, ?DB1)),
    ?assertEqual(2, num_jobs(?DB1)),
    ?assertEqual(#{?J1 => 0, ?J2 => 0}, tab2map(?PRIORITIES)),

    job_added(job(?J3, ?DB2)),
    ?assertEqual(1, num_jobs(?DB2)),
    ?assertEqual(#{?J1 => 0, ?J2 => 0, ?J3 => 0}, tab2map(?PRIORITIES)),

    job_removed(job(?J1, ?DB1)),
    ?assertEqual(1, num_jobs(?DB1)),
    ?assertEqual(#{?J2 => 0, ?J3 => 0}, tab2map(?PRIORITIES)),

    job_removed(job(?J3, ?DB2)),
    ?assertEqual(0, num_jobs(?DB2)),
    ?assertEqual(0, priority(?J3)),

    job_removed(job(?J2, ?DB1)),
    ?assertEqual(0, num_jobs(?DB1)),
    ?assertEqual(#{}, tab2map(?NUM_JOBS)),
    ?assertEqual(0, priority(?J2)),
    ?assertEqual(#{}, tab2map(?PRIORITIES)).

can_fetch_job_priority(_) ->
    job_added(job(?J1, ?DB1)),
    ?assertEqual(0, priority(?J1)),

    ets:insert(?PRIORITIES, {?J1, 42}),
    ?assertEqual(42, priority(?J1)),

    ets:delete(?PRIORITIES, ?J1),
    ?assertEqual(0, priority(?J1)).

jobs_are_charged(_) ->
    Job1 = running_job(?J1, ?DB1),
    job_added(Job1),
    ?assertEqual(#{}, tab2map(?CHARGES)),

    charge(Job1, 1000, {0, 1, 0}),
    ?assertEqual(#{?DB1 => 1000000}, tab2map(?CHARGES)),

    % Stopped jobs are not charged
    charge(stop(Job1), 1000, {0, 1, 0}),
    ?assertEqual(#{?DB1 => 1000000}, tab2map(?CHARGES)),

    % Only charge up to one interval's worth even if the job ran longer
    charge(Job1, 1000, {0, 5, 0}),
    ?assertEqual(#{?DB1 => 2000000}, tab2map(?CHARGES)),

    % Charges are accumulated from jobs in the same db
    Job2 = running_job(?J2, ?DB1),
    job_added(Job2),
    charge(Job2, 1000, {0, 0, 1}),
    ?assertEqual(#{?DB1 => 2000001}, tab2map(?CHARGES)),

    % Charges are not cleared if jobs are removed
    job_removed(Job1),
    job_removed(Job2),
    ?assertEqual(#{?DB1 => 2000001}, tab2map(?CHARGES)).

usage_is_updated(_) ->
    Job = running_job(?J1, ?DB1),
    job_added(Job),

    charge(Job, 60000, {0, 60, 0}),
    update_usage(),
    ?assertEqual(60000000, usage(?DB1)),

    % The charges table is cleared after usage is updated
    ?assertEqual(#{}, tab2map(?CHARGES)),

    % Check that usage decay works
    config_set("usage_coeff", "0.2"),
    update_usage(),
    ?assertEqual(12000000, usage(?DB1)),

    config_set("usage_coeff", "0.5"),
    update_usage(),
    ?assertEqual(6000000, usage(?DB1)),

    % Check that the function both decays and updates from charges
    charge(Job, 60000, {0, 60, 0}),
    update_usage(),
    ?assertEqual(63000000, usage(?DB1)),

    % Usage eventually decays to 0 and is removed from the table
    [update_usage() || _ <- lists:seq(1, 100)],
    ?assertEqual(0, usage(?DB1)),
    ?assertEqual(#{}, tab2map(?USAGE)).

priority_coefficient_works(_) ->
    job_added(job(?J1, ?DB1)),
    ets:insert(?PRIORITIES, {?J1, 1000}),

    config_set("priority_coeff", "0.8"),
    decay_priorities(),
    ?assertEqual(800, priority(?J1)),

    config_set("priority_coeff", "0.5"),
    decay_priorities(),
    ?assertEqual(400, priority(?J1)),

    % If a non-float junk value is set, the default is used
    config_set("priority_coeff", "junk"),
    decay_priorities(),
    ?assertEqual(392, priority(?J1)),

    % Clipped to 1.0 max
    config_set("priority_coeff", "1.1"),
    decay_priorities(),
    ?assertEqual(392, priority(?J1)),

    % Clipped to 0.0 min and removed when =< 0
    config_set("priority_coeff", "-0.1"),
    decay_priorities(),
    ?assertEqual(0, priority(?J1)),
    ?assertEqual(#{}, tab2map(?PRIORITIES)).
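% The equilibrium asserted in priority_increases_when_jobs_run/1 below can be
% estimated by hand. With the default coefficients, a single job from a db
% with the default 100 shares, charged a full 60000000 usec every cycle
% (approximate, ignoring trunc/1 rounding):
%
%   U*  = 60000000 / (1 - 0.5)  = 120000000  (steady-state usage)
%   Inc = U* * 1 / (100 * 100)  = 12000      (per-cycle priority increment)
%   P*  ~= Inc / (1 - 0.98)     = 600000     (equilibrium priority)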
priority_decays_when_jobs_stop_running(_) ->
    Job = running_job(?J1, ?DB1),
    job_added(Job),

    % Ran for one cycle, then stopped
    {[], Pending} = reschedule(1, {[Job], []}),

    % Priority is non-0 initially
    ?assert(priority(?J1) > 0),

    % Priority decays to 0 after some cycles
    [reschedule(0, {[], Pending}) || _ <- lists:seq(1, 500)],
    ?assertEqual(0, priority(?J1)).

priority_increases_when_jobs_run(_) ->
    Job = running_job(?J1, ?DB1),
    job_added(Job),

    Running = [Job],
    reschedule(0, {Running, []}),
    P1 = priority(?J1),
    ?assert(P1 > 0),

    % Priority increases
    reschedule(0, {Running, []}),
    P2 = priority(?J1),
    ?assert(P2 > P1),

    % The additive priority increase is eventually balanced out by priority
    % decay, so the priority settles at a stable equilibrium value
    [reschedule(0, {Running, []}) || _ <- lists:seq(1, 500)],
    Pn = priority(?J1),
    ?assert(Pn > P2),

    reschedule(0, {Running, []}),
    Pm = priority(?J1),
    ?assertEqual(Pn, Pm).

two_dbs_equal_shares_equal_number_of_jobs(_) ->
    update_shares(?DB1, 100),
    update_shares(?DB2, 100),
    Jobs = jobs(#{?DB1 => {25, 75}, ?DB2 => {25, 75}}),
    #{?DB1 := Db1, ?DB2 := Db2} = run_scheduler(1000, 10, Jobs),
    ?assert(49 =< Db1 andalso Db1 =< 51),
    ?assert(49 =< Db2 andalso Db2 =< 51).

two_dbs_unequal_shares_equal_number_of_jobs(_) ->
    update_shares(?DB1, 900),
    update_shares(?DB2, 100),
    Jobs = jobs(#{?DB1 => {25, 75}, ?DB2 => {25, 75}}),
    #{?DB1 := Db1, ?DB2 := Db2} = run_scheduler(1000, 10, Jobs),
    ?assert(89 =< Db1 andalso Db1 =< 91),
    ?assert(9 =< Db2 andalso Db2 =< 11).

two_dbs_equal_shares_unequal_number_of_jobs(_) ->
    update_shares(?DB1, 100),
    update_shares(?DB2, 100),
    Jobs = jobs(#{?DB1 => {25, 25}, ?DB2 => {25, 125}}),
    #{?DB1 := Db1, ?DB2 := Db2} = run_scheduler(1000, 10, Jobs),
    ?assert(49 =< Db1 andalso Db1 =< 51),
    ?assert(49 =< Db2 andalso Db2 =< 51).

two_dbs_unequal_shares_unequal_number_of_jobs(_) ->
    update_shares(?DB1, 1),
    update_shares(?DB2, 100),
    Jobs = jobs(#{?DB1 => {25, 25}, ?DB2 => {25, 125}}),
    #{?DB1 := Db1, ?DB2 := Db2} = run_scheduler(1000, 10, Jobs),
    ?assert(0 =< Db1 andalso Db1 =< 2),
    ?assert(98 =< Db2 andalso Db2 =< 100).

three_dbs_equal_shares_equal_number_of_jobs(_) ->
    update_shares(?DB1, 100),
    update_shares(?DB2, 100),
    update_shares(?DB3, 100),
    Jobs = jobs(#{?DB1 => {25, 75}, ?DB2 => {25, 75}, ?DB3 => {25, 75}}),
    #{?DB1 := Db1, ?DB2 := Db2, ?DB3 := Db3} = run_scheduler(1000, 10, Jobs),
    ?assert(32 =< Db1 andalso Db1 =< 34),
    ?assert(32 =< Db2 andalso Db2 =< 34),
    ?assert(32 =< Db3 andalso Db3 =< 34).

three_dbs_unequal_shares_equal_number_of_jobs(_) ->
    update_shares(?DB1, 100),
    update_shares(?DB2, 700),
    update_shares(?DB3, 200),
    Jobs = jobs(#{?DB1 => {25, 75}, ?DB2 => {25, 75}, ?DB3 => {25, 75}}),
    #{?DB1 := Db1, ?DB2 := Db2, ?DB3 := Db3} = run_scheduler(1000, 10, Jobs),
    ?assert(9 =< Db1 andalso Db1 =< 11),
    ?assert(69 =< Db2 andalso Db2 =< 71),
    ?assert(19 =< Db3 andalso Db3 =< 21).

three_dbs_equal_shares_unequal_number_of_jobs(_) ->
    update_shares(?DB1, 100),
    update_shares(?DB2, 100),
    update_shares(?DB3, 100),
    Jobs = jobs(#{?DB1 => {25, 25}, ?DB2 => {25, 100}, ?DB3 => {25, 75}}),
    #{?DB1 := Db1, ?DB2 := Db2, ?DB3 := Db3} = run_scheduler(1000, 10, Jobs),
    ?assert(32 =< Db1 andalso Db1 =< 34),
    ?assert(32 =< Db2 andalso Db2 =< 34),
    ?assert(32 =< Db3 andalso Db3 =< 34).
three_dbs_unequal_shares_unequal_number_of_jobs(_) ->
    update_shares(?DB1, 1000),
    update_shares(?DB2, 100),
    update_shares(?DB3, 1),
    Jobs = jobs(#{?DB1 => {25, 100}, ?DB2 => {25, 125}, ?DB3 => {25, 875}}),
    #{?DB1 := Db1, ?DB2 := Db2, ?DB3 := Db3} = run_scheduler(1000, 10, Jobs),
    ?assert(87 =< Db1 andalso Db1 =< 89),
    ?assert(9 =< Db2 andalso Db2 =< 11),
    ?assert(2 =< Db3 andalso Db3 =< 4).

config_set(K, V) ->
    config:set("replicator", K, V, _Persist = false).

config_delete(K) ->
    config:delete("replicator", K, _Persist = false).

config_share_set(K, V) ->
    config:set("replicator.shares", K, V, _Persist = false).

config_shares_delete() ->
    [
        config:delete("replicator.shares", K, _Persist = false)
     || {K, _} <- config:get("replicator.shares")
    ].

tab2map(T) when is_atom(T) ->
    maps:from_list(ets:tab2list(T)).

job(rand, Db) ->
    job(rand:uniform(1 bsl 59), Db);
job(Id, Db) ->
    Job = #job{
        id = Id,
        rep = #rep{
            db_name = Db,
            user_ctx = #user_ctx{}
        }
    },
    stop(Job).

running_job(Id, Db) ->
    run(job(Id, Db)).

run(#job{} = Job) ->
    Job#job{
        pid = list_to_pid("<0.9999.999>"),
        history = [{started, {0, 0, 0}}, {added, {0, 0, 0}}]
    }.

stop(#job{} = Job) ->
    Job#job{
        pid = undefined,
        history = [{added, {0, 0, 0}}]
    }.

% A simple scheduler simulator. Stops N running jobs, starts N pending ones,
% and does the accounting steps. Returns a new list of running and pending
% jobs. If N is 0, jobs which were running stay running and jobs which were
% pending stay pending.
%
reschedule(N, {Running, Pending}) ->
    update(Running, 60000, {0, 60, 0}),

    RunPr = [{priority(Job#job.id), Job} || Job <- Running],
    StopPr = [{priority(Job#job.id), Job} || Job <- Pending],

    {_, Running1} = lists:unzip(lists:reverse(lists:sort(RunPr))),
    {_, Pending1} = lists:unzip(lists:sort(StopPr)),

    ToStop = lists:sublist(Running1, N),
    ToStart = lists:sublist(Pending1, N),

    Running2 = [run(Job) || Job <- ToStart] ++ Running1 -- ToStop,
    Pending2 = [stop(Job) || Job <- ToStop] ++ Pending1 -- ToStart,

    {Running2, Pending2}.

% Run a number of scheduling cycles and calculate the usage percentage for
% each db.
%
run_scheduler(Cycles, Churn, Jobs0) ->
    Acc0 = {#{}, Jobs0},

    {Sum, _} = lists:foldl(
        fun(_CycleCnt, {UsageAcc, {Running, _} = Jobs}) ->
            UsageAcc1 = lists:foldl(
                fun(#job{} = Job, Acc) ->
                    Db = Job#job.rep#rep.db_name,
                    maps:update_with(Db, fun(V) -> V + 1 end, 0, Acc)
                end,
                UsageAcc,
                Running
            ),
            {UsageAcc1, reschedule(Churn, Jobs)}
        end,
        Acc0,
        lists:seq(1, Cycles)
    ),

    Total = maps:fold(fun(_, V, Acc) -> Acc + V end, 0, Sum),
    maps:map(fun(_Db, V) -> round(V / Total * 100) end, Sum).

% Dbs = #{Db => {RunningCount, PendingCount}}
%
jobs(#{} = Dbs) ->
    maps:fold(
        fun(Db, {RCnt, PCnt}, {Running, Pending}) ->
            RJobs = [running_job(rand, Db) || _ <- lists:seq(1, RCnt)],
            PJobs = [job(rand, Db) || _ <- lists:seq(1, PCnt)],
            [job_added(Job) || Job <- RJobs ++ PJobs],
            {Running ++ RJobs, Pending ++ PJobs}
        end,
        {[], []},
        Dbs
    ).

-endif.