AIMD based rate limiter implementation

AIMD (additive increase / multiplicative decrease) is a feedback control
algorithm which converges on the available channel capacity. No participant
knows the capacity a priori, and participants don't communicate or know about
each other, so they don't coordinate to divide the capacity among themselves.

A variation of this is used in the TCP congestion control algorithm. AIMD is
proven to converge, whereas, for example, additive increase / additive
decrease and multiplicative increase / multiplicative decrease do not.

A few tweaks were applied to the base control logic:

 * The estimated value is an interval (period) instead of a rate. This is for
   convenience, as users will probably want to know how long to sleep. The
   rate is just 1000 / interval, so it is easy to transform.

 * There is a hard maximum limit for the estimated period. This is mainly a
   practical concern: connections sleeping too long will time out, and / or
   jobs will waste time sleeping and consume scheduler slots while others
   could be running.

 * There is a time decay component used to handle large pauses between
   updates. If the update interval is large, assume (optimistically) that
   some successful requests have been made. Intuitively, the more time
   passes, the less accurate the estimated period probably is.

 * The rate of updates applied to the algorithm is limited. This effectively
   acts as a low pass filter and makes the algorithm handle spikes and short
   bursts of failures better. This is not a novel idea; some alternative TCP
   control algorithms like Westwood+ do something similar.

 * There is a large downward pressure applied to the increasing interval as
   it approaches the maximum limit. This is done by tweaking the additive
   factor via a step function. In practice this makes it a bit harder for
   jobs to cross the maximum backoff threshold, as they would be killed and
   potentially lose intermediate work.
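
As a rough illustration of the base control logic described above, before the time decay, update rate limiting and step function tweaks are applied, the interval update could be sketched as below. The module name `aimd_sketch`, its function names and the constant `STEP` are assumptions made for this example only; `BASE`, `FACTOR` and `MAX` mirror the values used in this commit. Unlike the committed code, the sketch does not clamp small intervals to the base interval on success.

```erlang
-module(aimd_sketch).
-export([on_failure/1, on_success/1, rate/1]).

%% BASE, FACTOR and MAX mirror BASE_INTERVAL, BACKOFF_FACTOR and MAX_INTERVAL
%% from this commit. STEP is an assumed constant additive step.
-define(BASE, 20).
-define(FACTOR, 1.2).
-define(MAX, 25000).
-define(STEP, 20).

%% Failure: grow the interval multiplicatively and clip it at the hard
%% maximum. A zero interval is first bumped to the base interval.
on_failure(Interval) ->
    Grown = round(max(Interval, ?BASE) * ?FACTOR),
    min(Grown, ?MAX).

%% Success: shrink the interval additively, never going below zero.
on_success(Interval) ->
    max(Interval - ?STEP, 0).

%% Convert an interval in milliseconds to a rate in requests per second.
rate(0) -> infinity;
rate(Interval) when Interval > 0 -> 1000 / Interval.
```

Because the estimated value is an interval rather than a rate, the multiplicative part applies on failure (the interval grows) and the additive part applies on success (the interval shrinks).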

The main API functions are:

   success(Key) -> IntervalInMilliseconds
   failure(Key) -> IntervalInMilliseconds
   interval(Key) -> IntervalInMilliseconds

Key is any term (hashable by phash2), typically something like
{Method, Url}. The result of success(Key) and failure(Key) is the current
period value. The caller would then presumably sleep for that amount of time
before or after making requests. The current interval can also be read,
without updating it, with interval(Key).

The implementation uses ETS tables sharded by key, and a periodic timer
cleans up unused items.

Jira: COUCHDB-3324
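
One way a caller might use the returned interval is sketched below. This is only an illustration under stated assumptions: the module `rate_limited_call_sketch`, the shape of the `Limiter` map and the `{ok, _} | {error, _}` return convention of the request function are all hypothetical and not part of this commit. The limiter callbacks are passed in as funs so the sketch does not depend on a running rate limiter process.

```erlang
-module(rate_limited_call_sketch).
-export([call/3]).

%% Key     - rate limiter key, e.g. {Method, Url}.
%% Request - hypothetical zero-arity fun returning {ok, _} | {error, _}.
%% Limiter - map holding the success and failure callbacks; each takes Key
%%           and returns the interval to sleep, in milliseconds.
call(Key, Request, #{success := Success, failure := Failure}) ->
    case Request() of
        {ok, _} = Ok ->
            sleep(Success(Key)),
            Ok;
        {error, _} = Error ->
            sleep(Failure(Key)),
            Error
    end.

%% A zero interval means there is no backoff for this key.
sleep(0) -> ok;
sleep(Msec) when is_integer(Msec), Msec > 0 -> timer:sleep(Msec).
```

With the module from this commit, the map could be built as `#{success => fun couch_replicator_rate_limiter:success/1, failure => fun couch_replicator_rate_limiter:failure/1}`. Sleeping after the request is only one option; as noted above, a caller may equally sleep before it.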
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%   http://www.apache.org/licenses/LICENSE-2.0
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+% This module implements rate limiting based on a variation of the additive
+% increase / multiplicative decrease feedback control algorithm.
+% This is an adaptive algorithm which converges on available channel
+% capacity where each participant (client) doesn't a priori know the
+% capacity, and participants don't communicate or know about each other (so they
+% don't coordinate to divide the capacity among themselves).
+% The algorithm referenced above estimates a rate, whereas the implemented
+% algorithm uses an interval (in milliseconds). It preserves the original
+% semantics, that is, the failure part is multiplicative and the success part
+% is additive. The relationship between rate and interval is: rate = 1000 /
+% interval.
+% There are two main API functions:
+% success(Key) -> IntervalInMilliseconds
+% failure(Key) -> IntervalInMilliseconds
+% Key is any term, typically something like {Method, Url}. The result from the
+% function is the current period value. The caller might then decide to sleep
+% for that amount of time before or after each request.
+ start_link/0
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_info/2,
+ handle_cast/2,
+ code_change/3
+ interval/1,
+ max_interval/0,
+ failure/1,
+ success/1
+% Types
+-type key() :: any().
+-type interval() :: non_neg_integer().
+-type msec() :: non_neg_integer().
+% Definitions
+% Main parameters of the algorithm. The factor is the multiplicative part and
+% base interval is the additive.
+-define(BASE_INTERVAL, 20).
+-define(BACKOFF_FACTOR, 1.2).
+% If estimated period exceeds a limit, it is clipped to this value. This
+% defines a practical limit of this algorithm. This is driven by real world
+% concerns such as having a connection which sleeps for too long and ends up
+% with socket timeout errors, or replication jobs which occupy a scheduler
+% slot without making any progress.
+-define(MAX_INTERVAL, 25000).
+% Specify when (threshold) and how much (factor) to decay the estimated period.
+% If there is a long pause between consecutive updates, the estimated period
+% becomes less accurate as more time passes. In such a case, choose to
+% optimistically decay the estimated value. That is, assume that a certain
+% rate of successful requests happened. (For reference, the TCP congestion
+% algorithm also handles a variation of this in RFC 5681 under the "Restarting
+% Idle Connections" section).
+-define(TIME_DECAY_FACTOR, 2).
+-define(TIME_DECAY_THRESHOLD, 1000).
+% Limit the rate of updates applied. This controls the rate of change of the
+% estimated value. In colloquial terms, it defines how "twitchy" the algorithm
+% is. Or, another way to look at it, this is a poor version of a low pass
+% filter. (Some alternative TCP congestion control algorithms, like Westwood+,
+% use something similar to solve the ACK compression problem).
+-record(state, {timer}).
+-record(rec, {id, backoff, ts}).
+-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+-spec interval(key()) -> interval().
+interval(Key) ->
+ {Interval, _Timestamp} = interval_and_timestamp(Key),
+ Interval.
+-spec max_interval() -> interval().
+max_interval() ->
+ ?MAX_INTERVAL.
+-spec failure(key()) -> interval().
+failure(Key) ->
+ {Interval, Timestamp} = interval_and_timestamp(Key),
+ update_failure(Key, Interval, Timestamp, now_msec()).
+-spec success(key()) -> interval().
+success(Key) ->
+ {Interval, Timestamp} = interval_and_timestamp(Key),
+ update_success(Key, Interval, Timestamp, now_msec()).
+% gen_server callbacks
+init([]) ->
+ couch_replicator_rate_limiter_tables:create(#rec.id),
+ {ok, #state{timer = new_timer()}}.
+terminate(_Reason, _State) ->
+ ok.
+handle_call(_Msg, _From, State) ->
+ {reply, invalid, State}.
+handle_cast(_, State) ->
+ {noreply, State}.
+handle_info(cleanup, #state{timer = Timer}) ->
+ erlang:cancel_timer(Timer),
+ TIds = couch_replicator_rate_limiter_tables:tids(),
+ [cleanup_table(TId, now_msec() - ?MAX_INTERVAL) || TId <- TIds],
+ {noreply, #state{timer = new_timer()}}.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+% Private functions
+-spec update_success(any(), interval(), msec(), msec()) -> interval().
+update_success(_Key, _Interval, _Timestamp = 0, _Now) ->
+ 0; % No ets entry. Keep it that way and don't insert a new one.
+update_success(_Key, Interval, Timestamp, Now)
+ when Now - Timestamp =< ?SENSITIVITY_TIME_WINDOW ->
+ Interval; % Ignore too frequent updates.
+update_success(Key, Interval, Timestamp, Now) ->
+ DecayedInterval = time_decay(Now - Timestamp, Interval),
+ AdditiveFactor = additive_factor(DecayedInterval),
+ NewInterval = DecayedInterval - AdditiveFactor,
+ if
+ NewInterval =< 0 ->
+ Table = couch_replicator_rate_limiter_tables:term_to_table(Key),
+ ets:delete(Table, Key),
+ 0;
+ NewInterval =< ?BASE_INTERVAL ->
+ insert(Key, ?BASE_INTERVAL, Now);
+ NewInterval > ?BASE_INTERVAL ->
+ insert(Key, NewInterval, Now)
+ end.
+-spec update_failure(any(), interval(), msec(), msec()) -> interval().
+update_failure(_Key, Interval, Timestamp, Now)
+ when Now - Timestamp =< ?SENSITIVITY_TIME_WINDOW ->
+ Interval; % Ignore too frequent updates.
+update_failure(Key, Interval, _Timestamp, Now) ->
+ Interval1 = erlang:max(Interval, ?BASE_INTERVAL),
+ Interval2 = round(Interval1 * ?BACKOFF_FACTOR),
+ Interval3 = erlang:min(Interval2, ?MAX_INTERVAL),
+ insert(Key, Interval3, Now).
+-spec insert(any(), interval(), msec()) -> interval().
+insert(Key, Interval, Timestamp) ->
+ Entry = #rec{id = Key, backoff = Interval, ts = Timestamp},
+ Table = couch_replicator_rate_limiter_tables:term_to_table(Key),
+ ets:insert(Table, Entry),
+ Interval.
+-spec interval_and_timestamp(key()) -> {interval(), msec()}.
+interval_and_timestamp(Key) ->
+ Table = couch_replicator_rate_limiter_tables:term_to_table(Key),
+ case ets:lookup(Table, Key) of
+ [] ->
+ {0, 0};
+ [#rec{backoff = Interval, ts = Timestamp}] ->
+ {Interval, Timestamp}
+ end.
+-spec time_decay(msec(), interval()) -> interval().
+time_decay(Dt, Interval) when Dt > ?TIME_DECAY_THRESHOLD ->
+ DecayedInterval = Interval - ?TIME_DECAY_FACTOR * Dt,
+ erlang:max(round(DecayedInterval), 0);
+time_decay(_Dt, Interval) ->
+ Interval.
+% Calculate the additive factor. Ideally it would be a constant, but in this
+% case it is a step function to help handle larger values as they approach
+% the backoff limit. Large success values closer to the limit add some
+% pressure against the limit, which is useful, as at the backoff limit the
+% whole replication job is killed, which can be costly in time and in
+% temporary work lost by those jobs.
+-spec additive_factor(interval()) -> interval().
+additive_factor(Interval) when Interval > 10000 ->
+additive_factor(Interval) when Interval > 1000 ->
+additive_factor(Interval) when Interval > 100 ->
+additive_factor(_Interval) ->
+-spec new_timer() -> reference().
+new_timer() ->
+ erlang:send_after(?MAX_INTERVAL * 2, self(), cleanup).
+-spec now_msec() -> msec().
+now_msec() ->
+ {Mega, Sec, Micro} = os:timestamp(),
+ ((Mega * 1000000) + Sec) * 1000 + Micro div 1000.
+-spec cleanup_table(atom(), msec()) -> non_neg_integer().
+cleanup_table(Tid, LimitMSec) ->
+ Head = #rec{ts = '$1', _ = '_'},
+ Guard = {'<', '$1', LimitMSec},
+ ets:select_delete(Tid, [{Head, [Guard], [true]}]).
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%   http://www.apache.org/licenses/LICENSE-2.0
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+% Maintain the sharded ETS tables used by couch_replicator_rate_limiter.
+% Entries are spread over ?SHARDS_N named, public tables. The table for a
+% given term is chosen by hashing the term with erlang:phash2/1 and taking
+% the remainder modulo ?SHARDS_N.
+ create/1,
+ tids/0,
+ term_to_table/1
+-define(SHARDS_N, 16).
+-spec create(non_neg_integer()) -> ok.
+create(KeyPos) ->
+ Opts = [named_table, public, {keypos, KeyPos}, {read_concurrency, true}],
+ [ets:new(list_to_atom(TableName), Opts) || TableName <- table_names()],
+ ok.
+-spec tids() -> [atom()].
+tids() ->
+ [list_to_existing_atom(TableName) || TableName <- table_names()].
+-spec term_to_table(any()) -> atom().
+term_to_table(Term) ->
+ PHash = erlang:phash2(Term),
+ list_to_existing_atom(table_name(PHash rem ?SHARDS_N)).
+-spec table_names() -> [string()].
+table_names() ->
+ [table_name(N) || N <- lists:seq(0, ?SHARDS_N - 1)].
+-spec table_name(non_neg_integer()) -> string().
+table_name(Id) when is_integer(Id), Id >= 0 andalso Id < ?SHARDS_N ->
+ atom_to_list(?MODULE) ++ "_" ++ integer_to_list(Id).
+rate_limiter_test_() ->
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ t_new_key(),
+ t_1_failure(),
+ t_2_failures_back_to_back(),
+ t_2_failures(),
+ t_success_threshold(),
+ t_1_failure_2_successes()
+ ]
+ }.
+t_new_key() ->
+ ?_test(begin
+ ?assertEqual(0, couch_replicator_rate_limiter:interval({"foo", get}))
+ end).
+t_1_failure() ->
+ ?_test(begin
+ ?assertEqual(24, couch_replicator_rate_limiter:failure({"foo", get}))
+ end).
+t_2_failures() ->
+ ?_test(begin
+ couch_replicator_rate_limiter:failure({"foo", get}),
+ low_pass_filter_delay(),
+ Interval = couch_replicator_rate_limiter:failure({"foo", get}),
+ ?assertEqual(29, Interval)
+ end).
+t_2_failures_back_to_back() ->
+ ?_test(begin
+ couch_replicator_rate_limiter:failure({"foo", get}),
+ Interval = couch_replicator_rate_limiter:failure({"foo", get}),
+ ?assertEqual(24, Interval)
+ end).
+t_success_threshold() ->
+ ?_test(begin
+ Interval = couch_replicator_rate_limiter:success({"foo", get}),
+ ?assertEqual(0, Interval),
+ Interval = couch_replicator_rate_limiter:success({"foo", get}),
+ ?assertEqual(0, Interval)
+ end).
+t_1_failure_2_successes() ->
+ ?_test(begin
+ couch_replicator_rate_limiter:failure({"foo", get}),
+ low_pass_filter_delay(),
+ Succ1 = couch_replicator_rate_limiter:success({"foo", get}),
+ ?assertEqual(20, Succ1),
+ low_pass_filter_delay(),
+ Succ2 = couch_replicator_rate_limiter:success({"foo", get}),
+ ?assertEqual(0, Succ2)
+ end).
+low_pass_filter_delay() ->
+ timer:sleep(100).
+setup() ->
+ {ok, Pid} = couch_replicator_rate_limiter:start_link(),
+ Pid.
+teardown(Pid) ->
+ Ref = erlang:monitor(process, Pid),
+ unlink(Pid),
+ exit(Pid, kill),
+ receive
+ {'DOWN', Ref, process, Pid, _} ->
+ ok
+ end,
+ ok.