diff options
author | iilyak <iilyak@users.noreply.github.com> | 2020-04-02 03:43:35 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-02 03:43:35 -0700 |
commit | d520d7331706315968f918c0985e8341c62246e5 (patch) | |
tree | 823146246a5af92fdc3d3382d1ea7baea2f7386a | |
parent | 4707af418eefe2988a061c53e122c774c52c339a (diff) | |
parent | 85f81d88018fe526f1f216d673a9bbc847cbd81c (diff) | |
download | couchdb-d520d7331706315968f918c0985e8341c62246e5.tar.gz |
Merge pull request #2662 from cloudant/couch_view-rate_limit
Use `couch_rate` application for `couch_view`
26 files changed, 1839 insertions, 28 deletions
diff --git a/.credo.exs b/.credo.exs index bd26f407c..112561b95 100644 --- a/.credo.exs +++ b/.credo.exs @@ -37,6 +37,7 @@ ~r"/src/metrics", ~r"/src/minerl", ~r"/src/parse_trans", + ~r"/src/stream_data", ~r"/src/ssl_verify_fun", ~r"/test/elixir/deps/" ] diff --git a/.gitignore b/.gitignore index bf45d1a4f..955403a98 100644 --- a/.gitignore +++ b/.gitignore @@ -76,6 +76,7 @@ src/rebar/ src/recon/ src/smoosh/ src/snappy/ +src/stream_data/ src/ssl_verify_fun/ src/thrift_protocol/ src/triq/ @@ -162,6 +162,7 @@ endif check-fdb: make eunit apps=couch_eval,couch_expiring_cache,ctrace,couch_jobs,couch_views,fabric make elixir tests=test/elixir/test/basics_test.exs,test/elixir/test/replication_test.exs,test/elixir/test/map_test.exs,test/elixir/test/all_docs_test.exs,test/elixir/test/bulk_docs_test.exs + make exunit tests=src/couch_rate/test/exunit/ .PHONY: eunit # target: eunit - Run EUnit tests, use EUNIT_OPTS to provide custom options @@ -49,11 +49,14 @@ defmodule CouchDBTest.Mixfile do # Run "mix help compile.app" to learn about applications. def application do [ - extra_applications: [:logger], + extra_applications: extra_applications(Mix.env()), applications: [:httpotion] ] end + defp extra_applications(:test), do: [:logger, :stream_data] + defp extra_applications(_), do: [:logger] + # Specifies which paths to compile per environment. defp elixirc_paths(:test), do: ["test/elixir/lib", "test/elixir/test/support"] defp elixirc_paths(:integration), do: ["test/elixir/lib", "test/elixir/test/support"] @@ -68,7 +71,8 @@ defmodule CouchDBTest.Mixfile do {:jiffy, path: Path.expand("src/jiffy", __DIR__)}, {:ibrowse, path: Path.expand("src/ibrowse", __DIR__), override: true, compile: false}, - {:credo, "~> 1.2.0", only: [:dev, :test, :integration], runtime: false} + {:credo, "~> 1.2.0", only: [:dev, :test, :integration], runtime: false}, + {:stream_data, "~> 0.4.3", only: [:dev, :test, :integration], runtime: false} ] end @@ -14,5 +14,6 @@ "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm", "17ef63abde837ad30680ea7f857dd9e7ced9476cdd7b0394432af4bfc241b960"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.5", "6eaf7ad16cb568bb01753dbbd7a95ff8b91c7979482b95f38443fe2c8852a79b", [:make, :mix, :rebar3], [], "hexpm", "13104d7897e38ed7f044c4de953a6c28597d1c952075eb2e328bc6d6f2bfc496"}, + "stream_data": {:hex, :stream_data, "0.4.3", "62aafd870caff0849a5057a7ec270fad0eb86889f4d433b937d996de99e3db25", [:mix], [], "hexpm", "7dafd5a801f0bc897f74fcd414651632b77ca367a7ae4568778191fc3bf3a19a"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.4.1", "d869e4c68901dd9531385bb0c8c40444ebf624e60b6962d95952775cac5e90cd", [:rebar3], [], "hexpm", "1d1848c40487cdb0b30e8ed975e34e025860c02e419cb615d255849f3427439d"}, } diff --git a/rebar.config.script b/rebar.config.script index 269511d8e..6f9f65c73 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -123,6 +123,7 @@ SubDirs = [ "src/couch_index", "src/couch_mrview", "src/couch_js", + "src/couch_rate", "src/couch_replicator", "src/couch_plugins", "src/couch_pse_tests", diff --git a/rel/files/eunit.ini b/rel/files/eunit.ini index 2b73ab307..20277f288 100644 --- a/rel/files/eunit.ini +++ b/rel/files/eunit.ini @@ -40,3 +40,6 @@ startup_jitter = 0 [fabric] ; disable index auto-updater to avoid interfering with some of the tests index_updater_enabled = false + +[couch_rate.views] +opts = #{budget => 100, target => 500, window => 6000, sensitivity => 200, congested_delay => 1}
\ No newline at end of file diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index 749cdd27f..fd0aa7763 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -699,3 +699,7 @@ compaction = false ; log every generated trace by including the following: ; ; all = (#{}) -> true + +[couch_rate.views] +limiter = couch_rate_limiter +opts = #{budget => 100, target => 2500, window => 60000, sensitivity => 1000}
\ No newline at end of file diff --git a/src/couch_rate/README.md b/src/couch_rate/README.md new file mode 100644 index 000000000..530da1a99 --- /dev/null +++ b/src/couch_rate/README.md @@ -0,0 +1,155 @@ +# Description + +The `couch_rate` application implements a generic rate limiter which can +be used to control batch size and delay between batches. It was initially +designed for background index build to find an optimal batch size to utilize +the FDB transaction up to configured `target` parameter. The application +provides an API to plug custom rate limiting logic when need to. + +# Default rate limit logic + +The `couch_rate_limiter` is the rate limit module used by default. +The module tracks average number of reads and writes over specified +time period. It uses average read/write numbers to calculate an +approximate value for read/write ratio. Then the read/write ratio is +used to convert estimated amount of writes into batch size. + +# Configuration + +## API based usage + +In the simplest use case the only mandatory keys `new/3` expects are: +* `budget` - the initial value for estimated batch size +* `target` - the amount in msec which we try to maintain for batch processing time +* `window` - time interval for contention detector +* `sensitivity` - minimal interval within the `window` + +We choose sane default values for the rest of the parameters. + +* `window_size = window div sensitivity + 1` +* `underload_threshold = round(target * 0.95)` +* `overload_threshold = round(target * 1.05)` +* `delay_threshold = round(target * 1.07)` + +Due to the use of `round` in defaults calculation the `target` cannot be less +than `36` msec. Otherwise some of the thresholds become equal which breaks the +algorithm. + +In the case when you need to specify custom parameters, the following keys +are supported: + +* `window_size` - how many batches to consider in contention detector +* `timer` - this is used for testing to fast forward time `fun() -> current_time_in_ms() end` +* `target` - the amount in msec which we try to maintain for batch processing time +* `underload_threshold` - a threshold bellow which we would try to increase the budget +* `overload_threshold` - a threshold above which we would start decreasing the budget +* `delay_threshold` - a threshold above which we would start introducing delays between batches +* `multiplicative_factor` - determines how fast we are going to decrease budget (must be in (0..1) range) +* `regular_delay` - delay between batches when there is no overload +* `congested_delay` - delay between batches when there is an overload +* `initial_budget` - initial value for budget to start with + +## default.ini based usage + +The users of the `couch_rate` application pass the `ConfigId` parameter. +When calling `couch_rate:new` and `couch_rate:create_if_missing`. +The `couch_rate` application uses this information to construct name of the +configuration section to use to get configuration parameters. The configration +section is constructed using `"couch_rate." ++ ConfigId`. +The parameters are encoded using erlang map syntax. +Limitation of the map parser: + +* Keys must be atoms +* Values are either integers or floats +* We only support positive values in the map +* Configuration object cannot use erlang reserved words in keys: + `after`, `and`, `andalso`, `band`, `begin`, `bnot`, `bor`, + `bsl`, `bsr`, `bxor`, `case`, `catch`, `cond`, `div`, `end` + `fun`, `if`, `let`, `not`, `of`, `or`, `orelse`, `receive` + `rem`, `try`, `when`, `xor` + +The auxilary `couch_rate_config` module implements the following API: + +* `couch_rate_config:from_str/1` - parses a string representation of parameters +* `couch_rate_config:to_str/1` - converts parameters to string (used in testing) + +Here is the example of configuration used in `couch_view` application: + +``` +[couch_rate.views] +limiter = couch_rate_limiter +opts = #{budget => 100, target => 2500, window => 60000, sensitivity => 1000} +``` + +In the `couch_view` application it is used as follows: + +``` +Limiter = couch_rate:create_if_missing({DbName, DDocId}, "views"), +``` + +# API + +The application implements two APIs. Both APIs are supported by `couch_rate` +module. The API variants are: + +* explicit state passing +* state store based approach + +The API is chosen baed on the `StoreModule` argument passed to `new/4`. +Currently we support following values for `StoreModule`: + +* `nil` - this value indicates that explicit state passing would be used +* `couch_rate_ets` - ets based global state store (ets tables are owned by app supervisor) +* `couch_rate_pd` - process dicionary based local state store + +The "explicit state passing" style returns a tuple `{Result :: term(), state()}`. +The result is the same as for state store based API. + + +## State store based APIs of `couch_rate` module. + +All functions can return `{error, Reason :: term()}` in case of errors. +This detail is ommited bellow. + +* `create_if_missing(Id :: id(), Module :: module(), Store :: module(), Options :: map()) -> limiter()` - create new rate limiter instance +* `new(Id :: id(), Module :: module(), Store :: module(), Options :: map()) -> limiter()` - create new rate limiter instance +* `budget(limiter()) -> Budget :: integer().` - get batch size +* `delay(limiter()) -> Delay :: timeout().` - return delay in msec between batches +* `wait(limiter()) -> ok` - block the caller for amount of time returned by `delay/1` +* `in(limiter(), Reads :: integer()) -> limiter()` - notify rate limiter on the amount of reads were actually done (could be less than `budget`) +* `success(limiter(), Writes :: integer()) -> limiter()` - how many writes happen +* `failure(limiter()) -> limiter()` - called instead of `success/2` when failure happen +* `is_congestion(limiter()) -> boolean()` - returns `false` when congestion is detected +* `format(limiter()) -> [{Key :: atom(), Value :: term()}]` - return key value list representing important aspects of the limiter state +* `id(limitter()) -> id()` - returns `id()` of the rate limiter +* `module(limiter()) -> module()` - returns callback module implementing rate limiting logic. +* `state(limiter()) -> state()` - returns internal state of rate limiter. +* `store(limiter()) -> module() | nil` - returns store state backend. + +# Testing + +The test suite is written in Elixir. + +## Running all tests + +``` +make couch && ERL_LIBS=`pwd`/src mix test --trace src/couch_rate/test/exunit/ +``` + +## Running specific test suite + +``` +make couch && ERL_LIBS=`pwd`/src mix test --trace src/couch_rate/test/exunit/couch_rate_limiter_test.exs +``` + +## Running specific test using line number + +``` +make couch && ERL_LIBS=`pwd`/src mix test --trace src/couch_rate/test/exunit/couch_rate_limiter_test.exs:10 +``` + +## Running traces with stats output + +``` +make couch && ERL_LIBS=`pwd`/src EXUNIT_DEBUG=true mix test --trace src/couch_rate/test/exunit/couch_rate_limiter_test.exs +```
\ No newline at end of file diff --git a/src/couch_rate/src/couch_rate.app.src b/src/couch_rate/src/couch_rate.app.src new file mode 100644 index 000000000..ed6de81d6 --- /dev/null +++ b/src/couch_rate/src/couch_rate.app.src @@ -0,0 +1,24 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + {application, couch_rate, [ + {description, "Simple rate limiter"}, + {vsn, git}, + {registered, [ + ]}, + {applications, [ + kernel, + stdlib, + syntax_tools + ]}, + {mod, {couch_rate_app, []}} +]}. diff --git a/src/couch_rate/src/couch_rate.erl b/src/couch_rate/src/couch_rate.erl new file mode 100644 index 000000000..24bbcc2a5 --- /dev/null +++ b/src/couch_rate/src/couch_rate.erl @@ -0,0 +1,318 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_rate). + +-include("couch_rate.hrl"). + +-export([ + create_if_missing/2, + create_if_missing/3, + create_if_missing/4, + new/2, + new/3, + new/4, + from_map/4, + budget/1, + delay/1, + wait/1, + in/2, + success/2, + failure/1, + is_congestion/1, + min_latency/1, + format/1, + to_map/1, + id/1, + module/1, + state/1, + store/1 +]). + +-define(LIMITER, ?MODULE). + +-type id() :: term(). +-type state() :: term(). +-type store() :: module(). + +-opaque limiter() :: #?LIMITER{}. + +-export_type([ + id/0, + state/0, + limiter/0 +]). + +-spec create_if_missing(id(), string()) -> + couch_rate:limiter() | {error, Reason :: term()}. + +create_if_missing(Id, ConfigId) -> + ?MODULE:create_if_missing(Id, ConfigId, couch_rate_ets). + +-spec create_if_missing(id(), string(), nil | module()) -> + couch_rate:limiter() | {error, Reason :: term()}. + +create_if_missing(Id, ConfigId, StateStore) -> + {Module, Options} = get_config(ConfigId), + ?MODULE:create_if_missing(Id, Module, StateStore, Options). + +-spec create_if_missing(id(), module(), nil | module(), map()) -> + couch_rate:limiter() | {error, Reason :: term()}. + +create_if_missing(Id, Module, nil, Options) -> + #?LIMITER{ + id = Id, + module = Module, + store = nil, + state = Module:new(Id, Options) + }; + +create_if_missing(Id, Module, Store, Options) -> + case Store:create_if_missing(Id, Module:new(Id, Options)) of + {error, _} = Error -> + Error; + State -> + #?LIMITER{ + id = Id, + module = Module, + store = Store, + state = State + } + end. + + +-spec new(id(), string()) -> + couch_rate:limiter() | {error, Reason :: term()}. + +new(Id, ConfigId) -> + ?MODULE:new(Id, ConfigId, couch_rate_ets). + +-spec new(id(), string(), module()) -> + couch_rate:limiter() | {error, Reason :: term()}. + +new(Id, ConfigId, StateStore) -> + {Module, Options} = get_config(ConfigId), + ?MODULE:new(Id, Module, StateStore, Options). + + +-spec new(id(), module(), nil | module(), map()) -> + couch_rate:limiter() | {error, Reason :: term()}. + +new(Id, Module, nil, Options) -> + #?LIMITER{ + id = Id, + module = Module, + store = nil, + state = Module:new(Id, Options) + }; + +new(Id, Module, Store, Options) -> + case Store:new(Id, Module:new(Id, Options)) of + {error, _} = Error -> + Error; + State -> + #?LIMITER{ + id = Id, + module = Module, + store = Store, + state = State + } + end. + + +-spec from_map(id(), module(), store(), map()) -> + couch_rate:limiter() + | {error, Reason :: term()}. + +from_map(Id, Module, nil, Map) -> + #?LIMITER{ + id = Id, + module = Module, + store = nil, + state = Module:from_map(Map) + }; + +from_map(Id, Module, Store, Map) -> + case Store:new(Id, Module:from_map(Map)) of + {error, _} = Error -> + Error; + State -> + #?LIMITER{ + id = Id, + module = Module, + store = Store, + state = State + } + end. + + +-spec update(limiter(), ( + fun( + (id(), state()) -> + {Result :: term(), state()} + | {error, Reason :: term()} + ) + )) -> + Result :: term() + | {Result :: term(), state()} + | {error, Reason :: term()}. + +update(#?LIMITER{store = nil, id = Id, state = State0} = Limiter, Fun) -> + case Fun(Id, State0) of + {error, _Reason} = Error -> + Error; + {Result, State1} -> + {Result, Limiter#?LIMITER{state = State1}} + end; + +update(#?LIMITER{id = Id, store = Store, state = State}, Fun) -> + Store:update(Id, State, Fun). + + +-spec budget(limiter()) -> + Budget :: integer() + | {Budget :: integer(), limiter()} + | {error, term()}. + +budget(#?LIMITER{module = Module} = Limiter) -> + update(Limiter, fun(Id, StateIn) -> + Module:budget(Id, StateIn) + end). + + +-spec delay(limiter()) -> + DelayTime :: integer() + | {DelayTime :: integer(), limiter()} + | {error, term()}. + +delay(#?LIMITER{module = Module} = Limiter) -> + update(Limiter, fun(Id, State) -> + Module:delay(Id, State) + end). + + +-spec wait(limiter()) -> + ok + | {ok, limiter()} + | {error, term()}. + +wait(#?LIMITER{module = Module} = Limiter) -> + update(Limiter, fun(Id, State) -> + Module:wait(Id, State) + end). + + +-spec in(limiter(), integer()) -> + ok + | {ok, limiter()} + | {error, term()}. + +in(#?LIMITER{module = Module} = Limiter, Reads) -> + update(Limiter, fun(Id, State) -> + Module:in(Id, State, Reads) + end). + + +-spec success(limiter(), integer()) -> + ok + | limiter() + | {error, term()}. + +success(#?LIMITER{module = Module} = Limiter, Writes) -> + update(Limiter, fun(Id, State) -> + Module:success(Id, State, Writes) + end). + + +-spec failure(limiter()) -> + ok + | limiter() + | {error, term()}. + +failure(#?LIMITER{module = Module} = Limiter) -> + update(Limiter, fun(Id, State) -> + Module:failure(Id, State) + end). + + +-spec is_congestion(limiter()) -> boolean(). + +is_congestion(#?LIMITER{store = nil, module = Module, id = Id, state = State}) -> + Module:is_congestion(Id, State); + +is_congestion(#?LIMITER{store = Store, module = Module, id = Id, state = State}) -> + Module:is_congestion(Id, Store:lookup(Id, State)). + + +-spec format(limiter()) -> [{Key :: atom(), Value :: term()}]. + +format(#?LIMITER{store = nil, module = Module, id = Id, state = State}) -> + Module:format(Id, State); + +format(#?LIMITER{store = Store, module = Module, id = Id, state = State}) -> + Module:format(Id, Store:lookup(Id, State)). + + +-spec to_map(limiter()) -> map(). + +to_map(#?LIMITER{store = nil, module = Module, id = Id, state = State}) -> + Module:to_map(Id, State); + +to_map(#?LIMITER{store = Store, module = Module, id = Id, state = State}) -> + Module:to_map(Id, Store:lookup(Id, State)). + +-spec min_latency(limiter()) -> pos_integer(). + +min_latency(#?LIMITER{store = nil, module = Module, id = Id, state = State}) -> + Module:min_latency(Id, State); + +min_latency(#?LIMITER{store = Store, module = Module, id = Id, state = State}) -> + Module:to_map(Id, Store:lookup(Id, State)). + + +-spec id(limiter()) -> module(). + +id(Limiter) -> + Limiter#?LIMITER.id. + + +-spec module(limiter()) -> module(). + +module(Limiter) -> + Limiter#?LIMITER.module. + + +-spec state(limiter()) -> state(). + +state(Limiter) -> + Limiter#?LIMITER.state. + +-spec store(limiter()) -> module() | nil. + +store(Limiter) -> + Limiter#?LIMITER.store. + + +get_config(ConfigId) -> + ConfigSection = "couch_rate." ++ ConfigId, + ModuleStr = config:get(ConfigSection, "limiter", "couch_rate_limiter"), + Module = list_to_existing_atom(ModuleStr), + case config:get(ConfigSection, "opts", undefined) of + undefined -> + {error, #{missing_key => "opts", in => ConfigSection}}; + OptionsStr -> + Options = couch_rate_config:from_str(OptionsStr), + lists:map(fun(Key) -> + maps:is_key(Key, Options) orelse error(#{missing_key => Key, in => Options}) + end, [budget, target, window, sensitivity]), + {Module, Options} + end. diff --git a/src/couch_rate/src/couch_rate.hrl b/src/couch_rate/src/couch_rate.hrl new file mode 100644 index 000000000..d19f7d8e4 --- /dev/null +++ b/src/couch_rate/src/couch_rate.hrl @@ -0,0 +1,19 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-record(couch_rate, + { + id :: couch_rate:id(), + module = couch_rate_limiter :: module(), + store = couch_rate_ets :: module() | nil, + state :: couch_rate:state() + }). diff --git a/src/couch_rate/src/couch_rate_app.erl b/src/couch_rate/src/couch_rate_app.erl new file mode 100644 index 000000000..2bb1621c3 --- /dev/null +++ b/src/couch_rate/src/couch_rate_app.erl @@ -0,0 +1,28 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_rate_app). + +-behaviour(application). + +-export([ + start/2, + stop/1 +]). + + +start(_StartType, _StartArgs) -> + couch_rate_sup:start_link(). + + +stop(_State) -> + ok. diff --git a/src/couch_rate/src/couch_rate_config.erl b/src/couch_rate/src/couch_rate_config.erl new file mode 100644 index 000000000..709fbc3d3 --- /dev/null +++ b/src/couch_rate/src/couch_rate_config.erl @@ -0,0 +1,66 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_rate_config). + +% This parser supports only maps where key is atom and value +% is positive float or positive integer. + +-include_lib("syntax_tools/include/merl.hrl"). + +-export([ + from_str/1, + to_str/1 +]). + +from_str(String) -> + parse_map(merl:quote(String)). + + +to_str(Map) when is_map(Map) -> + StringArgs = maps:fold(fun(Key, Val, Acc) -> + Acc ++ [atom_to_list(Key) ++ " => " ++ number_to_list(Val)] + end, [], Map), + "#{" ++ string:join(StringArgs, ", ") ++ "}". + + +number_to_list(Int) when is_integer(Int) -> + integer_to_list(Int); + +number_to_list(Float) when is_float(Float) -> + float_to_list(Float). + + +parse_map(MapAST) -> + erl_syntax:type(MapAST) == map_expr + orelse fail("Only #{field => pos_integer() | float()} syntax is supported"), + %% Parsing map manually, since merl does not support maps + lists:foldl(fun(AST, Bindings) -> + NameAST = erl_syntax:map_field_assoc_name(AST), + erl_syntax:type(NameAST) == atom + orelse fail("Only atoms are supported as field names"), + Name = erl_syntax:atom_value(NameAST), + ValueAST = erl_syntax:map_field_assoc_value(AST), + Value = case erl_syntax:type(ValueAST) of + integer -> + erl_syntax:integer_value(ValueAST); + float -> + erl_syntax:float_value(ValueAST); + _ -> + fail("Only pos_integer() or float() alowed as values") + end, + Bindings#{Name => Value} + end, #{}, erl_syntax:map_expr_fields(MapAST)). + + +fail(Msg) -> + throw({error, Msg}).
\ No newline at end of file diff --git a/src/couch_rate/src/couch_rate_ets.erl b/src/couch_rate/src/couch_rate_ets.erl new file mode 100644 index 000000000..edd9d965c --- /dev/null +++ b/src/couch_rate/src/couch_rate_ets.erl @@ -0,0 +1,119 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_rate_ets). + +-include("couch_rate.hrl"). + +-export([ + create_tables/0, + delete_tables/0, + create_if_missing/2, + new/2, + lookup/2, + update/3 +]). + + +-define(SHARDS_N, 16). + +-type id() :: term(). +-type state() :: term(). +-type result() :: term(). +-type store_state() :: term(). + + +-spec create_if_missing(couch_rate:id(), state()) -> + store_state(). + +create_if_missing(Id, State) -> + Tid = term_to_table(Id), + case ets:lookup(Tid, Id) of + [_ | _] -> ok; + _ -> ets:insert(Tid, {Id, State}) + end, + ok. + + +-spec new(couch_rate:id(), state()) -> + store_state() + | {error, term()}. + +new(Id, State) -> + Tid = term_to_table(Id), + case ets:insert_new(Tid, {Id, State}) of + true -> ok; + false -> {error, #{reason => already_exists, id => Id}} + end. + + +-spec update(id(), store_state(), fun( + (id(), state()) -> {state(), result()} + )) -> + result() + | {error, term()}. + +update(Id, _StoreState, Fun) -> + Tid = term_to_table(Id), + case ets:lookup(Tid, Id) of + [{Id, State0}] -> + case Fun(Id, State0) of + {Result, State1} -> + ets:insert(Tid, {Id, State1}), + Result; + Error -> + Error + end; + _ -> + {error, #{reason => cannot_find, id => Id}} + end. + + +-spec lookup(id(), store_state()) -> + state() + | {error, term()}. + +lookup(Id, _StoreState) -> + Tid = term_to_table(Id), + case ets:lookup(Tid, Id) of + [{Id, State}] -> + State; + _ -> + {error, #{reason => cannot_find, id => Id}} + end. + + +create_tables() -> + Opts = [named_table, public, {read_concurrency, true}], + [ets:new(TableName, Opts) || TableName <- table_names()], + ok. + +delete_tables() -> + [ets:delete(TableName) || TableName <- table_names()], + ok. + + +-spec term_to_table(any()) -> atom(). +term_to_table(Term) -> + PHash = erlang:phash2(Term), + table_name(PHash rem ?SHARDS_N). + + +-dialyzer({no_return, table_names/0}). + +-spec table_names() -> [atom()]. +table_names() -> + [table_name(N) || N <- lists:seq(0, ?SHARDS_N - 1)]. + +-spec table_name(non_neg_integer()) -> atom(). +table_name(Id) when is_integer(Id), Id >= 0 andalso Id < ?SHARDS_N -> + list_to_atom(atom_to_list(?MODULE) ++ "_" ++ integer_to_list(Id)).
\ No newline at end of file diff --git a/src/couch_rate/src/couch_rate_limiter.erl b/src/couch_rate/src/couch_rate_limiter.erl new file mode 100644 index 000000000..349da8d5a --- /dev/null +++ b/src/couch_rate/src/couch_rate_limiter.erl @@ -0,0 +1,387 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_rate_limiter). + +%% This module implements an algorithm to control the consumption rate +%% parameters such as: +%% - batch size +%% - delay between batches +%% The components of the algorithm use +%% - [ascending minima algorithm](http://web.archive.org/web/20120805114719/http://home.tiac.net/~cri/2001/slidingmin.html) +%% - "Welford's method" of calculating average + +-export([ + new/2, + from_map/2, + budget/2, + delay/2, + wait/2, + in/3, + success/3, + failure/2, + is_congestion/2, + min_latency/2, + format/2, + to_map/2 +]). + +-type msec() :: non_neg_integer(). + +-define(STATE, ?MODULE). + +%% This is the number below which the math would not work due to round errors +%% In particular the default values for thresholds would be equal +-define(MIN_TARGET, 36). + +-define(record_to_keyval(Name, Record), + lists:zip(record_info(fields, Name), + tl(tuple_to_list(Record)))). + +-define(map_to_record(RecordName, Map), + element(1, lists:foldl(fun(Field, {Record, Idx}) -> + {setelement(Idx, Record, maps:get(Field, Map, element(Idx, Record))), Idx + 1} + end, {#RecordName{}, 2}, record_info(fields, RecordName)))). + + +-define(record_to_map(RecordName, Record), + element(1, lists:foldl(fun(Field, {Map, Idx}) -> + { + maps:put(Field, element(Idx, Record), Map), + Idx + 1 + } + end, {#{}, 2}, record_info(fields, RecordName)))). + +-record(?STATE, { + window_size = 0 :: 0 | pos_integer(), + timer = fun now_msec/0, + size = 1 :: pos_integer(), + epoch = 1 :: pos_integer(), + minimums :: queue:queue() | undefined, + start_ts = undefined, + mean_reads = 0.0, + mean_writes = 0.0, + reads = 0, + writes = 0, + target = 4500, + underload_threshold = 4275, %% target * 0.95 + overload_threshold = 4725, %% target * 1.05 + delay_threshold = 4950, %% target * 1.10 + multiplicative_factor = 0.7, + regular_delay = 100 :: timeout(), + congested_delay = 5000 :: timeout(), + initial_budget = 100, + latency = 0 +}). + +-type state() :: #?STATE{}. + + +-spec new(couch_rate:id(), Opts :: map()) -> state(). + +new(_Id, #{sensitivity := S}) when S =< 0 -> + error("expected SensitivityTimeWindow > 0"); + +new(_Id, #{target := T}) when T < ?MIN_TARGET -> + error("the target is too small"); + +new(_Id, #{budget := B, target := T, window := W, sensitivity := S} = Opts) -> + WinSize = W div S + 1, + validate_arguments(?map_to_record(?STATE, maps:merge(#{ + minimums => queue:new(), + window_size => WinSize, + initial_budget => B, + underload_threshold => round(T * 0.95), + overload_threshold => round(T * 1.05), + delay_threshold => round(T * 1.07) + }, maps:without([budget, window, sensitivity], Opts)))). + + +-spec from_map(couch_rate:id(), map()) -> state(). + +from_map(_Id, Map) -> + ?map_to_record(?STATE, Map). + + +-spec budget(couch_rate:id(), state()) -> + {pos_integer(), state()}. + +budget(Id, #?STATE{} = State) -> + #?STATE{ + reads = R, + writes = W, + mean_writes = MW, + mean_reads = MR, + multiplicative_factor = MultiplicativeFactor, + target = Target, + initial_budget = InitialBudget, + latency = Latency + } = State, + case pattern(Id, State) of + optimal -> + {max(1, round(MR)), State}; + failed -> + %% decrease budget + {max(1, round(R * MultiplicativeFactor)), State}; + overloaded -> + %% decrease budget + {max(1, round(R * MultiplicativeFactor)), State}; + underloaded -> + ReadWriteRatio = min(1, MR / max(1, MW)), + SingleWrite = Latency / W, + EstimatedWrites = floor(Target / SingleWrite), + {max(1, round(ReadWriteRatio * EstimatedWrites)), State}; + init -> + {InitialBudget, State} + end. + +-spec delay(couch_rate:id(), state()) -> + {pos_integer(), state()}. + +delay(Id, #?STATE{} = State) -> + #?STATE{ + regular_delay = RD, + congested_delay = CD + } = State, + case pattern(Id, State) of + failed -> + {CD, State}; + _ -> + {RD, State} + end. + + +-spec wait(couch_rate:id(), state()) -> + ok. + +wait(Id, State) -> + {Delay, _} = delay(Id, State), + timer:sleep(Delay). + + +-spec in(couch_rate:id(), state(), Reads :: pos_integer()) -> + {ok, state()}. + +in(_Id, #?STATE{timer = TimerFun} = State, Reads) -> + {ok, State#?STATE{ + reads = Reads, + start_ts = TimerFun() + }}. + + +-spec success(couch_rate:id(), state(), Writes :: pos_integer()) -> + {ok, state()}. + +success(_Id, #?STATE{start_ts = undefined} = State, _Writes) -> + {ok, State}; + +success(_Id, #?STATE{} = State, Writes) -> + #?STATE{ + start_ts = TS, + timer = TimerFun, + reads = Reads, + mean_reads = MeanReads, + mean_writes = MeanWrites, + window_size = WinSize + } = State, + {ok, update_min(State#?STATE{ + writes = Writes, + mean_writes = average(MeanWrites, WinSize, Writes), + mean_reads = average(MeanReads, WinSize, Reads), + latency = TimerFun() - TS + })}. + + +-spec failure(couch_rate:id(), state()) -> {ok, state()}. + +failure(_Id, #?STATE{start_ts = undefined} = State) -> + {ok, State}; + +failure(_Id, #?STATE{} = State) -> + #?STATE{ + timer = TimerFun, + start_ts = TS + } = State, + {ok, update_min(State#?STATE{ + writes = 0, + latency = TimerFun() - TS + })}. + + +-spec is_congestion(couch_rate:id(), state()) -> boolean(). + +is_congestion(Id, #?STATE{} = State) -> + case pattern(Id, State) of + overloaded -> true; + failed -> true; + _ -> false + end. + + +-spec format(couch_rate:id(), state()) -> [{Key :: atom(), Value :: term()}]. + +format(_Id, #?STATE{minimums = M} = State) -> + Map = ?record_to_map(?STATE, State), + Minimums = lists:map(fun({D, V}) -> + [{value, V}, {death, D}] + end, queue:to_list(M)), + maps:to_list(maps:merge(Map, #{ + minimums => Minimums + })). + + +-spec to_map(couch_rate:id(), state()) -> map(). + +to_map(_Id, #?STATE{} = State) -> + ?record_to_map(?STATE, State). + + +-spec update_min(state()) -> state(). + +update_min(#?STATE{latency = ProcessingDelay} = Q0) -> + Q1 = remove_greater_than(Q0, ProcessingDelay), + Q2 = append(Q1, ProcessingDelay), + maybe_remove_first(Q2). + + +-spec pattern(couch_rate:id(), state()) -> + init + | underloaded + | overloaded + | optimal + | failed. + +pattern(Id, #?STATE{} = State) -> + #?STATE{ + underload_threshold = UnderloadThreshold, + overload_threshold = OverloadThreshold, + writes = W, + mean_writes = MW + } = State, + case min_latency(Id, State) of + MinRollingLatency when MinRollingLatency > OverloadThreshold -> + overloaded; + MinRollingLatency when MinRollingLatency > UnderloadThreshold -> + optimal; + MinRollingLatency when MinRollingLatency > 0 andalso W == 0 -> + failed; + MinRollingLatency when MinRollingLatency == 0 andalso MW == 0.0 -> + init; + _ -> + underloaded + end. + + +-spec min_latency(couch_rate:id(), state()) -> pos_integer() | 0. + +min_latency(_Id, #?STATE{size = 1}) -> + 0; + +min_latency(_Id, #?STATE{minimums = Minimums}) -> + {value, {_, Min}} = head(Minimums), + Min. + + +validate_arguments(#?STATE{timer = TimerFun}) + when not is_function(TimerFun, 0) -> + error("expected `timer` to be an arity 0 function"); + +validate_arguments(#?STATE{window_size = WinSize}) + when WinSize < 1 -> + error("expected `window_size` to be greater than 1"); + +validate_arguments(#?STATE{initial_budget = Budget}) + when Budget < 1 -> + error("expected `initial_budget` to be greater than 1"); + +validate_arguments(#?STATE{overload_threshold = OT, target = T}) + when OT =< T -> + error("expected `overload_threshold` to be greater than `target`"); + +validate_arguments(#?STATE{underload_threshold = UT, target = T}) + when UT >= T -> + error("expected `underload_threshold` to be less than `target`"); + +validate_arguments(#?STATE{delay_threshold = DT, overload_threshold = OT}) + when DT =< OT -> + error("expected `delay_threshold` to be greater than `overload_threshold`"); + +validate_arguments(#?STATE{multiplicative_factor = MF}) + when MF < 0 orelse MF > 1 -> + error("expected `multiplicative_factor` to be in the (0, 1) range"); + +validate_arguments(#?STATE{} = State) -> + State. + + +-spec remove_greater_than(state(), pos_integer()) -> state(). + +remove_greater_than(#?STATE{minimums = Minimums, size = S} = State, Value) -> + case tail(Minimums) of + {value, {_, T}} when Value =< T -> + NewState = State#?STATE{minimums = tail_drop(Minimums), size = S - 1}, + remove_greater_than(NewState, Value); + {value, _} -> + State; + empty -> + State#?STATE{epoch = 1} + end. + + +-spec append(state(), pos_integer()) -> state(). + +append(#?STATE{minimums = Minimums, epoch = E, window_size = S} = State, Value) -> + Death = E + S, + State#?STATE{ + minimums = tail_put(Minimums, {Death, Value}), + epoch = E + 1, + size = S + 1 + }. + + +-spec maybe_remove_first(state()) -> state(). + +maybe_remove_first(#?STATE{minimums = Minimums, epoch = E, size = S} = State) -> + case head(Minimums) of + {value, {E, _V}} -> + State#?STATE{minimums = head_drop(Minimums), size = S - 1}; + _ -> + State + end. + + +% Donald Knuthâs Art of Computer Programming, Vol 2, page 232, 3rd +% Welford method +average(Avg, WindowSize, Value) -> + Delta = Value - Avg, + Avg + Delta / WindowSize. + +%% The helper functions are added because queue module +%% naming conventions are weird +head(Q) -> queue:peek_r(Q). + + +head_drop(Q) -> queue:drop_r(Q). + +tail(Q) -> queue:peek(Q). + + +tail_put(Q, V) -> queue:in_r(V, Q). + + +tail_drop(Q) -> queue:drop(Q). + + +-spec now_msec() -> msec(). +now_msec() -> + {Mega, Sec, Micro} = os:timestamp(), + ((Mega * 1000000) + Sec) * 1000 + Micro div 1000.
\ No newline at end of file diff --git a/src/couch_rate/src/couch_rate_pd.erl b/src/couch_rate/src/couch_rate_pd.erl new file mode 100644 index 000000000..5d79f7890 --- /dev/null +++ b/src/couch_rate/src/couch_rate_pd.erl @@ -0,0 +1,90 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_rate_pd). + +-include("couch_rate.hrl"). + + +-export([ + new/2, + create_if_missing/2, + lookup/2, + update/3 +]). + +-type id() :: term(). +-type state() :: term(). +-type result() :: term(). +-type store_state() :: term(). + +-define(STATE_KEY, couch_rate_state). + + +-spec create_if_missing(couch_rate:id(), state()) -> store_state(). + +create_if_missing(Id, State) -> + case get({?STATE_KEY, Id}) of + undefined -> + put({?STATE_KEY, Id}, State), + ok; + _ -> + ok + end. + + +-spec new(couch_rate:id(), state()) -> + store_state() + | {error, term()}. + +new(Id, State) -> + case get({?STATE_KEY, Id}) of + undefined -> + put({?STATE_KEY, Id}, State), + ok; + _ -> + {error, #{reason => already_exists, id => Id}} + end. + + +-spec lookup(id(), store_state()) -> + state() + | {error, term()}. + +lookup(Id, _StoreState) -> + case get({?STATE_KEY, Id}) of + undefined -> + {error, #{reason => cannot_find, id => Id}}; + State -> + State + end. + + +-spec update(id(), store_state(), fun( + (id(), state()) -> {state(), result()} + )) -> + result() + | {error, term()}. + +update(Id, _StoreState, Fun) -> + case get({?STATE_KEY, Id}) of + undefined -> + {error, #{reason => cannot_find, id => Id}}; + State -> + case Fun(Id, State) of + {Result, State} -> + put({?STATE_KEY, Id}, State), + Result; + Error -> + Error + end + end. diff --git a/src/couch_rate/src/couch_rate_sup.erl b/src/couch_rate/src/couch_rate_sup.erl new file mode 100644 index 000000000..1ce01b644 --- /dev/null +++ b/src/couch_rate/src/couch_rate_sup.erl @@ -0,0 +1,36 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_rate_sup). +-behaviour(supervisor). +-vsn(1). + +-export([ + start_link/0, + init/1 +]). + + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + + +init([]) -> + couch_rate_ets:create_tables(), + Flags = #{ + strategy => one_for_one, + intensity => 5, + period => 10 + }, + Children = [ + ], + {ok, {Flags, Children}}.
\ No newline at end of file diff --git a/src/couch_rate/test/exunit/couch_rate_config_test.exs b/src/couch_rate/test/exunit/couch_rate_config_test.exs new file mode 100644 index 000000000..7db30d272 --- /dev/null +++ b/src/couch_rate/test/exunit/couch_rate_config_test.exs @@ -0,0 +1,88 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +defmodule Couch.Rate.Config.Test do + use ExUnit.Case, async: true + use ExUnitProperties + import StreamData + + @erlang_reserved_words MapSet.new([ + "after", + "and", + "andalso", + "band", + "begin", + "bnot", + "bor", + "bsl", + "bsr", + "bxor", + "case", + "catch", + "cond", + "div", + "end", + "fun", + "if", + "let", + "not", + "of", + "or", + "orelse", + "receive", + "rem", + "try", + "when", + "xor" + ]) + + alias :couch_rate_config, as: RLC + + test "parse valid configuration" do + parsed = RLC.from_str(~S(#{foo => 1, bar => 2.0})) + assert %{foo: 1, bar: 2} == parsed + end + + property "roundtrip" do + check all(options <- valid_config()) do + parsed = RLC.from_str(RLC.to_str(options)) + assert options == parsed + end + end + + defp valid_config() do + map_of( + erlang_atom(), + one_of([ + positive_integer(), + # we only support positive float + float(min: 0.0) + ]) + ) + end + + defp erlang_atom() do + bind(string(:alphanumeric), fn str -> + bind(integer(?a..?z), fn char -> + erlang_atom(str, char) + end) + end) + end + + defp erlang_atom(str, char) do + if MapSet.member?(@erlang_reserved_words, <<char, str::binary>>) do + String.to_atom(<<char, char, str::binary>>) + else + String.to_atom(<<char, str::binary>>) + end + end +end diff --git a/src/couch_rate/test/exunit/couch_rate_limiter_test.exs b/src/couch_rate/test/exunit/couch_rate_limiter_test.exs new file mode 100644 index 000000000..ff70f793a --- /dev/null +++ b/src/couch_rate/test/exunit/couch_rate_limiter_test.exs @@ -0,0 +1,350 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +defmodule Couch.Rate.Limiter.Test do + use ExUnit.Case, async: true + + @transaction_timeout 5_000 + + alias :couch_rate, as: RL + + describe "Stats :" do + @scenario %{rw_ratio: 1 / 1, target: 400, write_time: 100} + test "#{__ENV__.line} : #{inspect(@scenario)} (underloaded)" do + {rate_limiter, measurments} = simulate(@scenario, 1000) + stats = statistics(measurments) + maybe_debug(rate_limiter, measurments, stats) + + assert stats.wait_time.p90 == 100, + "expected no artificial delays for more than 90% of batches" + + budget = stats.budget + + assert floor(budget.p95) in 1..7, + "expected budget to converge into the 1..7 range (got #{budget.p95})" + + reads = stats.mean_reads + + assert floor(reads.p95) in 1..7, + "expected mean_read to converge into the 1..7 range (got #{reads.p95})" + + writes = stats.mean_writes + assert round(writes.p99) in 2..6 + "expected mean_writes to converge into the 2..6 range (got #{writes.p95})" + + assert stats.latency.p95 < @transaction_timeout, + "expected latency for 95% batches under @transaction_timout" + + found_after = initial_search_speed(measurments) + + assert found_after < 5, + "expected to find acceptable budget in less than 5 iterations (got: #{ + found_after + })" + + measurments + |> initial_search() + |> Enum.reduce(101, fn row, prev_budget -> + assert row.budget < prev_budget, + "expected to reduce budget while we fail" + + row.budget + end) + end + + @scenario %{rw_ratio: 1 / 8, target: 3900, write_time: 100} + test "#{__ENV__.line} : #{inspect(@scenario)} (optimal)" do + {rate_limiter, measurments} = simulate(@scenario, 1000) + stats = statistics(measurments) + maybe_debug(rate_limiter, measurments, stats) + + assert stats.wait_time.p90 == 100, + "expected no artificial delays for more than 90% of batches" + + budget = stats.budget + + assert floor(budget.p95) in 4..7, + "expected budget to converge into the 4..7 range (got #{budget.p95})" + + reads = stats.mean_reads + + assert floor(reads.p95) in 4..7, + "expected mean_read to converge into the 4..7 range (got #{reads.p95})" + + writes = stats.mean_writes + assert round(writes.p99) in 39..41 + "expected mean_writes to converge into the 39..41 range (got #{writes.p95})" + + assert stats.latency.p95 < @transaction_timeout, + "expected latency for 95% of batches under @transaction_timout" + + found_after = initial_search_speed(measurments) + + assert found_after < 10, + "expected to find acceptable budget in less than 10 iterations (got: #{ + found_after + })" + + measurments + |> initial_search() + |> Enum.reduce(101, fn row, prev_budget -> + assert row.budget < prev_budget, + "expected to reduce budget while we fail" + + row.budget + end) + end + + @scenario %{rw_ratio: 1 / 20, target: 3900, write_time: 100} + test "#{__ENV__.line} : #{inspect(@scenario)} (overloaded)" do + # This is a worst case scenario due to big variability of wait_time and + # big value read/write ratio + {rate_limiter, measurments} = simulate(@scenario, 1000) + stats = statistics(measurments) + maybe_debug(rate_limiter, measurments, stats) + + assert stats.wait_time.p90 == 100, + "expected no artificial delays for more than 90% of batches" + + budget = stats.budget + assert floor(budget.p95) in 1..4 + "expected budget to converge into the 1..4 range (got #{budget.p95})" + reads = stats.mean_reads + assert floor(reads.p95) in 1..4 + "expected mean_read to converge into the 1..4 range (got #{reads.p95})" + writes = stats.mean_writes + assert round(writes.p99) in 39..41 + "expected mean_writes to converge into the 39..41 range (got #{writes.p95})" + + assert stats.latency.p90 < @transaction_timeout, + "expected latency for 90% of batches under @transaction_timout" + + found_after = initial_search_speed(measurments) + + assert found_after < 16, + "expected to find acceptable budget in less than 16 iterations (got: #{ + found_after + })" + + measurments + |> initial_search() + |> Enum.reduce(101, fn row, prev_budget -> + assert row.budget < prev_budget, + "expected to reduce budget while we fail" + + row.budget + end) + end + end + + defp simulate(scenario, iterations) do + :couch_rate_ets.create_tables() + + limiter = + RL.new(:limiter_id, :couch_rate_limiter, nil, %{ + budget: 100, + target: scenario.target, + # average over 20 last measurments + window: scenario.write_time * 20, + sensitivity: scenario.write_time, + timer: &timer/0 + }) + + result = + Enum.reduce(0..iterations, {limiter, []}, fn _idx, {limiter, stats} -> + {budget, limiter} = step(limiter, scenario.rw_ratio, scenario.write_time) + {limiter, update_measurments(limiter, stats, budget)} + end) + + :couch_rate_ets.delete_tables() + result + end + + defp step(limiter, read_write_ratio, write_time) do + {reads, limiter} = RL.budget(limiter) + writes = round(reads / read_write_ratio) + {delay, limiter} = RL.delay(limiter) + sleep(delay) + data_before = RL.to_map(limiter) + {:ok, limiter} = RL.in(limiter, reads) + data_after = RL.to_map(limiter) + + assert data_after.size <= data_after.window_size + 1, + "The number of elements in minimums container shouldn't grow (got: #{ + data_after.size + })" + + if data_before.writes == 0 and + data_after.writes == 0 and + data_before.reads != 0 do + assert data_before.reads > data_after.reads, + "expected to reduce number of reads while transaction fails" + end + + total_write_time = + 0..writes + |> Enum.reduce_while(0, fn _, acc -> + write_time = :rand.normal(write_time, write_time * 0.25) + + if acc < @transaction_timeout do + {:cont, acc + write_time} + else + {:halt, acc} + end + end) + + sleep(total_write_time) + + if total_write_time < @transaction_timeout do + {:ok, limiter} = RL.success(limiter, writes) + {reads, limiter} + else + {:ok, limiter} = RL.failure(limiter) + {reads, limiter} + end + end + + defp update_measurments(limiter, stats, budget) do + data = RL.to_map(limiter) + {wait_time, _} = RL.delay(limiter) + + stats ++ + [ + %{ + budget: budget, + slack: data.target - data.latency, + rw_ratio: data.mean_reads / max(1, data.mean_writes), + latency: data.latency, + new_budget: budget, + minimum_latency: RL.min_latency(limiter), + wait_time: wait_time, + elements_in_min_queue: data.size, + mean_reads: data.mean_reads, + mean_writes: data.mean_writes, + total_reads: data.reads, + total_writes: data.writes + } + ] + end + + defp timer() do + now = Process.get(:time, 1) + Process.put(:time, now + 1) + now + end + + defp sleep(sleep_time_in_ms) do + now = timer() + Process.put(:time, now + sleep_time_in_ms - 1) + end + + defp format_table([first | _] = rows) do + spec = + first + |> Map.keys() + |> Enum.map(fn h -> {h, String.length(to_str(h))} end) + + header = first |> Map.keys() |> Enum.map(&to_str/1) |> Enum.join(" , ") + + lines = + Enum.map(rows, fn row -> + fields = + Enum.map(spec, fn {field, size} -> + String.pad_trailing("#{to_str(Map.get(row, field))}", size) + end) + + Enum.join(fields, " , ") + end) + + Enum.join([header | lines], "\n") + end + + defp initial_search_speed(measurments) do + length(initial_search(measurments)) + end + + defp initial_search(measurments) do + Enum.reduce_while(measurments, [], fn row, acc -> + if row.total_writes == 0 do + {:cont, acc ++ [row]} + else + {:halt, acc} + end + end) + end + + defp statistics(measurments) do + data = + Enum.reduce(measurments, %{}, fn row, acc -> + Enum.reduce(row, acc, fn {key, value}, acc -> + Map.update(acc, key, [], fn metric -> + metric ++ [value] + end) + end) + end) + + Enum.reduce(data, %{}, fn {key, values}, acc -> + stats = Enum.into(:bear.get_statistics(values), %{}) + {percentile, stats} = Map.pop(stats, :percentile) + + stats = + Enum.reduce(percentile, stats, fn {key, value}, acc -> + Map.put(acc, String.to_atom("p#{to_str(key)}"), value) + end) + + Map.put(acc, key, stats) + end) + end + + defp format_stats(stats) do + rows = + Enum.map(stats, fn {key, values} -> + values + |> Enum.into(%{}) + |> Map.put(:metric, key) + |> Map.delete(:histogram) + end) + + format_table(rows) + end + + defp to_str(int) when is_integer(int) do + "#{int}" + end + + defp to_str(float) when is_float(float) do + "#{Float.to_string(Float.round(float, 2))}" + end + + defp to_str(atom) when is_atom(atom) do + Atom.to_string(atom) + end + + defp to_str(string) when is_binary(string) do + string + end + + defp to_map(rate_limiter) do + RL.to_map(rate_limiter) + end + + defp maybe_debug(rate_limiter, measurments, stats) do + if System.fetch_env("EXUNIT_DEBUG") != :error do + IO.puts("") + IO.puts("rate_limiter: #{inspect(to_map(rate_limiter))}") + IO.puts("measurments: #{inspect(measurments)}") + IO.puts("stats: #{inspect(stats)}") + + IO.puts("\n" <> format_table(measurments) <> "\n" <> format_stats(stats)) + end + end +end diff --git a/src/couch_rate/test/exunit/test_helper.exs b/src/couch_rate/test/exunit/test_helper.exs new file mode 100644 index 000000000..9b9d6ef94 --- /dev/null +++ b/src/couch_rate/test/exunit/test_helper.exs @@ -0,0 +1,14 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +ExUnit.configure(formatters: [JUnitFormatter, ExUnit.CLIFormatter]) +ExUnit.start() diff --git a/src/couch_views/README.md b/src/couch_views/README.md index 49cd82b98..5647913f0 100644 --- a/src/couch_views/README.md +++ b/src/couch_views/README.md @@ -13,3 +13,36 @@ Code layout: * `couch_views_fdb` - Maps view operations to FoundationDB logic. * `couch_views_encoding` - Encodes view keys that are byte comparable following CouchDB view sort order. * `couch_views_server` - Spawns `couch_views_indexer` workers to handle index update jobs. + +# Configuration + +## Configuring rate limiter + +Here is the example of configuration used in `couch_view` application: + +``` +[couch_rate.views] +limiter = couch_rate_limiter +opts = #{budget => 100, target => 2500, window => 60000, sensitivity => 1000} +``` + +Supported fields in `opts`: + +* `budget` - the initial value for estimated batch size +* `target` - the amount in msec which we try to maintain for batch processing time +* `window` - time interval for contention detector +* `sensitivity` - minimal interval within the `window` + +Unsupported fields in `opts` (if you really know what you are doing): + +* `window_size` - how many batches to consider in contention detector +* `timer` - this is used for testing to fast forward time `fun() -> current_time_in_ms() end` +* `target` - the amount in msec which we try to maintain for batch processing time +* `underload_threshold` - a threshold below which we would try to increase the budget +* `overload_threshold` - a threshold above which we would start decreasing the budget +* `delay_threshold` - a threshold above which we would start introducing delays between batches +* `multiplicative_factor` - determines how fast we are going to decrease budget (must be in (0..1) range) +* `regular_delay` - delay between batches when there is no overload +* `congested_delay` - delay between batches when there is an overload +* `initial_budget` - initial value for budget to start with + diff --git a/src/couch_views/src/couch_views.app.src b/src/couch_views/src/couch_views.app.src index 0d666affd..b704c9745 100644 --- a/src/couch_views/src/couch_views.app.src +++ b/src/couch_views/src/couch_views.app.src @@ -27,6 +27,7 @@ couch_stats, fabric, couch_jobs, - couch_eval + couch_eval, + couch_rate ]} ]}. diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl index b41d0679b..0127bacec 100644 --- a/src/couch_views/src/couch_views_indexer.erl +++ b/src/couch_views/src/couch_views_indexer.erl @@ -72,6 +72,8 @@ init() -> fail_job(Job, Data, sig_changed, "Design document was modified") end, + Limiter = couch_rate:create_if_missing({DbName, DDocId}, "views"), + State = #{ tx_db => undefined, db_uuid => DbUUID, @@ -81,7 +83,7 @@ init() -> job => Job, job_data => Data, count => 0, - limit => num_changes(), + limiter => Limiter, doc_acc => [], design_opts => Mrst#mrst.design_opts }, @@ -94,6 +96,7 @@ init() -> error:database_does_not_exist -> fail_job(Job, Data, db_deleted, "Database was deleted"); Error:Reason -> + couch_rate:failure(Limiter), NewRetry = Retries + 1, RetryLimit = retry_limit(), @@ -152,7 +155,25 @@ add_error(Error, Reason, Data) -> update(#{} = Db, Mrst0, State0) -> - {Mrst2, State4} = fabric2_fdb:transactional(Db, fun(TxDb) -> + Limiter = maps:get(limiter, State0), + case couch_rate:budget(Limiter) of + 0 -> + couch_rate:wait(Limiter), + update(Db, Mrst0, State0); + Limit -> + {Mrst1, State1} = do_update(Db, Mrst0, State0#{limit => Limit, limiter => Limiter}), + case State1 of + finished -> + couch_eval:release_map_context(Mrst1#mrst.qserver); + _ -> + couch_rate:wait(Limiter), + update(Db, Mrst1, State1) + end + end. + + +do_update(Db, Mrst0, State0) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> % In the first iteration of update we need % to populate our db and view sequences State1 = case State0 of @@ -174,14 +195,18 @@ update(#{} = Db, Mrst0, State0) -> #{ count := Count, - limit := Limit, doc_acc := DocAcc, - last_seq := LastSeq + last_seq := LastSeq, + limit := Limit, + limiter := Limiter } = State2, - DocAcc1 = fetch_docs(TxDb, DocAcc), + couch_rate:in(Limiter, Count), + {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1), - write_docs(TxDb, Mrst1, MappedDocs, State2), + WrittenDocs = write_docs(TxDb, Mrst1, MappedDocs, State2), + + couch_rate:success(Limiter, WrittenDocs), case Count < Limit of true -> @@ -196,14 +221,7 @@ update(#{} = Db, Mrst0, State0) -> view_seq := LastSeq }} end - end), - - case State4 of - finished -> - couch_eval:release_map_context(Mrst2#mrst.qserver); - _ -> - update(Db, Mrst2, State4) - end. + end). fold_changes(State) -> @@ -304,12 +322,14 @@ write_docs(TxDb, Mrst, Docs, State) -> KeyLimit = key_size_limit(), ValLimit = value_size_limit(), - lists:foreach(fun(Doc0) -> + DocsNumber = lists:foldl(fun(Doc0, N) -> Doc1 = calculate_kv_sizes(Mrst, Doc0, KeyLimit, ValLimit), - couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc1) - end, Docs), + couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc1), + N + 1 + end, 0, Docs), - couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq). + couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq), + DocsNumber. fetch_docs(Db, Changes) -> @@ -472,10 +492,6 @@ fail_job(Job, Data, Error, Reason) -> exit(normal). -num_changes() -> - config:get_integer("couch_views", "change_limit", 100). - - retry_limit() -> config:get_integer("couch_views", "retry_limit", 3). diff --git a/src/couch_views/test/couch_views_indexer_test.erl b/src/couch_views/test/couch_views_indexer_test.erl index 8f8f3c5cb..43b58284d 100644 --- a/src/couch_views/test/couch_views_indexer_test.erl +++ b/src/couch_views/test/couch_views_indexer_test.erl @@ -50,7 +50,8 @@ indexer_test_() -> ?TDEF_FE(handle_size_value_limits), ?TDEF_FE(index_autoupdater_callback), ?TDEF_FE(handle_db_recreated_when_running), - ?TDEF_FE(handle_db_recreated_after_finished) + ?TDEF_FE(handle_db_recreated_after_finished), + ?TDEF_FE(index_budget_is_changing) ] } } @@ -375,6 +376,55 @@ index_autoupdater_callback(Db) -> ?assertEqual(ok, couch_views_jobs:wait_for_job(JobId, DbSeq)). +index_budget_is_changing(Db) -> + ok = meck:new(couch_rate, [passthrough]), + ok = meck:expect(couch_rate, budget, fun(State) -> + meck:passthrough([State]) + end), + + LimiterOpts = #{ + budget => 100, + sensitivity => 500, + target => 500, + timer => fun timer/0, + window => 2000 + }, + + ok = meck:expect(couch_rate, create_if_missing, fun(Id, Module, Store, _Options) -> + meck:passthrough([Id, Module, Store, LimiterOpts]) + end), + + ok = meck:expect(couch_rate, wait, fun(State) -> + Delay = couch_rate:delay(State), + put(time, timer() + Delay - 1) + end), + + DDoc = create_ddoc(), + Docs = lists:map(fun doc/1, lists:seq(1, 200)), + + {ok, _} = fabric2_db:update_docs(Db, [DDoc | Docs], []), + + {ok, _Out} = couch_views:query( + Db, + DDoc, + <<"map_fun2">>, + fun fold_fun/2, + [], + #mrargs{} + ), + ?assert(length(lists:usort(budget_history())) > 1). + + +timer() -> + get(time) == undefined andalso put(time, 1), + Now = get(time), + put(time, Now + 1), + Now. + + +budget_history() -> + [Result || {_Pid, {couch_rate, budget, _}, Result} <- meck:history(couch_rate)]. + handle_db_recreated_when_running(Db) -> DbName = fabric2_db:name(Db), @@ -386,7 +436,8 @@ handle_db_recreated_when_running(Db) -> % To intercept job building while it is running ensure updates happen one % row at a time. - config:set("couch_views", "change_limit", "1", false), + ok = meck:new(couch_rate, [passthrough]), + ok = meck:expect(couch_rate, budget, ['_'], meck:val(1)), meck_intercept_job_update(self()), diff --git a/src/couch_views/test/couch_views_trace_index_test.erl b/src/couch_views/test/couch_views_trace_index_test.erl index c4f76d897..f8a5ce535 100644 --- a/src/couch_views/test/couch_views_trace_index_test.erl +++ b/src/couch_views/test/couch_views_trace_index_test.erl @@ -51,7 +51,7 @@ indexer_test_() -> setup() -> - test_util:start_couch([fabric, couch_js]). + test_util:start_couch([fabric, couch_js, couch_rate]). cleanup(Ctx) -> |