Diffstat (limited to 'src/couch_replicator/src/couch_replicator_share.erl')
-rw-r--r--  src/couch_replicator/src/couch_replicator_share.erl  762
1 file changed, 0 insertions, 762 deletions
diff --git a/src/couch_replicator/src/couch_replicator_share.erl b/src/couch_replicator/src/couch_replicator_share.erl
deleted file mode 100644
index 8c9fa029a..000000000
--- a/src/couch_replicator/src/couch_replicator_share.erl
+++ /dev/null
@@ -1,762 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-% This module implements the "Fair Share" algorithm by Judy Kay and Piers
-% Lauder [1] and applies it to the scheduling of replication jobs.
-%
-% The main idea is that _replicator dbs can have a configurable number of
-% "shares" assigned to them. Shares are an abstract quantity from 1 to 1000.
-% The default is 100. Jobs from _replicator databases with more shares get a
-% proportionally higher chance to run than those from databases with a lower
-% number of shares.
-%
-% Every scheduler cycle, running jobs are "charged" based on how much time they
-% spent running during that cycle. At the end of the cycle the accumulated
-% charges for each job, the number of shares configured, and the total number
-% of jobs in the pending queue from the same _replicator db, are used to
-% calculate new priority values for all the jobs. To match the algorithm from
-% the paper, jobs with lower priority values are the ones at the front of the
-% run queue and have a higher chance of running.
-%
-% Here is how charges, shares, and number of sibling jobs affect the
-% priority value:
-%
-% 1) Jobs from dbs with higher configured shares get assigned lower
-% priority values and so stay closer to the front of the queue.
-%
-% 2) Jobs from dbs with many other jobs (many siblings) get assigned a
-% higher priority value, so they get pushed further down the queue
-% and have a lower chance of running.
-%
-% 3) Jobs which run longer accumulate more charges and get assigned a
-% higher priority value and get to wait longer to run.
-%
-% In order to prevent job starvation, all job priorities are periodically
-% decayed (decreased). This effectively moves all the jobs towards the front of
-% the run queue. So, in effect, there are two competing processes: one
-% uniformly moves all jobs to the front, and the other throws them back in
-% proportion to those factors mentioned above. The speed of this uniform
-% priority decay is controlled by the priority_coeff parameter.
-%
-% In order to prevent jobs from low-share dbs from "cheating" by getting
-% deleted and immediately re-added, charges are accumulated using a
-% historically decayed usage value. The speed of the usage decay is controlled
-% by the `usage_coeff = 0.5` parameter.
-%
-% [1] : https://proteusmaster.urcf.drexel.edu/urcfwiki/images/KayLauderFairShare.pdf
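To make the effect of shares concrete, here is a minimal sketch (not part of the module; the helper name is made up) of the priority relation described above and implemented later in update_priority/1:

    % Priority = (Usage * NumJobs) / Shares^2, truncated to an integer.
    % Doubling a db's shares cuts its jobs' priority values to a quarter,
    % keeping them closer to the front of the run queue.
    example_priority(Usage, NumJobs, Shares) ->
        trunc((Usage * NumJobs) / (Shares * Shares)).

    % example_priority(60000000, 1, 100) =:= 6000
    % example_priority(60000000, 1, 200) =:= 1500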
-
--module(couch_replicator_share).
-
--export([
- init/0,
- clear/0,
- update_shares/2,
- reset_shares/1,
- job_added/1,
- job_removed/1,
- update/3,
- priority/1,
- charge/3
-]).
-
--include_lib("couch/include/couch_db.hrl").
--include("couch_replicator.hrl").
-
-% Usage coefficient decays historic usage every scheduling cycle. For example,
-% the usage value for a job that ran for 1 minute is 60000000 (i.e. the number
-% of microseconds in a minute). If the job then stops running, it will take
-% about 26 cycles (minutes) for that value to decay to 0 and for the system to
-% "forget" about the job completely:
-%
-% trunc(60000000 * math:pow(0.5, 26)) = 0
-%
--define(DEFAULT_USAGE_COEFF, 0.5).
-
-% Priority coefficient decays all the job priorities such that they slowly
-% drift towards the front of the run queue. This coefficient defines a maximum
-% time window over which this algorithm would operate. For example, if this
-% value is too small (0.1), after a few cycles quite a few jobs would end up at
-% priority 0, and would render this algorithm useless. The default value of
-% 0.98 is picked such that if a job ran for one scheduler cycle, then didn't
-% get to run for 7 hours, it would still have priority > 0. 7 hours was picked
-% as it is close enough to 8 hours, which is the default maximum error backoff
-% interval.
-%
-% Example calculation:
-% shares = 100
-% usage after 1 minute cycle run = 60000000
-% initial priority = 60000000 / (100 * 100) = 6000
-% trunc(6000 * math:pow(0.98, 431)) = 0
-% 431 / 60 ~= 7 hrs
-%
--define(DEFAULT_PRIORITY_COEFF, 0.98).
-
--define(MIN_SHARES, 1).
--define(MAX_SHARES, 1000).
--define(DEFAULT_SHARES, 100).
-
--define(SHARES, couch_replicator_shares).
--define(PRIORITIES, couch_replicator_priorities).
--define(USAGE, couch_replicator_usage).
--define(CHARGES, couch_replicator_stopped_usage).
--define(NUM_JOBS, couch_replicator_num_jobs).
-
-init() ->
- EtsOpts = [named_table, public],
- % {Key, Shares}
- ?SHARES = ets:new(?SHARES, EtsOpts),
- % {JobId, Priority}
- ?PRIORITIES = ets:new(?PRIORITIES, EtsOpts),
- % {Key, Usage}
- ?USAGE = ets:new(?USAGE, EtsOpts),
- % {Key, Charges}
- ?CHARGES = ets:new(?CHARGES, EtsOpts),
- % {Key, NumJobs}
- ?NUM_JOBS = ets:new(?NUM_JOBS, EtsOpts),
- lists:foreach(fun({K, V}) -> update_shares(K, V) end, get_config_shares()).
-
-clear() ->
- Tables = [?SHARES, ?PRIORITIES, ?USAGE, ?CHARGES, ?NUM_JOBS],
- lists:foreach(fun(T) -> catch ets:delete(T) end, Tables).
-
-% This should be called when the user updates the replicator.shares config section
-%
-update_shares(Key, Shares) when is_integer(Shares) ->
- ets:insert(?SHARES, {Key, bounded(Shares, ?MIN_SHARES, ?MAX_SHARES)}).
-
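For illustration, a hedged sketch of how shares reach update_shares/2 (the db name is hypothetical; the config calls mirror the test helpers at the bottom of this file, not the actual config listener):

    % Give a hypothetical "db1" _replicator db 200 shares via the
    % "replicator.shares" config section; the value is clamped to 1..1000.
    config:set("replicator.shares", "db1", "200", _Persist = false),
    couch_replicator_share:update_shares(<<"db1">>, 200).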
-% Called when the config value is deleted and shares are reset to the default
-% value.
-reset_shares(Key) ->
- ets:delete(?SHARES, Key).
-
-job_added(#job{} = Job) ->
- Key = key(Job),
- % If the entry is not present {Key, 0} is used as the default
- ets:update_counter(?NUM_JOBS, Key, 1, {Key, 0}),
- % Update job's priority as if it ran during one scheduler cycle. This is so
- % new jobs don't get to be at priority 0 (highest).
- update_priority(Job).
-
-job_removed(#job{} = Job) ->
- Key = key(Job),
- ets:delete(?PRIORITIES, Job#job.id),
- case ets:update_counter(?NUM_JOBS, Key, -1, {Key, 0}) of
- N when is_integer(N), N =< 0 ->
- ets:delete(?NUM_JOBS, Key);
- N when is_integer(N), N > 0 ->
- ok
- end,
- ok.
-
-% This is the main algorithm update function. It should be called during each
-% rescheduling cycle with a list of running jobs, the interval from the
-% scheduler (in milliseconds), and the current timestamp.
-%
-% This function does all three main steps as described in [1].
-%
-% 1. Update usage from all the charges in the last scheduling cycle
-%
-% 2. Uniformly decay all job priorities
-%
-% 3. Update priorities for all the running jobs based on usage and number of
-% sibling jobs.
-%
-update(RunningJobs, Interval, {_, _, _} = Now) ->
- lists:foreach(fun(Job) -> charge(Job, Interval, Now) end, RunningJobs),
- update_usage(),
- decay_priorities(),
- lists:foreach(fun(Job) -> update_priority(Job) end, RunningJobs).
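A hedged sketch of how a scheduler might drive one accounting pass (the helper name and the 60000 ms interval are assumptions mirroring the reschedule/2 test helper further down; RunningJobs is a list of #job{} records):

    % Charge running jobs, decay and update priorities, then read back the
    % priority of each job so the scheduler can sort its queues.
    accounting_pass(RunningJobs) ->
        couch_replicator_share:update(RunningJobs, 60000, os:timestamp()),
        [{couch_replicator_share:priority(J#job.id), J} || J <- RunningJobs].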
-
-priority(JobId) ->
- % Not found means it was removed because its value was 0
- case ets:lookup(?PRIORITIES, JobId) of
- [{_, Priority}] -> Priority;
- [] -> 0
- end.
-
-charge(#job{pid = undefined}, _, _) ->
- 0;
-charge(#job{} = Job, Interval, {_, _, _} = Now) when is_integer(Interval) ->
- Key = key(Job),
- Charges = job_charges(Job, Interval, Now),
- % If the entry is not present {Key, 0} is used as the default
- ets:update_counter(?CHARGES, Key, Charges, {Key, 0}).
-
-usage(Key) ->
- case ets:lookup(?USAGE, Key) of
- [{_, Usage}] -> Usage;
- [] -> 0
- end.
-
-num_jobs(Key) ->
- case ets:lookup(?NUM_JOBS, Key) of
- [{_, NumJobs}] -> NumJobs;
- [] -> 0
- end.
-
-shares(Key) ->
- case ets:lookup(?SHARES, Key) of
- [{_, Shares}] -> Shares;
- [] -> ?DEFAULT_SHARES
- end.
-
-% In [1] this is described in the "Decay of Process Priorities" section
-%
-decay_priorities() ->
- decay(?PRIORITIES, priority_coeff()),
- % If priority becomes 0, it's removed. When looking it up, if it
- % is missing we assume it is 0
- clear_zero(?PRIORITIES).
-
-% This is the main part of the algorithm. In [1] it is described in the
-% "Priority Adjustment" section.
-%
-update_priority(#job{} = Job) ->
- Id = Job#job.id,
- Key = key(Job),
- Shares = shares(Key),
- Priority = (usage(Key) * num_jobs(Key)) / (Shares * Shares),
- % If the entry is not present {Id, 0} is used as the default
- ets:update_counter(?PRIORITIES, Id, trunc(Priority), {Id, 0}).
-
-% This is the "User-Level Scheduling" part from [1]
-%
-update_usage() ->
- decay(?USAGE, usage_coeff()),
- clear_zero(?USAGE),
- ets:foldl(
- fun({Key, Charges}, _) ->
- % If the entry is not present {Key, 0} is used as the default
- ets:update_counter(?USAGE, Key, Charges, {Key, 0})
- end,
- 0,
- ?CHARGES
- ),
- % Start each interval with a fresh charges table
- ets:delete_all_objects(?CHARGES).
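Put differently, after each cycle the usage for a key ends up as roughly the following (a sketch with hypothetical names, matching the decay-then-accumulate order above):

    % NewUsage = trunc(OldUsage * usage_coeff()) + ChargesThisCycle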
-
-% Private helper functions
-
-decay(Ets, Coeff) when is_atom(Ets) ->
- % Use trunc to ensure the result stays an integer in order for
- % ets:update_counter to work properly. It throws a badarg otherwise.
- Head = {'$1', '$2'},
- Result = {{'$1', {trunc, {'*', '$2', {const, Coeff}}}}},
- ets:select_replace(Ets, [{Head, [], [Result]}]).
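For readability, the select_replace match spec above behaves like this explicit (but non-atomic) version; a sketch only, not part of the module:

    decay_explicit(Ets, Coeff) when is_atom(Ets) ->
        lists:foreach(
            fun({Key, Val}) -> ets:insert(Ets, {Key, trunc(Val * Coeff)}) end,
            ets:tab2list(Ets)
        ).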
-
-clear_zero(Ets) when is_atom(Ets) ->
- ets:select_delete(Ets, [{{'_', '$1'}, [{'=<', '$1', 0}], [true]}]).
-
-key(#job{} = Job) ->
- Rep = Job#job.rep,
- case is_binary(Rep#rep.db_name) of
- true -> mem3:dbname(Rep#rep.db_name);
- false -> (Rep#rep.user_ctx)#user_ctx.name
- end.
-
-% Jobs are charged based on the amount of time the job was running during the
-% last scheduling interval. The time unit used is microseconds in order to
-% produce large enough usage values so that, when the priority is calculated,
-% the result won't easily be rounded off to 0. The formula for the priority
-% calculation is:
-%
-% Priority = (Usage * NumJobs) / Shares^2
-%
-% Then, in the worst case of a single job in the db, running for only one
-% second, with 1000 (max) shares, the priority would be:
-%
-% 1000000 * 1 / (1000^2) = 1
-%
-job_charges(#job{} = Job, IntervalMSec, {_, _, _} = Now) ->
- TimeRunning = timer:now_diff(Now, last_started(Job)),
- IntervalUSec = IntervalMSec * 1000,
- bounded(TimeRunning, 0, IntervalUSec).
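A worked example with assumed numbers: with a 60000 ms scheduling interval (60000000 microseconds), a job that last started 90 seconds before Now is charged for at most one full interval, while a job started 10 seconds before Now is charged only for its actual runtime:

    % bounded(90000000, 0, 60000000) =:= 60000000   (clamped to one interval)
    % bounded(10000000, 0, 60000000) =:= 10000000   (charged actual runtime)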
-
-last_started(#job{} = Job) ->
- case lists:keyfind(started, 1, Job#job.history) of
- % In case user set too low of a max history
- false -> {0, 0, 0};
- {started, When} -> When
- end.
-
-bounded(Val, Min, Max) ->
- max(Min, min(Max, Val)).
-
-% Config helper functions
-
-get_config_shares() ->
- lists:map(
- fun({K, V}) ->
- {list_to_binary(K), int_val(V, ?DEFAULT_SHARES)}
- end,
- config:get("replicator.shares")
- ).
-
-priority_coeff() ->
- % This is the K2 coefficient from [1]
- Default = ?DEFAULT_PRIORITY_COEFF,
- Val = float_val(config:get("replicator", "priority_coeff"), Default),
- bounded(Val, 0.0, 1.0).
-
-usage_coeff() ->
- % This is the K1 coefficient from [1]
- Default = ?DEFAULT_USAGE_COEFF,
- Val = float_val(config:get("replicator", "usage_coeff"), Default),
- bounded(Val, 0.0, 1.0).
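A hedged example of tuning both coefficients from the "replicator" config section (string values are parsed as floats, clamped to [0.0, 1.0], and fall back to the defaults when invalid), mirroring the config_set/2 test helper below:

    config:set("replicator", "priority_coeff", "0.9", _Persist = false),
    config:set("replicator", "usage_coeff", "0.5", _Persist = false).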
-
-int_val(Str, Default) when is_list(Str) ->
- try list_to_integer(Str) of
- Val -> Val
- catch
- error:badarg ->
- Default
- end.
-
-float_val(undefined, Default) ->
- Default;
-float_val(Str, Default) when is_list(Str) ->
- try list_to_float(Str) of
- Val -> Val
- catch
- error:badarg ->
- Default
- end.
-
--ifdef(TEST).
-
--include_lib("eunit/include/eunit.hrl").
--include_lib("couch/include/couch_eunit.hrl").
--include_lib("couch_replicator/test/eunit/couch_replicator_test.hrl").
-
--define(DB1, <<"db1">>).
--define(DB2, <<"db2">>).
--define(DB3, <<"db3">>).
--define(J1, <<"j1">>).
--define(J2, <<"j2">>).
--define(J3, <<"j3">>).
-
-fair_share_test_() ->
- {
- setup,
- fun setup_all/0,
- fun teardown_all/1,
- {
- foreach,
- fun setup/0,
- fun teardown/1,
- [
- ?TDEF_FE(init_works),
- ?TDEF_FE(shares_are_updated_and_reset),
- ?TDEF_FE(jobs_are_added_and_removed),
- ?TDEF_FE(can_fetch_job_priority),
- ?TDEF_FE(jobs_are_charged),
- ?TDEF_FE(usage_is_updated),
- ?TDEF_FE(priority_coefficient_works),
- ?TDEF_FE(priority_decays_when_jobs_stop_running),
- ?TDEF_FE(priority_increases_when_jobs_run),
- ?TDEF_FE(two_dbs_equal_shares_equal_number_of_jobs),
- ?TDEF_FE(two_dbs_unequal_shares_equal_number_of_jobs),
- ?TDEF_FE(two_dbs_equal_shares_unequal_number_of_jobs),
- ?TDEF_FE(two_dbs_unequal_shares_unequal_number_of_jobs),
- ?TDEF_FE(three_dbs_equal_shares_equal_number_of_jobs),
- ?TDEF_FE(three_dbs_unequal_shares_equal_number_of_jobs),
- ?TDEF_FE(three_dbs_equal_shares_unequal_number_of_jobs),
- ?TDEF_FE(three_dbs_unequal_shares_unequal_number_of_jobs)
- ]
- }
- }.
-
-setup_all() ->
- test_util:start_couch().
-
-teardown_all(Ctx) ->
- config_delete("priority_coeff"),
- config_delete("usage_coeff"),
- config_shares_delete(),
- test_util:stop_couch(Ctx).
-
-setup() ->
- init(),
- ok.
-
-teardown(_) ->
- clear(),
- config_delete("priority_coeff"),
- config_delete("usage_coeff"),
- config_shares_delete().
-
-init_works(_) ->
- Tables = [?SHARES, ?PRIORITIES, ?USAGE, ?CHARGES, ?NUM_JOBS],
- [?assert(is_list(ets:info(T))) || T <- Tables],
- ?assertEqual(#{}, tab2map(?SHARES)),
-
- clear(),
- [?assertEqual(undefined, ets:info(T)) || T <- Tables],
-
- config_share_set("db1", "200"),
- init(),
- ?assertEqual(200, shares(?DB1)),
- ?assertEqual(#{?DB1 => 200}, tab2map(?SHARES)).
-
-shares_are_updated_and_reset(_) ->
- ?assertEqual(#{}, tab2map(?SHARES)),
-
- update_shares(?DB1, 42),
- ?assertEqual(42, shares(?DB1)),
-
- reset_shares(?DB1),
- ?assertEqual(100, shares(?DB1)),
- ?assertEqual(#{}, tab2map(?SHARES)),
-
- % min shares
- update_shares(?DB1, 0),
- ?assertEqual(1, shares(?DB1)),
-
- % max shares
- update_shares(?DB1, 1001),
- ?assertEqual(1000, shares(?DB1)).
-
-jobs_are_added_and_removed(_) ->
- job_added(job(?J1, ?DB1)),
- ?assertEqual(1, num_jobs(?DB1)),
- ?assertEqual(#{?J1 => 0}, tab2map(?PRIORITIES)),
-
- job_added(job(?J2, ?DB1)),
- ?assertEqual(2, num_jobs(?DB1)),
- ?assertEqual(#{?J1 => 0, ?J2 => 0}, tab2map(?PRIORITIES)),
-
- job_added(job(?J3, ?DB2)),
- ?assertEqual(1, num_jobs(?DB2)),
- ?assertEqual(#{?J1 => 0, ?J2 => 0, ?J3 => 0}, tab2map(?PRIORITIES)),
-
- job_removed(job(?J1, ?DB1)),
- ?assertEqual(1, num_jobs(?DB1)),
- ?assertEqual(#{?J2 => 0, ?J3 => 0}, tab2map(?PRIORITIES)),
-
- job_removed(job(?J3, ?DB2)),
- ?assertEqual(0, num_jobs(?DB2)),
- ?assertEqual(0, priority(?J3)),
-
- job_removed(job(?J2, ?DB1)),
- ?assertEqual(0, num_jobs(?DB1)),
- ?assertEqual(#{}, tab2map(?NUM_JOBS)),
- ?assertEqual(0, priority(?J2)),
- ?assertEqual(#{}, tab2map(?PRIORITIES)).
-
-can_fetch_job_priority(_) ->
- job_added(job(?J1, ?DB1)),
- ?assertEqual(0, priority(?J1)),
-
- ets:insert(?PRIORITIES, {?J1, 42}),
- ?assertEqual(42, priority(?J1)),
-
- ets:delete(?PRIORITIES, ?J1),
- ?assertEqual(0, priority(?J1)).
-
-jobs_are_charged(_) ->
- Job1 = running_job(?J1, ?DB1),
- job_added(Job1),
- ?assertEqual(#{}, tab2map(?CHARGES)),
-
- charge(Job1, 1000, {0, 1, 0}),
- ?assertEqual(#{?DB1 => 1000000}, tab2map(?CHARGES)),
-
- % Stopped jobs are not charged
- charge(stop(Job1), 1000, {0, 1, 0}),
- ?assertEqual(#{?DB1 => 1000000}, tab2map(?CHARGES)),
-
- % Only charge up to one interval's worth even if job ran longer
- charge(Job1, 1000, {0, 5, 0}),
- ?assertEqual(#{?DB1 => 2000000}, tab2map(?CHARGES)),
-
- % Charges are accumulated from jobs in same db
- Job2 = running_job(?J2, ?DB1),
- job_added(Job2),
- charge(Job2, 1000, {0, 0, 1}),
- ?assertEqual(#{?DB1 => 2000001}, tab2map(?CHARGES)),
-
- % Charges are not cleared if jobs are removed
- job_removed(Job1),
- job_removed(Job2),
- ?assertEqual(#{?DB1 => 2000001}, tab2map(?CHARGES)).
-
-usage_is_updated(_) ->
- Job = running_job(?J1, ?DB1),
- job_added(Job),
-
- charge(Job, 60000, {0, 60, 0}),
- update_usage(),
- ?assertEqual(60000000, usage(?DB1)),
-
- % Charges table is cleared after usage is updated
- ?assertEqual(#{}, tab2map(?CHARGES)),
-
- % Check that usage decay works
- config_set("usage_coeff", "0.2"),
- update_usage(),
- ?assertEqual(12000000, usage(?DB1)),
-
- config_set("usage_coeff", "0.5"),
- update_usage(),
- ?assertEqual(6000000, usage(?DB1)),
-
- % Check that function both decays and updates from charges
- charge(Job, 60000, {0, 60, 0}),
- update_usage(),
- ?assertEqual(63000000, usage(?DB1)),
-
- % Usage eventually decays to 0 and is removed from the table
- [update_usage() || _ <- lists:seq(1, 100)],
- ?assertEqual(0, usage(?DB1)),
- ?assertEqual(#{}, tab2map(?USAGE)).
-
-priority_coefficient_works(_) ->
- job_added(job(?J1, ?DB1)),
- ets:insert(?PRIORITIES, {?J1, 1000}),
-
- config_set("priority_coeff", "0.8"),
- decay_priorities(),
- ?assertEqual(800, priority(?J1)),
-
- config_set("priority_coeff", "0.5"),
- decay_priorities(),
- ?assertEqual(400, priority(?J1)),
-
- % If non-float junk value is set then the default is used
- config_set("priority_coeff", "junk"),
- decay_priorities(),
- ?assertEqual(392, priority(?J1)),
-
- % Clipped to 1.0 max
- config_set("priority_coeff", "1.1"),
- decay_priorities(),
- ?assertEqual(392, priority(?J1)),
-
- % Clipped to 0.0 min and removed when =< 0
- config_set("priority_coeff", "-0.1"),
- decay_priorities(),
- ?assertEqual(0, priority(?J1)),
- ?assertEqual(#{}, tab2map(?PRIORITIES)).
-
-priority_decays_when_jobs_stop_running(_) ->
- Job = running_job(?J1, ?DB1),
- job_added(Job),
-
- % Ran for one cycle then stop
- {[], Pending} = reschedule(1, {[Job], []}),
-
- % Priority is non-0 initially
- ?assert(priority(?J1) > 0),
-
- % Priority decays to 0 after some cycles
- [reschedule(0, {[], Pending}) || _ <- lists:seq(1, 500)],
- ?assertEqual(0, priority(?J1)).
-
-priority_increases_when_jobs_run(_) ->
- Job = running_job(?J1, ?DB1),
- job_added(Job),
-
- Running = [Job],
- reschedule(0, {Running, []}),
- P1 = priority(?J1),
- ?assert(P1 > 0),
-
- % Priority increases
- reschedule(0, {Running, []}),
- P2 = priority(?J1),
- ?assert(P2 > P1),
-
- % Additive priority increase is balanced out by priority decay
- [reschedule(0, {Running, []}) || _ <- lists:seq(1, 500)],
- Pn = priority(?J1),
- ?assert(Pn > P2),
-
- reschedule(0, {Running, []}),
- Pm = priority(?J1),
- ?assertEqual(Pn, Pm).
-
-two_dbs_equal_shares_equal_number_of_jobs(_) ->
- update_shares(?DB1, 100),
- update_shares(?DB2, 100),
- Jobs = jobs(#{?DB1 => {25, 75}, ?DB2 => {25, 75}}),
- #{?DB1 := Db1, ?DB2 := Db2} = run_scheduler(1000, 10, Jobs),
- ?assert(49 =< Db1 andalso Db1 =< 51),
- ?assert(49 =< Db2 andalso Db2 =< 51).
-
-two_dbs_unequal_shares_equal_number_of_jobs(_) ->
- update_shares(?DB1, 900),
- update_shares(?DB2, 100),
- Jobs = jobs(#{?DB1 => {25, 75}, ?DB2 => {25, 75}}),
- #{?DB1 := Db1, ?DB2 := Db2} = run_scheduler(1000, 10, Jobs),
- ?assert(89 =< Db1 andalso Db1 =< 91),
- ?assert(9 =< Db2 andalso Db2 =< 11).
-
-two_dbs_equal_shares_unequal_number_of_jobs(_) ->
- update_shares(?DB1, 100),
- update_shares(?DB2, 100),
- Jobs = jobs(#{?DB1 => {25, 25}, ?DB2 => {25, 125}}),
- #{?DB1 := Db1, ?DB2 := Db2} = run_scheduler(1000, 10, Jobs),
- ?assert(49 =< Db1 andalso Db1 =< 51),
- ?assert(49 =< Db2 andalso Db2 =< 51).
-
-two_dbs_unequal_shares_unequal_number_of_jobs(_) ->
- update_shares(?DB1, 1),
- update_shares(?DB2, 100),
- Jobs = jobs(#{?DB1 => {25, 25}, ?DB2 => {25, 125}}),
- #{?DB1 := Db1, ?DB2 := Db2} = run_scheduler(1000, 10, Jobs),
- ?assert(0 =< Db1 andalso Db1 =< 2),
- ?assert(98 =< Db2 andalso Db2 =< 100).
-
-three_dbs_equal_shares_equal_number_of_jobs(_) ->
- update_shares(?DB1, 100),
- update_shares(?DB2, 100),
- update_shares(?DB3, 100),
- Jobs = jobs(#{?DB1 => {25, 75}, ?DB2 => {25, 75}, ?DB3 => {25, 75}}),
- #{?DB1 := Db1, ?DB2 := Db2, ?DB3 := Db3} = run_scheduler(1000, 10, Jobs),
- ?assert(32 =< Db1 andalso Db1 =< 34),
- ?assert(32 =< Db2 andalso Db2 =< 34),
- ?assert(32 =< Db3 andalso Db3 =< 34).
-
-three_dbs_unequal_shares_equal_number_of_jobs(_) ->
- update_shares(?DB1, 100),
- update_shares(?DB2, 700),
- update_shares(?DB3, 200),
- Jobs = jobs(#{?DB1 => {25, 75}, ?DB2 => {25, 75}, ?DB3 => {25, 75}}),
- #{?DB1 := Db1, ?DB2 := Db2, ?DB3 := Db3} = run_scheduler(1000, 10, Jobs),
- ?assert(9 =< Db1 andalso Db1 =< 11),
- ?assert(69 =< Db2 andalso Db2 =< 71),
- ?assert(19 =< Db3 andalso Db3 =< 21).
-
-three_dbs_equal_shares_unequal_number_of_jobs(_) ->
- update_shares(?DB1, 100),
- update_shares(?DB2, 100),
- update_shares(?DB3, 100),
- Jobs = jobs(#{?DB1 => {25, 25}, ?DB2 => {25, 100}, ?DB3 => {25, 75}}),
- #{?DB1 := Db1, ?DB2 := Db2, ?DB3 := Db3} = run_scheduler(1000, 10, Jobs),
- ?assert(32 =< Db1 andalso Db1 =< 34),
- ?assert(32 =< Db2 andalso Db2 =< 34),
- ?assert(32 =< Db3 andalso Db3 =< 34).
-
-three_dbs_unequal_shares_unequal_number_of_jobs(_) ->
- update_shares(?DB1, 1000),
- update_shares(?DB2, 100),
- update_shares(?DB3, 1),
- Jobs = jobs(#{?DB1 => {25, 100}, ?DB2 => {25, 125}, ?DB3 => {25, 875}}),
- #{?DB1 := Db1, ?DB2 := Db2, ?DB3 := Db3} = run_scheduler(1000, 10, Jobs),
- ?assert(87 =< Db1 andalso Db1 =< 89),
- ?assert(9 =< Db2 andalso Db2 =< 11),
- ?assert(2 =< Db3 andalso Db3 =< 4).
-
-config_set(K, V) ->
- config:set("replicator", K, V, _Persist = false).
-
-config_delete(K) ->
- config:delete("replicator", K, _Persist = false).
-
-config_share_set(K, V) ->
- config:set("replicator.shares", K, V, _Persist = false).
-
-config_shares_delete() ->
- [
- config:delete("replicator.shares", K, _Persist = false)
- || {K, _} <- config:get("replicator.shares")
- ].
-
-tab2map(T) when is_atom(T) ->
- maps:from_list(ets:tab2list(T)).
-
-job(rand, Db) ->
- job(rand:uniform(1 bsl 59), Db);
-job(Id, Db) ->
- Job = #job{
- id = Id,
- rep = #rep{
- db_name = Db,
- user_ctx = #user_ctx{}
- }
- },
- stop(Job).
-
-running_job(Id, Db) ->
- run(job(Id, Db)).
-
-run(#job{} = Job) ->
- Job#job{
- pid = list_to_pid("<0.9999.999>"),
- history = [{started, {0, 0, 0}}, {added, {0, 0, 0}}]
- }.
-
-stop(#job{} = Job) ->
- Job#job{
- pid = undefined,
- history = [{added, {0, 0, 0}}]
- }.
-
-% Simple scheduler simulator. Start and stop N jobs and do the
-% accounting steps. Return a new list of running and pending jobs. If
-% N is 0 then jobs which were running stay running and jobs were
-% pending stay pending.
-%
-reschedule(N, {Running, Pending}) ->
- update(Running, 60000, {0, 60, 0}),
-
- RunPr = [{priority(Job#job.id), Job} || Job <- Running],
- StopPr = [{priority(Job#job.id), Job} || Job <- Pending],
-
- {_, Running1} = lists:unzip(lists:reverse(lists:sort(RunPr))),
- {_, Pending1} = lists:unzip(lists:sort(StopPr)),
-
- ToStop = lists:sublist(Running1, N),
- ToStart = lists:sublist(Pending1, N),
-
- Running2 = [run(Job) || Job <- ToStart] ++ Running1 -- ToStop,
- Pending2 = [stop(Job) || Job <- ToStop] ++ Pending1 -- ToStart,
-
- {Running2, Pending2}.
-
-% Run a few scheduling cycles and calculate usage percentage for each db
-%
-run_scheduler(Cycles, Churn, Jobs0) ->
- Acc0 = {#{}, Jobs0},
-
- {Sum, _} = lists:foldl(
- fun(_CycleCnt, {UsageAcc, {Running, _} = Jobs}) ->
- UsageAcc1 = lists:foldl(
- fun(#job{} = Job, Acc) ->
- Db = Job#job.rep#rep.db_name,
- maps:update_with(Db, fun(V) -> V + 1 end, 0, Acc)
- end,
- UsageAcc,
- Running
- ),
- {UsageAcc1, reschedule(Churn, Jobs)}
- end,
- Acc0,
- lists:seq(1, Cycles)
- ),
-
- Total = maps:fold(fun(_, V, Acc) -> Acc + V end, 0, Sum),
- maps:map(fun(_Db, V) -> round(V / Total * 100) end, Sum).
-
-% Dbs = #{Db => {RunningCount, PendingCount}}
-%
-jobs(#{} = Dbs) ->
- maps:fold(
- fun(Db, {RCnt, PCnt}, {Running, Pending}) ->
- RJobs = [running_job(rand, Db) || _ <- lists:seq(1, RCnt)],
- PJobs = [job(rand, Db) || _ <- lists:seq(1, PCnt)],
- [job_added(Job) || Job <- RJobs ++ PJobs],
- {Running ++ RJobs, Pending ++ PJobs}
- end,
- {[], []},
- Dbs
- ).
-
--endif.